# Vector data API: routes for importing, fetching and exporting GeoJSON vector records.
import os
import shutil
from typing import List

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from ..database import get_db
from ..services import vector_service

router = APIRouter()


def _build_download_response(result) -> FileResponse:
    """Turn an export result into a file download that cleans up after itself.

    Args:
        result: Mapping returned by the export service. It must contain
            ``"file_path"`` and may contain ``"temp_dir"``.
            NOTE(review): presumably ``file_path`` lives inside ``temp_dir``;
            confirm against ``vector_service``.

    Returns:
        A ``FileResponse`` serving ``file_path``. When a temp directory is
        present, a background task removes it after the response is sent.

    Raises:
        HTTPException: 404 when the exported file does not exist on disk.
    """
    file_path = result["file_path"]
    temp_dir = result["temp_dir"] if "temp_dir" in result else None

    if not os.path.exists(file_path):
        # Nothing to serve: remove the temp directory right away, since no
        # response will be sent that could trigger deferred cleanup.
        if temp_dir is not None and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
        raise HTTPException(status_code=404, detail="导出文件不存在")

    cleanup = None
    if temp_dir is not None:
        # add_task() returns None, so the BackgroundTasks object itself must be
        # handed to the response; passing the call's result silently disables
        # the cleanup and leaks the temp directory.
        cleanup = BackgroundTasks()
        cleanup.add_task(shutil.rmtree, temp_dir)

    return FileResponse(
        path=file_path,
        filename=os.path.basename(file_path),
        media_type="application/json",
        background=cleanup,
    )


@router.post("/import", summary="导入GeoJSON文件", description="将GeoJSON文件导入到数据库中")
async def import_vector_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Import an uploaded GeoJSON file into the database.

    Args:
        file: The uploaded file; its name must end with ``.geojson``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``vector_service.import_vector_data`` returns.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end with ``.geojson``.
    """
    # filename may be None for uploads sent without one; reject those with the
    # same 400 instead of failing on None.endswith().
    if not file.filename or not file.filename.endswith('.geojson'):
        raise HTTPException(status_code=400, detail="只支持GeoJSON文件")

    return await vector_service.import_vector_data(file, db)


@router.get("/{vector_id}", summary="获取矢量数据", description="根据ID获取一条矢量数据记录")
def get_vector_data(vector_id: int, db: Session = Depends(get_db)):
    """Fetch the vector data record with the given ID.

    Args:
        vector_id: ID of the record to fetch.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``vector_service.get_vector_data`` returns for that ID.
    """
    return vector_service.get_vector_data(db, vector_id)


@router.get("/{vector_id}/export", summary="导出矢量数据", description="将指定ID的矢量数据导出为GeoJSON文件")
async def export_vector_data(vector_id: int, db: Session = Depends(get_db)):
    """Export the vector data record with the given ID as a GeoJSON download.

    Args:
        vector_id: ID of the record to export.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``FileResponse`` serving the exported file.

    Raises:
        HTTPException: 404 when the exported file does not exist.
    """
    result = vector_service.export_vector_data(db, vector_id)
    return _build_download_response(result)


@router.post("/export/batch", summary="批量导出矢量数据", description="将多个ID的矢量数据批量导出为GeoJSON文件")
async def export_vector_data_batch(vector_ids: List[int], db: Session = Depends(get_db)):
    """Export several vector data records as a single GeoJSON download.

    Args:
        vector_ids: IDs of the records to export.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``FileResponse`` serving the exported file.

    Raises:
        HTTPException: 404 when the exported file does not exist.
    """
    result = vector_service.export_vector_data_batch(db, vector_ids)
    return _build_download_response(result)