vector.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. # 矢量数据API
  2. from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
  3. from fastapi.responses import FileResponse
  4. from sqlalchemy.orm import Session
  5. from ..database import get_db
  6. from ..services import vector_service
  7. import os
  8. import shutil
  9. from typing import List
  10. router = APIRouter()
  11. @router.post("/import", summary="导入GeoJSON文件", description="将GeoJSON文件导入到数据库中")
  12. async def import_vector_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
  13. """导入GeoJSON文件到数据库"""
  14. # 检查文件类型
  15. if not file.filename.endswith('.geojson'):
  16. raise HTTPException(status_code=400, detail="只支持GeoJSON文件")
  17. return await vector_service.import_vector_data(file, db)
  18. @router.get("/{vector_id}", summary="获取矢量数据", description="根据ID获取一条矢量数据记录")
  19. def get_vector_data(vector_id: int, db: Session = Depends(get_db)):
  20. """获取指定ID的矢量数据"""
  21. return vector_service.get_vector_data(db, vector_id)
  22. @router.get("/{vector_id}/export", summary="导出矢量数据", description="将指定ID的矢量数据导出为GeoJSON文件")
  23. async def export_vector_data(vector_id: int, db: Session = Depends(get_db)):
  24. """导出指定ID的矢量数据为GeoJSON文件"""
  25. result = vector_service.export_vector_data(db, vector_id)
  26. # 检查文件是否存在
  27. if not os.path.exists(result["file_path"]):
  28. if "temp_dir" in result and os.path.exists(result["temp_dir"]):
  29. shutil.rmtree(result["temp_dir"])
  30. raise HTTPException(status_code=404, detail="导出文件不存在")
  31. # 返回文件下载
  32. return FileResponse(
  33. path=result["file_path"],
  34. filename=os.path.basename(result["file_path"]),
  35. media_type="application/json",
  36. background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
  37. )
  38. @router.post("/export/batch", summary="批量导出矢量数据", description="将多个ID的矢量数据批量导出为GeoJSON文件")
  39. async def export_vector_data_batch(vector_ids: List[int], db: Session = Depends(get_db)):
  40. """批量导出矢量数据为GeoJSON文件"""
  41. result = vector_service.export_vector_data_batch(db, vector_ids)
  42. # 检查文件是否存在
  43. if not os.path.exists(result["file_path"]):
  44. if "temp_dir" in result and os.path.exists(result["temp_dir"]):
  45. shutil.rmtree(result["temp_dir"])
  46. raise HTTPException(status_code=404, detail="导出文件不存在")
  47. # 返回文件下载
  48. return FileResponse(
  49. path=result["file_path"],
  50. filename=os.path.basename(result["file_path"]),
  51. media_type="application/json",
  52. background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
  53. )