""" Cd通量移除计算API接口 @description: 提供籽粒移除和秸秆移除的Cd通量计算功能 """ from fastapi import APIRouter, HTTPException, Query, Path from fastapi.responses import FileResponse from pydantic import BaseModel, Field from typing import Dict, Any, Optional import logging import os from ..services.cd_flux_removal_service import CdFluxRemovalService router = APIRouter() # ============================================================================= # 数据模型定义 # ============================================================================= class CdFluxRemovalResponse(BaseModel): """ Cd通量移除计算响应模型 @description: 标准化的API响应格式 """ success: bool = Field(..., description="是否成功") message: str = Field(..., description="响应消息") data: Optional[Dict[str, Any]] = Field(None, description="计算结果数据") class VisualizationResponse(BaseModel): """ 可视化结果响应模型 @description: 绘图接口的响应格式 """ success: bool = Field(..., description="是否成功") message: str = Field(..., description="响应消息") data: Optional[Dict[str, Any]] = Field(None, description="可视化结果数据") files: Optional[Dict[str, str]] = Field(None, description="生成的文件路径") # 设置日志 logger = logging.getLogger(__name__) # ============================================================================= # Cd通量移除计算接口 # ============================================================================= @router.get("/grain-removal", summary="计算籽粒移除Cd通量", description="根据指定地区计算籽粒移除Cd通量,公式:EXP(LnCropCd)*F11*0.5*15/1000", response_model=CdFluxRemovalResponse) async def calculate_grain_removal( area: str = Query(..., description="地区名称,如:韶关") ) -> Dict[str, Any]: """ 计算籽粒移除Cd通量 @param area: 地区名称 @returns: 籽粒移除Cd通量计算结果 计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000 数据来源: - LnCropCd: CropCd_output_data表 - F11: Parameters表(作物亩产量) """ try: service = CdFluxRemovalService() result = service.calculate_grain_removal_by_area(area) if not result["success"]: raise HTTPException( status_code=404, detail=result["message"] ) return result except HTTPException: raise except Exception as e: logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}") raise HTTPException( status_code=500, detail=f"计算失败: {str(e)}" ) @router.get("/straw-removal", summary="计算秸秆移除Cd通量", description="根据指定地区计算秸秆移除Cd通量,公式:[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)]*F11*0.5*15/1000", response_model=CdFluxRemovalResponse) async def calculate_straw_removal( area: str = Query(..., description="地区名称,如:韶关") ) -> Dict[str, Any]: """ 计算秸秆移除Cd通量 @param area: 地区名称 @returns: 秸秆移除Cd通量计算结果 计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000 数据来源: - LnCropCd: CropCd_output_data表 - F11: Parameters表(作物亩产量) """ try: service = CdFluxRemovalService() result = service.calculate_straw_removal_by_area(area) if not result["success"]: raise HTTPException( status_code=404, detail=result["message"] ) return result except HTTPException: raise except Exception as e: logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}") raise HTTPException( status_code=500, detail=f"计算失败: {str(e)}" ) # ============================================================================= # Cd通量移除可视化接口 # ============================================================================= @router.get("/grain-removal/visualize", summary="生成籽粒移除Cd通量可视化图表", description="计算籽粒移除Cd通量并生成栅格地图并返回图片文件") async def visualize_grain_removal( area: str = Query(..., description="地区名称,如:韶关"), level: str = Query(..., description="行政层级,必须为: county | city | province"), colormap: str = Query("green_yellow_red_purple", description="色彩方案"), resolution_factor: float = Query(4.0, description="分辨率因子(默认4.0,更快)"), enable_interpolation: bool = Query(False, description="是否启用空间插值(默认关闭以提升性能)"), cleanup_intermediate: bool = Query(True, description="是否清理中间文件(默认是)") ): """ 生成籽粒移除Cd通量可视化图表 @param area: 地区名称 @returns: 栅格地图文件 功能包括: 1. 计算籽粒移除Cd通量 2. 生成栅格地图 3. 直接返回图片文件 """ try: service = CdFluxRemovalService() # 行政层级校验(不允许模糊) if level not in ("county", "city", "province"): raise HTTPException(status_code=400, detail="参数 level 必须为 'county' | 'city' | 'province'") # 计算籽粒移除Cd通量(传入严格层级以便参数表查找做后缀标准化精确匹配) calc_result = service.calculate_grain_removal_by_area(area, level=level) if not calc_result["success"]: raise HTTPException( status_code=404, detail=calc_result["message"] ) # 获取包含坐标的结果数据 results_with_coords = service.get_coordinates_for_results(calc_result["data"]) if not results_with_coords: raise HTTPException( status_code=404, detail=f"未找到地区 '{area}' 的坐标数据,无法生成可视化" ) # 创建可视化 visualization_files = service.create_flux_visualization( area=area, level=level, calculation_type="grain_removal", results_with_coords=results_with_coords, colormap=colormap, resolution_factor=resolution_factor, enable_interpolation=enable_interpolation, cleanup_intermediate=cleanup_intermediate ) # 检查地图文件是否生成成功 map_file = visualization_files.get("map") if not map_file or not os.path.exists(map_file): raise HTTPException(status_code=500, detail="地图文件生成失败") return FileResponse( path=map_file, filename=f"{area}_grain_removal_cd_flux_map.jpg", media_type="image/jpeg" ) except HTTPException: raise except Exception as e: logger.error(f"生成地区 '{area}' 的籽粒移除Cd通量可视化失败: {str(e)}") raise HTTPException( status_code=500, detail=f"可视化生成失败: {str(e)}" ) @router.get("/straw-removal/visualize", summary="生成秸秆移除Cd通量可视化图表", description="计算秸秆移除Cd通量并生成栅格地图并返回图片文件") async def visualize_straw_removal( area: str = Query(..., description="地区名称,如:韶关"), level: str = Query(..., description="行政层级,必须为: county | city | province"), colormap: str = Query("green_yellow_red_purple", description="色彩方案"), resolution_factor: float = Query(4.0, description="分辨率因子(默认4.0,更快)"), enable_interpolation: bool = Query(False, description="是否启用空间插值(默认关闭以提升性能)"), cleanup_intermediate: bool = Query(True, description="是否清理中间文件(默认是)") ): """ 生成秸秆移除Cd通量可视化图表 @param area: 地区名称 @returns: 栅格地图文件 功能包括: 1. 计算秸秆移除Cd通量 2. 生成栅格地图 3. 直接返回图片文件 """ try: service = CdFluxRemovalService() # 行政层级校验(不允许模糊) if level not in ("county", "city", "province"): raise HTTPException(status_code=400, detail="参数 level 必须为 'county' | 'city' | 'province'") # 计算秸秆移除Cd通量(传入严格层级以便参数表查找做后缀标准化精确匹配) calc_result = service.calculate_straw_removal_by_area(area, level=level) if not calc_result["success"]: raise HTTPException( status_code=404, detail=calc_result["message"] ) # 获取包含坐标的结果数据 results_with_coords = service.get_coordinates_for_results(calc_result["data"]) if not results_with_coords: raise HTTPException( status_code=404, detail=f"未找到地区 '{area}' 的坐标数据,无法生成可视化" ) # 创建可视化 visualization_files = service.create_flux_visualization( area=area, level=level, calculation_type="straw_removal", results_with_coords=results_with_coords, colormap=colormap, resolution_factor=resolution_factor, enable_interpolation=enable_interpolation, cleanup_intermediate=cleanup_intermediate ) # 检查地图文件是否生成成功 map_file = visualization_files.get("map") if not map_file or not os.path.exists(map_file): raise HTTPException(status_code=500, detail="地图文件生成失败") return FileResponse( path=map_file, filename=f"{area}_straw_removal_cd_flux_map.jpg", media_type="image/jpeg" ) except HTTPException: raise except Exception as e: logger.error(f"生成地区 '{area}' 的秸秆移除Cd通量可视化失败: {str(e)}") raise HTTPException( status_code=500, detail=f"可视化生成失败: {str(e)}" ) @router.get("/export-data", summary="导出Cd通量移除计算数据", description="导出籽粒移除或秸秆移除的计算结果为CSV文件", response_model=CdFluxRemovalResponse) async def export_flux_data( area: str = Query(..., description="地区名称,如:韶关"), calculation_type: str = Query(..., description="计算类型:grain_removal 或 straw_removal") ) -> Dict[str, Any]: """ 导出Cd通量移除计算数据 @param area: 地区名称 @param calculation_type: 计算类型(grain_removal 或 straw_removal) @returns: 导出结果和文件路径 """ try: if calculation_type not in ["grain_removal", "straw_removal"]: raise HTTPException( status_code=400, detail="计算类型必须是 'grain_removal' 或 'straw_removal'" ) service = CdFluxRemovalService() # 根据类型计算相应的Cd通量 if calculation_type == "grain_removal": calc_result = service.calculate_grain_removal_by_area(area) else: calc_result = service.calculate_straw_removal_by_area(area) if not calc_result["success"]: raise HTTPException( status_code=404, detail=calc_result["message"] ) # 导出数据 csv_path = service.export_results_to_csv(calc_result["data"]) return { "success": True, "message": f"地区 '{area}' 的 {calculation_type} 数据导出成功", "data": { "area": area, "calculation_type": calculation_type, "exported_file": csv_path, "total_records": len(calc_result["data"]["results"]), "statistics": calc_result["data"]["statistics"] } } except HTTPException: raise except Exception as e: logger.error(f"导出地区 '{area}' 的 {calculation_type} 数据失败: {str(e)}") raise HTTPException( status_code=500, detail=f"数据导出失败: {str(e)}" )