123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # 矢量数据API
- from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
- from fastapi.responses import FileResponse
- from sqlalchemy.orm import Session
- from ..database import get_db
- from ..services import vector_service
- import os
- import shutil
- from typing import List
- router = APIRouter()
- @router.post("/import", summary="导入GeoJSON文件", description="将GeoJSON文件导入到数据库中")
- async def import_vector_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
- """导入GeoJSON文件到数据库"""
- # 检查文件类型
- if not file.filename.endswith('.geojson'):
- raise HTTPException(status_code=400, detail="只支持GeoJSON文件")
-
- return await vector_service.import_vector_data(file, db)
- @router.get("/{vector_id}", summary="获取矢量数据", description="根据ID获取一条矢量数据记录")
- def get_vector_data(vector_id: int, db: Session = Depends(get_db)):
- """获取指定ID的矢量数据"""
- return vector_service.get_vector_data(db, vector_id)
- @router.get("/{vector_id}/export", summary="导出矢量数据", description="将指定ID的矢量数据导出为GeoJSON文件")
- async def export_vector_data(vector_id: int, db: Session = Depends(get_db)):
- """导出指定ID的矢量数据为GeoJSON文件"""
- result = vector_service.export_vector_data(db, vector_id)
-
- # 检查文件是否存在
- if not os.path.exists(result["file_path"]):
- if "temp_dir" in result and os.path.exists(result["temp_dir"]):
- shutil.rmtree(result["temp_dir"])
- raise HTTPException(status_code=404, detail="导出文件不存在")
-
- # 返回文件下载
- return FileResponse(
- path=result["file_path"],
- filename=os.path.basename(result["file_path"]),
- media_type="application/json",
- background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
- )
- @router.get("/export/all", summary="导出矢量数据", description="将指定的矢量数据表导出为GeoJSON文件")
- async def export_all_vector_data_api(table_name: str = "vector_data", db: Session = Depends(get_db)):
- """导出指定矢量数据表为GeoJSON文件
-
- Args:
- table_name (str): 要导出的矢量数据表名,默认为'vector_data'
- db (Session): 数据库会话
-
- Returns:
- FileResponse: GeoJSON文件下载响应
- """
- result = vector_service.export_all_vector_data(db, table_name)
-
- # 检查文件是否存在
- if not os.path.exists(result["file_path"]):
- if "temp_dir" in result and os.path.exists(result["temp_dir"]):
- shutil.rmtree(result["temp_dir"])
- raise HTTPException(status_code=404, detail="导出文件不存在")
-
- # 返回文件下载
- return FileResponse(
- path=result["file_path"],
- filename=os.path.basename(result["file_path"]),
- media_type="application/json",
- background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
- )
- @router.post("/export/batch", summary="批量导出矢量数据", description="将多个ID的矢量数据批量导出为GeoJSON文件")
- async def export_vector_data_batch(vector_ids: List[int], db: Session = Depends(get_db)):
- """批量导出矢量数据为GeoJSON文件"""
- result = vector_service.export_vector_data_batch(db, vector_ids)
-
- # 检查文件是否存在
- if not os.path.exists(result["file_path"]):
- if "temp_dir" in result and os.path.exists(result["temp_dir"]):
- shutil.rmtree(result["temp_dir"])
- raise HTTPException(status_code=404, detail="导出文件不存在")
-
- # 返回文件下载
- return FileResponse(
- path=result["file_path"],
- filename=os.path.basename(result["file_path"]),
- media_type="application/json",
- background=BackgroundTasks().add_task(shutil.rmtree, result["temp_dir"]) if "temp_dir" in result else None
- )
|