vector.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # 矢量数据API
  2. from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks, Query
  3. from fastapi.responses import FileResponse
  4. from sqlalchemy.orm import Session
  5. from ..database import get_db
  6. from ..services import vector_service
  7. from ..services.admin_boundary_service import get_boundary_geojson_by_name
  8. import os
  9. import shutil
  10. from typing import List, Optional
  11. router = APIRouter()
  12. @router.post("/import", summary="导入GeoJSON文件", description="将GeoJSON文件导入到数据库中")
  13. async def import_vector_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
  14. """导入GeoJSON文件到数据库"""
  15. # 检查文件类型
  16. if not file.filename.endswith('.geojson'):
  17. raise HTTPException(status_code=400, detail="只支持GeoJSON文件")
  18. return await vector_service.import_vector_data(file, db)
  19. @router.get("/boundary", summary="按名称获取边界", description="按县/市/省名称获取边界GeoJSON")
  20. def get_boundary(name: str = Query(...), level: str = Query("auto"), db: Session = Depends(get_db)):
  21. """按名称和层级获取边界。level: county/city/province/auto"""
  22. try:
  23. return get_boundary_geojson_by_name(db, name, level)
  24. except Exception as e:
  25. raise HTTPException(status_code=404, detail=str(e))
  26. @router.get("/{vector_id}", summary="获取矢量数据", description="根据ID获取一条矢量数据记录")
  27. def get_vector_data(vector_id: int, db: Session = Depends(get_db)):
  28. """获取指定ID的矢量数据"""
  29. return vector_service.get_vector_data(db, vector_id)
  30. @router.get("/{vector_id}/export", summary="导出矢量数据", description="将指定ID的矢量数据导出为GeoJSON文件")
  31. async def export_vector_data(vector_id: int, db: Session = Depends(get_db)):
  32. """导出指定ID的矢量数据为GeoJSON文件"""
  33. result = vector_service.export_vector_data(db, vector_id)
  34. # 检查文件是否存在
  35. if not os.path.exists(result["file_path"]):
  36. if "temp_dir" in result and os.path.exists(result["temp_dir"]):
  37. shutil.rmtree(result["temp_dir"])
  38. raise HTTPException(status_code=404, detail="导出文件不存在")
  39. # 返回文件下载
  40. return FileResponse(
  41. path=result["file_path"],
  42. filename=os.path.basename(result["file_path"]),
  43. media_type="application/json",
  44. background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
  45. )
  46. @router.get("/export/all", summary="导出矢量数据", description="将指定的矢量数据表导出为GeoJSON或JSON文件")
  47. async def export_all_vector_data_api(table_name: str = "vector_data", db: Session = Depends(get_db),format:Optional[str] = "geojson"):
  48. """导出指定矢量数据表为GeoJSON文件或普通JSON文件
  49. Args:
  50. table_name (str): 要导出的矢量数据表名,默认为'vector_data'
  51. db (Session): 数据库会话
  52. format (str,optional):导出格式支持'geojson'和'json',默认为'geojson'
  53. Returns:
  54. FileResponse: GeoJSON文件下载响应
  55. """
  56. #验证格式参数
  57. if format not in ["geojson" , "json"]:
  58. raise HTTPException(
  59. status_code=400,
  60. detail="只支持'geojson'和'json'格式"
  61. )
  62. result = vector_service.export_all_vector_data(db, table_name,export_format=format)#传递导出格式
  63. # 检查文件是否存在
  64. if not os.path.exists(result["file_path"]):
  65. if "temp_dir" in result and os.path.exists(result["temp_dir"]):
  66. shutil.rmtree(result["temp_dir"])
  67. raise HTTPException(status_code=404, detail="导出文件不存在")
  68. #根据格式设置正确的媒体类型
  69. media_type = "application/geo+json" if format == "geojson" else "application/json"
  70. # 返回文件下载
  71. return FileResponse(
  72. path=result["file_path"],
  73. filename=os.path.basename(result["file_path"]),
  74. media_type=media_type,
  75. background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
  76. )
  77. @router.post("/export/batch", summary="批量导出矢量数据", description="将多个ID的矢量数据批量导出为GeoJSON文件")
  78. async def export_vector_data_batch(vector_ids: List[int], db: Session = Depends(get_db)):
  79. """批量导出矢量数据为GeoJSON文件"""
  80. result = vector_service.export_vector_data_batch(db, vector_ids)
  81. # 检查文件是否存在
  82. if not os.path.exists(result["file_path"]):
  83. if "temp_dir" in result and os.path.exists(result["temp_dir"]):
  84. shutil.rmtree(result["temp_dir"])
  85. raise HTTPException(status_code=404, detail="导出文件不存在")
  86. # 返回文件下载
  87. return FileResponse(
  88. path=result["file_path"],
  89. filename=os.path.basename(result["file_path"]),
  90. media_type="application/json",
  91. background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
  92. )