123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import os
- import json
- import tempfile
- import shutil
- import logging
- import sys
- from pathlib import Path
- from typing import Optional, Dict, Any
- from fastapi import APIRouter, File, UploadFile, Form, HTTPException, BackgroundTasks
- from fastapi.responses import FileResponse, JSONResponse
- from app.services.cd_flux_service import FluxCdVisualizationService
- # 固定文件名
- DEFAULT_MAP_FILENAME = "fluxcd_input_map.jpg"
- DEFAULT_HIST_FILENAME = "fluxcd_input_histogram.jpg"
- DEFAULT_CSV_FILENAME = "fluxcd_input.csv"
- LATEST_RESULT_FILENAME = "latest_result.json"
- # 配置日志
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- handler = logging.StreamHandler()
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- router = APIRouter()
- def get_base_dir():
- """获取基础目录路径(与土地数据处理函数一致)"""
- if getattr(sys, 'frozen', False):
- # 打包后的可执行文件
- return os.path.dirname(sys.executable)
- else:
- # 脚本运行模式
- return os.path.dirname(os.path.abspath(__file__))
- def get_static_dir():
- """获取静态资源目录"""
- base_dir = get_base_dir()
- return os.path.join(base_dir, "..", "static", "cd_flux")
- STATIC_DIR = get_static_dir() # 静态目录,即static/cd_flux
- def get_default_file_path(filename: str) -> str:
- """获取默认文件路径"""
- static_dir = get_static_dir()
- path = os.path.join(static_dir, filename)
- return path
- @router.post("/calculate",
- summary="上传CSV文件更新数据并生成Cd通量可视化结果",
- description="上传包含镉通量数据的CSV文件,更新数据库,并生成空间分布图和直方图")
- async def update_and_generate_fluxcd_visualization(
- background_tasks: BackgroundTasks,
- csv_file: UploadFile = File(..., description="CSV文件,包含经纬度和通量数据"),
- boundary_shp: Optional[str] = Form(None, description="可选,边界SHP文件路径(如果使用默认则不提供)")
- ) -> Dict[str, Any]:
- try:
- logger.info("开始处理Cd通量数据更新和可视化")
- # 创建临时目录保存上传的CSV文件
- temp_dir = tempfile.mkdtemp()
- temp_csv_path = os.path.join(temp_dir, csv_file.filename)
- # 保存上传的文件
- with open(temp_csv_path, "wb") as f:
- content = await csv_file.read()
- f.write(content)
- # 初始化服务
- service = FluxCdVisualizationService()
- # 步骤1: 更新数据
- update_result = service.update_from_csv(temp_csv_path)
- if not update_result.get("success"):
- logger.error(f"更新数据失败: {update_result.get('message')}")
- raise HTTPException(status_code=500, detail=update_result.get('message'))
- # 步骤2: 生成可视化结果
- # 使用静态目录作为输出目录
- static_dir = get_static_dir()
- os.makedirs(static_dir, exist_ok=True)
- generation_result = service.generate_cd_input_flux_map(
- output_dir=static_dir,
- boundary_shp=boundary_shp
- )
- return generation_result
- except HTTPException as he:
- raise he
- except Exception as e:
- logger.error(f"处理过程中出错: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
- finally:
- # 清理临时目录(使用后台任务)
- background_tasks.add_task(shutil.rmtree, temp_dir, ignore_errors=True)
- @router.get("/map",
- summary="获取Cd输入通量空间分布图",
- description="返回默认的Cd输入通量空间分布图")
- async def get_fluxcd_map() -> FileResponse:
- try:
- # 直接返回静态目录中的默认地图
- map_path = get_default_file_path("fluxcd_input_map.jpg")
- if os.path.exists(map_path):
- return FileResponse(map_path, media_type="image/jpeg")
- else:
- raise HTTPException(status_code=404, detail="默认地图不存在,请先生成")
- except Exception as e:
- logger.error(f"获取地图失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"获取地图失败: {str(e)}")
- @router.get("/histogram",
- summary="获取Cd输入通量直方图",
- description="返回默认的Cd输入通量直方图")
- async def get_fluxcd_histogram() -> FileResponse:
- try:
- # 直接返回静态目录中的默认直方图
- hist_path = get_default_file_path("fluxcd_input_histogram.jpg")
- if os.path.exists(hist_path):
- return FileResponse(hist_path, media_type="image/jpeg")
- else:
- raise HTTPException(status_code=404, detail="默认直方图不存在,请先生成")
- except Exception as e:
- logger.error(f"获取直方图失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"获取直方图失败: {str(e)}")
- @router.get("/statistics",
- summary="获取Cd输入通量统计信息",
- description="返回默认的Cd输入通量统计信息")
- @router.get("/statistics",
- summary="获取Cd输入通量统计信息",
- description="返回最新生成的Cd输入通量统计信息")
- async def get_fluxcd_statistics() -> JSONResponse:
- try:
- latest_result_path = os.path.join(STATIC_DIR, LATEST_RESULT_FILENAME)
- if not os.path.exists(latest_result_path):
- raise HTTPException(status_code=404, detail="无可用统计信息,请先生成")
- with open(latest_result_path, "r", encoding="utf-8") as f:
- result = json.load(f)
- if not result.get("success"):
- raise HTTPException(status_code=404, detail="结果无效")
- stats = result.get("data", {}).get("statistics")
- if stats:
- return JSONResponse(content=stats)
- else:
- raise HTTPException(status_code=404, detail="统计信息不存在")
- except Exception as e:
- logger.error(f"获取统计信息失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"获取统计信息失败: {str(e)}")
- @router.get("/export-csv",
- summary="导出fluxcd_input.csv文件",
- description="导出默认的fluxcd_input.csv文件")
- async def export_fluxcd_csv() -> FileResponse:
- try:
- # 直接返回静态目录中的CSV文件
- csv_path = get_default_file_path("fluxcd_input.csv")
- if os.path.exists(csv_path):
- return FileResponse(
- csv_path,
- filename="fluxcd_input.csv",
- media_type="text/csv"
- )
- else:
- raise HTTPException(status_code=404, detail="CSV文件不存在,请先生成")
- except Exception as e:
- logger.error(f"导出CSV失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"导出CSV失败: {str(e)}")
|