Browse Source

提供农产品投入Cd通量计算&统计接口;
Cd通量移除计算初步接口提供

drggboy 1 week ago
parent
commit
976caa7879

+ 262 - 0
app/api/agricultural_input.py

@@ -0,0 +1,262 @@
+"""
+农业投入Cd通量计算API接口
+@description: 提供各个地区的农业投入输入Cd通量预测计算功能
+"""
+
+from fastapi import APIRouter, HTTPException, Query, Body
+from pydantic import BaseModel, Field
+from typing import Dict, Any, Optional
+import logging
+from ..services.agricultural_input_service import AgriculturalInputService
+
+router = APIRouter()
+
+# =============================================================================
+# 数据模型定义
+# =============================================================================
+
+class CdFluxCalculationRequest(BaseModel):
+    """
+    农业投入Cd通量计算请求模型
+    
+    @description: 用户提供的计算参数
+    """
+    # 镉含量参数 (mg/kg)
+    f3_nitrogen_cd_content: float = Field(..., description="氮肥镉含量平均值 (mg/kg)", ge=0)
+    f4_phosphorus_cd_content: float = Field(..., description="磷肥镉含量平均值 (mg/kg)", ge=0)
+    f5_potassium_cd_content: float = Field(..., description="钾肥镉含量平均值 (mg/kg)", ge=0)
+    f6_compound_cd_content: float = Field(..., description="复合肥镉含量平均值 (mg/kg)", ge=0)
+    f7_organic_cd_content: float = Field(..., description="有机肥镉含量平均值 (mg/kg)", ge=0)
+    f8_pesticide_cd_content: float = Field(..., description="农药镉含量 (mg/kg)", ge=0)
+    f9_farmyard_cd_content: float = Field(..., description="农家肥镉含量 (mg/kg)", ge=0)
+    f10_film_cd_content: float = Field(..., description="农膜镉含量 (mg/kg)", ge=0)
+    
+    # 使用量参数 (t/ha/a)
+    nf_nitrogen_usage: float = Field(..., description="氮肥单位面积使用量 (t/ha/a)", ge=0)
+    pf_phosphorus_usage: float = Field(..., description="磷肥单位面积使用量 (t/ha/a)", ge=0)
+    kf_potassium_usage: float = Field(..., description="钾肥单位面积使用量 (t/ha/a)", ge=0)
+    cf_compound_usage: float = Field(..., description="复合肥单位面积使用量 (t/ha/a)", ge=0)
+    of_organic_usage: float = Field(..., description="有机肥单位面积使用量 (t/ha/a)", ge=0)
+    p_pesticide_usage: float = Field(..., description="农药单位面积使用量 (t/ha/a)", ge=0)
+    ff_farmyard_usage: float = Field(..., description="农家肥单位面积使用量 (t/ha/a)", ge=0)
+    af_film_usage: float = Field(..., description="农膜(存留)单位面积使用量 (t/ha/a)", ge=0)
+    
+    # 可选的标识信息
+    description: Optional[str] = Field(None, description="计算描述信息")
+
+    model_config = {
+        "json_schema_extra": {
+            "example": {
+                "f3_nitrogen_cd_content": 0.12,
+                "f4_phosphorus_cd_content": 0.85,
+                "f5_potassium_cd_content": 0.05,
+                "f6_compound_cd_content": 0.45,
+                "f7_organic_cd_content": 0.22,
+                "f8_pesticide_cd_content": 0.08,
+                "f9_farmyard_cd_content": 0.15,
+                "f10_film_cd_content": 0.03,
+                "nf_nitrogen_usage": 0.25,
+                "pf_phosphorus_usage": 0.15,
+                "kf_potassium_usage": 0.12,
+                "cf_compound_usage": 0.30,
+                "of_organic_usage": 2.50,
+                "p_pesticide_usage": 0.02,
+                "ff_farmyard_usage": 1.80,
+                "af_film_usage": 0.05,
+                "description": "测试地区农业投入Cd通量计算"
+            }
+        }
+    }
+
+# 设置日志
+logger = logging.getLogger(__name__)
+
+# =============================================================================
+# 农业投入Cd通量计算接口
+# =============================================================================
+
+@router.get("/calculate-by-area",
+           summary="根据地区计算农业投入Cd通量",
+           description="根据指定地区从Parameters表中获取数据并计算农业投入输入Cd通量")
+async def calculate_cd_flux_by_area(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    根据地区计算农业投入输入Cd通量
+    
+    @param area: 地区名称
+    @returns: 计算结果包括总通量和各项明细
+    
+    计算公式:农业投入输入Cd(g/ha/a) = F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+    """
+    try:
+        service = AgriculturalInputService()
+        result = service.calculate_cd_flux_by_area(area)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算地区 '{area}' 的Cd通量失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+@router.get("/calculate-all-areas",
+           summary="计算所有地区的农业投入Cd通量",
+           description="计算数据库中所有地区的农业投入输入Cd通量,并提供统计汇总")
+async def calculate_all_areas_cd_flux() -> Dict[str, Any]:
+    """
+    计算所有地区的农业投入输入Cd通量
+    
+    @returns: 所有地区的计算结果,按通量从高到低排序,包含统计汇总
+    """
+    try:
+        service = AgriculturalInputService()
+        result = service.calculate_all_areas_cd_flux()
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=500, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算所有地区Cd通量失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+@router.get("/available-areas",
+           summary="获取可用地区列表",
+           description="获取数据库中所有可用于计算的地区列表")
+async def get_available_areas() -> Dict[str, Any]:
+    """
+    获取数据库中所有可用的地区列表
+    
+    @returns: 可用地区列表
+    """
+    try:
+        service = AgriculturalInputService()
+        result = service.get_available_areas()
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=500, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"获取可用地区列表失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取失败: {str(e)}"
+        )
+
+@router.post("/calculate-with-custom-data",
+            summary="使用自定义数据计算农业投入Cd通量",
+            description="接收用户提供的参数数据,直接进行农业投入输入Cd通量计算")
+async def calculate_cd_flux_with_custom_data(
+    request: CdFluxCalculationRequest = Body(..., description="计算所需的参数数据")
+) -> Dict[str, Any]:
+    """
+    使用用户提供的自定义数据计算农业投入输入Cd通量
+    
+    @param request: 包含所有计算参数的请求体
+    @returns: 计算结果包括总通量和各项明细
+    
+    计算公式:农业投入输入Cd(g/ha/a) = F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+    """
+    try:
+        service = AgriculturalInputService()
+        result = service.calculate_cd_flux_with_custom_data(request)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=400, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"使用自定义数据计算Cd通量失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+@router.get("/calculation-formula",
+           summary="获取计算公式说明",
+           description="获取农业投入Cd通量的计算公式和参数说明")
+async def get_calculation_formula() -> Dict[str, Any]:
+    """
+    获取农业投入Cd通量的计算公式和参数说明
+    
+    @returns: 公式和参数详细说明
+    """
+    try:
+        formula_info = {
+            "success": True,
+            "message": "农业投入Cd通量计算公式",
+            "data": {
+                "formula": "农业投入输入Cd(g/ha/a) = F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF",
+                "unit": "g/ha/a",
+                "parameters": {
+                    "F3": "氮肥镉含量平均值(mg/kg)",
+                    "F4": "磷肥镉含量平均值(mg/kg)",
+                    "F5": "钾肥镉含量平均值(mg/kg)",
+                    "F6": "复合肥镉含量平均值(mg/kg)",
+                    "F7": "有机肥镉含量平均值(mg/kg)",
+                    "F8": "农药镉含量(mg/kg)",
+                    "F9": "农家肥镉含量(mg/kg)",
+                    "F10": "农膜镉含量(mg/kg)",
+                    "NF": "氮肥单位面积使用量(t/ha/a)",
+                    "PF": "磷肥单位面积使用量(t/ha/a)",
+                    "KF": "钾肥单位面积使用量(t/ha/a)",
+                    "CF": "复合肥单位面积使用量(t/ha/a)",
+                    "OF": "有机肥单位面积使用量(t/ha/a)",
+                    "P": "农药单位面积使用量(t/ha/a)",
+                    "FF": "农家肥单位面积使用量(t/ha/a)",
+                    "AF": "农膜(存留)单位面积使用量(t/ha/a)"
+                },
+                "components": {
+                    "nitrogen_fertilizer": "氮肥贡献量 = F3 × NF",
+                    "phosphorus_fertilizer": "磷肥贡献量 = F4 × PF",
+                    "potassium_fertilizer": "钾肥贡献量 = F5 × KF",
+                    "compound_fertilizer": "复合肥贡献量 = F6 × CF",
+                    "organic_fertilizer": "有机肥贡献量 = F7 × OF",
+                    "pesticide": "农药贡献量 = F8 × P",
+                    "farmyard_manure": "农家肥贡献量 = F9 × FF",
+                    "agricultural_film": "农膜贡献量 = F10 × AF"
+                }
+            }
+        }
+        
+        return formula_info
+        
+    except Exception as e:
+        logger.error(f"获取计算公式失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取失败: {str(e)}"
+        ) 

+ 195 - 0
app/api/cd_flux_removal.py

@@ -0,0 +1,195 @@
+"""
+Cd通量移除计算API接口
+@description: 提供籽粒移除和秸秆移除的Cd通量计算功能
+"""
+
+from fastapi import APIRouter, HTTPException, Query, Path
+from pydantic import BaseModel, Field
+from typing import Dict, Any, Optional
+import logging
+from ..services.cd_flux_removal_service import CdFluxRemovalService
+
+router = APIRouter()
+
+# =============================================================================
+# 数据模型定义
+# =============================================================================
+
+class CdFluxRemovalResponse(BaseModel):
+    """
+    Cd通量移除计算响应模型
+    
+    @description: 标准化的API响应格式
+    """
+    success: bool = Field(..., description="是否成功")
+    message: str = Field(..., description="响应消息")
+    data: Optional[Dict[str, Any]] = Field(None, description="计算结果数据")
+
+
+# 设置日志
+logger = logging.getLogger(__name__)
+
+# =============================================================================
+# Cd通量移除计算接口
+# =============================================================================
+
+@router.get("/grain-removal",
+           summary="计算籽粒移除Cd通量",
+           description="根据指定地区计算籽粒移除Cd通量,公式:EXP(LnCropCd)*F11*0.5*15/1000",
+           response_model=CdFluxRemovalResponse)
+async def calculate_grain_removal(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    计算籽粒移除Cd通量
+    
+    @param area: 地区名称
+    @returns: 籽粒移除Cd通量计算结果
+    
+    计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
+    数据来源:
+    - LnCropCd: CropCd_output_data表
+    - F11: Parameters表(作物亩产量)
+    """
+    try:
+        service = CdFluxRemovalService()
+        result = service.calculate_grain_removal_by_area(area)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+
+@router.get("/straw-removal",
+           summary="计算秸秆移除Cd通量",
+           description="根据指定地区计算秸秆移除Cd通量,公式:[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)]*F11*0.5*15/1000",
+           response_model=CdFluxRemovalResponse)
+async def calculate_straw_removal(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    计算秸秆移除Cd通量
+    
+    @param area: 地区名称
+    @returns: 秸秆移除Cd通量计算结果
+    
+    计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
+    数据来源:
+    - LnCropCd: CropCd_output_data表
+    - F11: Parameters表(作物亩产量)
+    """
+    try:
+        service = CdFluxRemovalService()
+        result = service.calculate_straw_removal_by_area(area)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+
+@router.get("/both-removals",
+           summary="同时计算籽粒移除和秸秆移除Cd通量",
+           description="根据指定地区同时计算籽粒移除和秸秆移除Cd通量",
+           response_model=CdFluxRemovalResponse)
+async def calculate_both_removals(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    同时计算籽粒移除和秸秆移除Cd通量
+    
+    @param area: 地区名称
+    @returns: 包含籽粒移除和秸秆移除Cd通量的计算结果
+    
+    计算公式:
+    - 籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
+    - 秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
+    """
+    try:
+        service = CdFluxRemovalService()
+        result = service.calculate_both_removals_by_area(area)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算地区 '{area}' 的Cd通量移除失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        )
+
+
+@router.get("/sample/{farmland_id}/{sample_id}",
+           summary="计算特定样点的Cd通量移除",
+           description="根据特定农地ID和样点ID计算籽粒移除和秸秆移除Cd通量",
+           response_model=CdFluxRemovalResponse)
+async def calculate_removal_by_sample(
+    farmland_id: int = Path(..., description="农地ID", ge=1),
+    sample_id: int = Path(..., description="样点ID", ge=1),
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    计算特定样点的Cd通量移除
+    
+    @param farmland_id: 农地ID
+    @param sample_id: 样点ID
+    @param area: 地区名称
+    @returns: 特定样点的籽粒移除和秸秆移除Cd通量计算结果
+    
+    计算公式:
+    - 籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
+    - 秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
+    """
+    try:
+        service = CdFluxRemovalService()
+        result = service.calculate_removal_by_sample(farmland_id, sample_id, area)
+        
+        if not result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=result["message"]
+            )
+        
+        return result
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"计算样点 {farmland_id}-{sample_id} 的Cd通量移除失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"计算失败: {str(e)}"
+        ) 

+ 11 - 3
app/main.py

@@ -1,6 +1,5 @@
 from fastapi import FastAPI
-import app.api.water
-from .api import vector, raster, cd_prediction, unit_grouping, water
+from .api import vector, raster, cd_prediction, unit_grouping, water, agricultural_input, cd_flux_removal
 from .database import engine, Base
 from fastapi.middleware.cors import CORSMiddleware
 import logging
@@ -100,6 +99,14 @@ app = FastAPI(
         {
             "name": "water",
             "description": "灌溉水模型相关接口",
+        },
+        {
+            "name": "agricultural-input",
+            "description": "农业投入Cd通量计算相关接口",
+        },
+        {
+            "name": "cd-flux-removal",
+            "description": "Cd通量移除计算相关接口",
         }
     ]
 )
@@ -121,7 +128,8 @@ app.include_router(raster.router, prefix="/api/raster", tags=["raster"])
 app.include_router(cd_prediction.router, prefix="/api/cd-prediction", tags=["cd-prediction"])
 app.include_router(unit_grouping.router, prefix="/api/unit-grouping", tags=["unit-grouping"])
 app.include_router(water.router, prefix="/api/water", tags=["water"])
-
+app.include_router(agricultural_input.router, prefix="/api/agricultural-input", tags=["agricultural-input"])
+app.include_router(cd_flux_removal.router, prefix="/api/cd-flux-removal", tags=["cd-flux-removal"])
 
 @app.get("/")
 async def root():

+ 335 - 0
app/services/agricultural_input_service.py

@@ -0,0 +1,335 @@
+"""
+农业投入Cd通量计算服务
+@description: 根据Parameters表中的数据计算各个地区的农业投入输入Cd通量预测
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import logging
+from typing import Dict, Any, List, Optional
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy import create_engine
+from ..database import SessionLocal, engine
+from ..models.parameters import Parameters
+
+class AgriculturalInputService:
+    """
+    农业投入Cd通量计算服务类
+    
+    @description: 提供基于Parameters表数据的农业投入输入Cd通量预测计算功能
+    """
+    
+    def __init__(self):
+        """
+        初始化农业投入服务
+        """
+        self.logger = logging.getLogger(__name__)
+        
+    def calculate_cd_flux_by_area(self, area: str) -> Dict[str, Any]:
+        """
+        根据地区计算农业投入输入Cd通量
+        
+        @param area: 地区名称
+        @returns: 计算结果
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定地区的参数
+                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "data": None
+                    }
+                
+                # 计算农业投入输入Cd通量
+                # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+                cd_flux = (
+                    parameter.f3 * parameter.nf +  # 氮肥
+                    parameter.f4 * parameter.pf +  # 磷肥
+                    parameter.f5 * parameter.kf +  # 钾肥
+                    parameter.f6 * parameter.cf +  # 复合肥
+                    parameter.f7 * parameter.of +  # 有机肥
+                    parameter.f8 * parameter.p +   # 农药
+                    parameter.f9 * parameter.ff +  # 农家肥
+                    parameter.f10 * parameter.af   # 农膜
+                )
+                
+                # 计算各项明细
+                details = {
+                    "nitrogen_fertilizer": parameter.f3 * parameter.nf,      # 氮肥贡献
+                    "phosphorus_fertilizer": parameter.f4 * parameter.pf,    # 磷肥贡献
+                    "potassium_fertilizer": parameter.f5 * parameter.kf,     # 钾肥贡献
+                    "compound_fertilizer": parameter.f6 * parameter.cf,      # 复合肥贡献
+                    "organic_fertilizer": parameter.f7 * parameter.of,       # 有机肥贡献
+                    "pesticide": parameter.f8 * parameter.p,                 # 农药贡献
+                    "farmyard_manure": parameter.f9 * parameter.ff,          # 农家肥贡献
+                    "agricultural_film": parameter.f10 * parameter.af        # 农膜贡献
+                }
+                
+                return {
+                    "success": True,
+                    "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量",
+                    "data": {
+                        "area": area,
+                        "total_cd_flux": round(cd_flux, 6),
+                        "unit": "g/ha/a",
+                        "details": {key: round(value, 6) for key, value in details.items()},
+                        "parameters_used": {
+                            "f3_nitrogen_cd_content": parameter.f3,
+                            "f4_phosphorus_cd_content": parameter.f4,
+                            "f5_potassium_cd_content": parameter.f5,
+                            "f6_compound_cd_content": parameter.f6,
+                            "f7_organic_cd_content": parameter.f7,
+                            "f8_pesticide_cd_content": parameter.f8,
+                            "f9_farmyard_cd_content": parameter.f9,
+                            "f10_film_cd_content": parameter.f10,
+                            "nf_nitrogen_usage": parameter.nf,
+                            "pf_phosphorus_usage": parameter.pf,
+                            "kf_potassium_usage": parameter.kf,
+                            "cf_compound_usage": parameter.cf,
+                            "of_organic_usage": parameter.of,
+                            "p_pesticide_usage": parameter.p,
+                            "ff_farmyard_usage": parameter.ff,
+                            "af_film_usage": parameter.af
+                        }
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def calculate_all_areas_cd_flux(self) -> Dict[str, Any]:
+        """
+        计算所有地区的农业投入输入Cd通量
+        
+        @returns: 所有地区的计算结果
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询所有参数记录
+                parameters = db.query(Parameters).all()
+                
+                if not parameters:
+                    return {
+                        "success": False,
+                        "message": "数据库中未找到任何参数数据",
+                        "data": None
+                    }
+                
+                results = []
+                
+                for param in parameters:
+                    # 计算每个地区的Cd通量
+                    cd_flux = (
+                        param.f3 * param.nf +
+                        param.f4 * param.pf +
+                        param.f5 * param.kf +
+                        param.f6 * param.cf +
+                        param.f7 * param.of +
+                        param.f8 * param.p +
+                        param.f9 * param.ff +
+                        param.f10 * param.af
+                    )
+                    
+                    details = {
+                        "nitrogen_fertilizer": param.f3 * param.nf,
+                        "phosphorus_fertilizer": param.f4 * param.pf,
+                        "potassium_fertilizer": param.f5 * param.kf,
+                        "compound_fertilizer": param.f6 * param.cf,
+                        "organic_fertilizer": param.f7 * param.of,
+                        "pesticide": param.f8 * param.p,
+                        "farmyard_manure": param.f9 * param.ff,
+                        "agricultural_film": param.f10 * param.af
+                    }
+                    
+                    result_item = {
+                        "area": param.area,
+                        "total_cd_flux": round(cd_flux, 6),
+                        "details": {key: round(value, 6) for key, value in details.items()}
+                    }
+                    
+                    results.append(result_item)
+                
+                # 按总通量排序
+                results.sort(key=lambda x: x["total_cd_flux"], reverse=True)
+                
+                return {
+                    "success": True,
+                    "message": f"成功计算所有 {len(results)} 个地区的农业投入输入Cd通量",
+                    "data": {
+                        "total_areas": len(results),
+                        "unit": "g/ha/a",
+                        "results": results,
+                        "summary": {
+                            "max_cd_flux": max(results, key=lambda x: x["total_cd_flux"]) if results else None,
+                            "min_cd_flux": min(results, key=lambda x: x["total_cd_flux"]) if results else None,
+                            "average_cd_flux": round(sum(r["total_cd_flux"] for r in results) / len(results), 6) if results else 0
+                        }
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算所有地区Cd通量时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def get_available_areas(self) -> Dict[str, Any]:
+        """
+        获取数据库中所有可用的地区列表
+        
+        @returns: 可用地区列表
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询所有不重复的地区
+                areas = db.query(Parameters.area).distinct().all()
+                area_list = [area[0] for area in areas if area[0]]
+                
+                return {
+                    "success": True,
+                    "message": f"成功获取 {len(area_list)} 个可用地区",
+                    "data": {
+                        "areas": sorted(area_list),
+                        "total_count": len(area_list)
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"获取可用地区列表时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"获取失败: {str(e)}",
+                "data": None
+            }
+    
+    def calculate_cd_flux_with_custom_data(self, request) -> Dict[str, Any]:
+        """
+        使用用户提供的自定义数据计算农业投入输入Cd通量
+        
+        @param request: 包含计算参数的请求对象
+        @returns: 计算结果
+        """
+        try:
+            # 进行参数验证
+            if not self._validate_calculation_parameters(request):
+                return {
+                    "success": False,
+                    "message": "提供的参数数据不完整或无效",
+                    "data": None
+                }
+            
+            # 计算农业投入输入Cd通量
+            # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+            cd_flux = (
+                request.f3_nitrogen_cd_content * request.nf_nitrogen_usage +      # 氮肥
+                request.f4_phosphorus_cd_content * request.pf_phosphorus_usage +  # 磷肥
+                request.f5_potassium_cd_content * request.kf_potassium_usage +    # 钾肥
+                request.f6_compound_cd_content * request.cf_compound_usage +      # 复合肥
+                request.f7_organic_cd_content * request.of_organic_usage +        # 有机肥
+                request.f8_pesticide_cd_content * request.p_pesticide_usage +     # 农药
+                request.f9_farmyard_cd_content * request.ff_farmyard_usage +      # 农家肥
+                request.f10_film_cd_content * request.af_film_usage               # 农膜
+            )
+            
+            # 计算各项明细贡献
+            details = {
+                "nitrogen_fertilizer": request.f3_nitrogen_cd_content * request.nf_nitrogen_usage,
+                "phosphorus_fertilizer": request.f4_phosphorus_cd_content * request.pf_phosphorus_usage,
+                "potassium_fertilizer": request.f5_potassium_cd_content * request.kf_potassium_usage,
+                "compound_fertilizer": request.f6_compound_cd_content * request.cf_compound_usage,
+                "organic_fertilizer": request.f7_organic_cd_content * request.of_organic_usage,
+                "pesticide": request.f8_pesticide_cd_content * request.p_pesticide_usage,
+                "farmyard_manure": request.f9_farmyard_cd_content * request.ff_farmyard_usage,
+                "agricultural_film": request.f10_film_cd_content * request.af_film_usage
+            }
+            
+            # 构建返回结果
+            result_data = {
+                "total_cd_flux": round(cd_flux, 6),
+                "unit": "g/ha/a",
+                "details": {key: round(value, 6) for key, value in details.items()},
+                "input_parameters": {
+                    "cd_contents": {
+                        "f3_nitrogen_cd_content": request.f3_nitrogen_cd_content,
+                        "f4_phosphorus_cd_content": request.f4_phosphorus_cd_content,
+                        "f5_potassium_cd_content": request.f5_potassium_cd_content,
+                        "f6_compound_cd_content": request.f6_compound_cd_content,
+                        "f7_organic_cd_content": request.f7_organic_cd_content,
+                        "f8_pesticide_cd_content": request.f8_pesticide_cd_content,
+                        "f9_farmyard_cd_content": request.f9_farmyard_cd_content,
+                        "f10_film_cd_content": request.f10_film_cd_content
+                    },
+                    "usage_amounts": {
+                        "nf_nitrogen_usage": request.nf_nitrogen_usage,
+                        "pf_phosphorus_usage": request.pf_phosphorus_usage,
+                        "kf_potassium_usage": request.kf_potassium_usage,
+                        "cf_compound_usage": request.cf_compound_usage,
+                        "of_organic_usage": request.of_organic_usage,
+                        "p_pesticide_usage": request.p_pesticide_usage,
+                        "ff_farmyard_usage": request.ff_farmyard_usage,
+                        "af_film_usage": request.af_film_usage
+                    }
+                }
+            }
+            
+            # 如果有描述信息,添加到结果中
+            if request.description:
+                result_data["description"] = request.description
+            
+            return {
+                "success": True,
+                "message": "成功计算农业投入输入Cd通量",
+                "data": result_data
+            }
+            
+        except Exception as e:
+            self.logger.error(f"使用自定义数据计算Cd通量时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def _validate_calculation_parameters(self, request) -> bool:
+        """
+        验证计算参数的有效性
+        
+        @param request: 请求对象
+        @returns: 验证是否通过
+        """
+        try:
+            # 检查所有必需的参数是否存在且为非负数
+            required_attrs = [
+                'f3_nitrogen_cd_content', 'f4_phosphorus_cd_content', 'f5_potassium_cd_content',
+                'f6_compound_cd_content', 'f7_organic_cd_content', 'f8_pesticide_cd_content',
+                'f9_farmyard_cd_content', 'f10_film_cd_content', 'nf_nitrogen_usage',
+                'pf_phosphorus_usage', 'kf_potassium_usage', 'cf_compound_usage',
+                'of_organic_usage', 'p_pesticide_usage', 'ff_farmyard_usage', 'af_film_usage'
+            ]
+            
+            for attr in required_attrs:
+                if not hasattr(request, attr):
+                    self.logger.warning(f"缺少必需参数: {attr}")
+                    return False
+                
+                value = getattr(request, attr)
+                if value is None or value < 0:
+                    self.logger.warning(f"参数 {attr} 无效: {value}")
+                    return False
+            
+            return True
+            
+        except Exception as e:
+            self.logger.error(f"验证参数时发生错误: {str(e)}")
+            return False 

+ 327 - 0
app/services/cd_flux_removal_service.py

@@ -0,0 +1,327 @@
+"""
+Cd通量移除计算服务
+@description: 提供籽粒移除和秸秆移除的Cd通量计算功能
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import logging
+import math
+from typing import Dict, Any, List, Optional
+from sqlalchemy.orm import sessionmaker, Session
+from sqlalchemy import create_engine, and_
+from ..database import SessionLocal, engine
+from ..models.parameters import Parameters
+from ..models.CropCd_output import CropCdOutputData
+
+
+class CdFluxRemovalService:
+    """
+    Cd通量移除计算服务类
+    
+    @description: 提供基于CropCd_output_data和Parameters表数据的籽粒移除和秸秆移除Cd通量计算功能
+    """
+    
+    def __init__(self):
+        """
+        初始化Cd通量移除服务
+        """
+        self.logger = logging.getLogger(__name__)
+        
+    def calculate_grain_removal_by_area(self, area: str) -> Dict[str, Any]:
+        """
+        根据地区计算籽粒移除Cd通量
+        
+        @param area: 地区名称
+        @returns: 计算结果字典
+        
+        计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定地区的参数
+                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "data": None
+                    }
+                
+                # 查询CropCd输出数据
+                crop_cd_outputs = db.query(CropCdOutputData).all()
+                
+                if not crop_cd_outputs:
+                    return {
+                        "success": False,
+                        "message": f"未找到CropCd输出数据",
+                        "data": None
+                    }
+                
+                # 计算每个样点的籽粒移除Cd通量
+                results = []
+                for output in crop_cd_outputs:
+                    crop_cd_value = math.exp(output.ln_crop_cd)  # EXP(LnCropCd)
+                    grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000
+                    
+                    results.append({
+                        "farmland_id": output.farmland_id,
+                        "sample_id": output.sample_id,
+                        "ln_crop_cd": output.ln_crop_cd,
+                        "crop_cd_value": crop_cd_value,
+                        "f11_yield": parameter.f11,
+                        "grain_removal_flux": grain_removal
+                    })
+                
+                # 计算统计信息
+                flux_values = [r["grain_removal_flux"] for r in results]
+                statistics = {
+                    "total_samples": len(results),
+                    "mean_flux": sum(flux_values) / len(flux_values),
+                    "max_flux": max(flux_values),
+                    "min_flux": min(flux_values)
+                }
+                
+                return {
+                    "success": True,
+                    "message": f"地区 '{area}' 的籽粒移除Cd通量计算成功",
+                    "data": {
+                        "area": area,
+                        "calculation_type": "grain_removal",
+                        "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000",
+                        "unit": "g/ha/a",
+                        "results": results,
+                        "statistics": statistics
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def calculate_straw_removal_by_area(self, area: str) -> Dict[str, Any]:
+        """
+        根据地区计算秸秆移除Cd通量
+        
+        @param area: 地区名称
+        @returns: 计算结果字典
+        
+        计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定地区的参数
+                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "data": None
+                    }
+                
+                # 查询CropCd输出数据
+                crop_cd_outputs = db.query(CropCdOutputData).all()
+                
+                if not crop_cd_outputs:
+                    return {
+                        "success": False,
+                        "message": f"未找到CropCd输出数据",
+                        "data": None
+                    }
+                
+                # 计算每个样点的秸秆移除Cd通量
+                results = []
+                for output in crop_cd_outputs:
+                    crop_cd_value = math.exp(output.ln_crop_cd)  # EXP(LnCropCd)
+                    
+                    # 计算分母:EXP(LnCropCd)*0.76-0.0034
+                    denominator = crop_cd_value * 0.76 - 0.0034
+                    
+                    # 检查分母是否为零或负数,避免除零错误
+                    if denominator <= 0:
+                        self.logger.warning(f"样点 {output.farmland_id}-{output.sample_id} 的分母值为 {denominator},跳过计算")
+                        continue
+                    
+                    # 计算秸秆移除Cd通量
+                    straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000
+                    
+                    results.append({
+                        "farmland_id": output.farmland_id,
+                        "sample_id": output.sample_id,
+                        "ln_crop_cd": output.ln_crop_cd,
+                        "crop_cd_value": crop_cd_value,
+                        "denominator": denominator,
+                        "f11_yield": parameter.f11,
+                        "straw_removal_flux": straw_removal
+                    })
+                
+                if not results:
+                    return {
+                        "success": False,
+                        "message": "所有样点的计算都因分母值无效而失败",
+                        "data": None
+                    }
+                
+                # 计算统计信息
+                flux_values = [r["straw_removal_flux"] for r in results]
+                statistics = {
+                    "total_samples": len(results),
+                    "mean_flux": sum(flux_values) / len(flux_values),
+                    "max_flux": max(flux_values),
+                    "min_flux": min(flux_values)
+                }
+                
+                return {
+                    "success": True,
+                    "message": f"地区 '{area}' 的秸秆移除Cd通量计算成功",
+                    "data": {
+                        "area": area,
+                        "calculation_type": "straw_removal",
+                        "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000",
+                        "unit": "g/ha/a",
+                        "results": results,
+                        "statistics": statistics
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def calculate_both_removals_by_area(self, area: str) -> Dict[str, Any]:
+        """
+        根据地区同时计算籽粒移除和秸秆移除Cd通量
+        
+        @param area: 地区名称
+        @returns: 计算结果字典
+        """
+        try:
+            # 分别计算籽粒移除和秸秆移除
+            grain_result = self.calculate_grain_removal_by_area(area)
+            straw_result = self.calculate_straw_removal_by_area(area)
+            
+            if not grain_result["success"] or not straw_result["success"]:
+                error_messages = []
+                if not grain_result["success"]:
+                    error_messages.append(f"籽粒移除计算失败: {grain_result['message']}")
+                if not straw_result["success"]:
+                    error_messages.append(f"秸秆移除计算失败: {straw_result['message']}")
+                
+                return {
+                    "success": False,
+                    "message": "; ".join(error_messages),
+                    "data": None
+                }
+            
+            return {
+                "success": True,
+                "message": f"地区 '{area}' 的Cd通量移除计算成功",
+                "data": {
+                    "area": area,
+                    "grain_removal": grain_result["data"],
+                    "straw_removal": straw_result["data"]
+                }
+            }
+            
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的Cd通量移除失败: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def calculate_removal_by_sample(self, farmland_id: int, sample_id: int, area: str) -> Dict[str, Any]:
+        """
+        根据特定样点计算籽粒移除和秸秆移除Cd通量
+        
+        @param farmland_id: 农地ID
+        @param sample_id: 样点ID
+        @param area: 地区名称
+        @returns: 计算结果字典
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定地区的参数
+                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "data": None
+                    }
+                
+                # 查询特定样点的CropCd输出数据
+                crop_cd_output = db.query(CropCdOutputData).filter(
+                    and_(
+                        CropCdOutputData.farmland_id == farmland_id,
+                        CropCdOutputData.sample_id == sample_id
+                    )
+                ).first()
+                
+                if not crop_cd_output:
+                    return {
+                        "success": False,
+                        "message": f"未找到样点 {farmland_id}-{sample_id} 的CropCd输出数据",
+                        "data": None
+                    }
+                
+                crop_cd_value = math.exp(crop_cd_output.ln_crop_cd)  # EXP(LnCropCd)
+                
+                # 计算籽粒移除
+                grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000
+                
+                # 计算秸秆移除
+                denominator = crop_cd_value * 0.76 - 0.0034
+                straw_removal = None
+                straw_calculation_valid = True
+                
+                if denominator <= 0:
+                    straw_calculation_valid = False
+                    self.logger.warning(f"样点 {farmland_id}-{sample_id} 的分母值为 {denominator},秸秆移除计算无效")
+                else:
+                    straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000
+                
+                return {
+                    "success": True,
+                    "message": f"样点 {farmland_id}-{sample_id} 的Cd通量移除计算成功",
+                    "data": {
+                        "farmland_id": farmland_id,
+                        "sample_id": sample_id,
+                        "area": area,
+                        "ln_crop_cd": crop_cd_output.ln_crop_cd,
+                        "crop_cd_value": crop_cd_value,
+                        "f11_yield": parameter.f11,
+                        "grain_removal": {
+                            "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000",
+                            "value": grain_removal,
+                            "unit": "g/ha/a"
+                        },
+                        "straw_removal": {
+                            "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000",
+                            "value": straw_removal,
+                            "unit": "g/ha/a",
+                            "calculation_valid": straw_calculation_valid,
+                            "denominator": denominator if straw_calculation_valid else None
+                        }
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算样点 {farmland_id}-{sample_id} 的Cd通量移除失败: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            } 

+ 64 - 0
test_cd_flux_removal.py

@@ -0,0 +1,64 @@
+"""
+Cd通量移除接口测试脚本
+@description: 验证新创建的Cd通量移除计算接口是否正常工作
+"""
+
+import sys
+import os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from app.services.cd_flux_removal_service import CdFluxRemovalService
+import logging
+
+# 设置日志
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def test_cd_flux_removal_service():
+    """
+    测试Cd通量移除服务的各个功能
+    """
+    print("=== Cd通量移除服务测试 ===")
+    
+    service = CdFluxRemovalService()
+    area = "韶关"  # 使用默认的测试区域
+    
+    print(f"\n1. 测试籽粒移除计算...")
+    try:
+        grain_result = service.calculate_grain_removal_by_area(area)
+        print(f"籽粒移除计算结果: {grain_result['success']}")
+        if grain_result['success']:
+            print(f"  - 计算样本数: {grain_result['data']['statistics']['total_samples']}")
+            print(f"  - 平均通量: {grain_result['data']['statistics']['mean_flux']:.6f} g/ha/a")
+        else:
+            print(f"  - 错误信息: {grain_result['message']}")
+    except Exception as e:
+        print(f"  - 测试失败: {str(e)}")
+    
+    print(f"\n2. 测试秸秆移除计算...")
+    try:
+        straw_result = service.calculate_straw_removal_by_area(area)
+        print(f"秸秆移除计算结果: {straw_result['success']}")
+        if straw_result['success']:
+            print(f"  - 计算样本数: {straw_result['data']['statistics']['total_samples']}")
+            print(f"  - 平均通量: {straw_result['data']['statistics']['mean_flux']:.6f} g/ha/a")
+        else:
+            print(f"  - 错误信息: {straw_result['message']}")
+    except Exception as e:
+        print(f"  - 测试失败: {str(e)}")
+    
+    print(f"\n3. 测试同时计算...")
+    try:
+        both_result = service.calculate_both_removals_by_area(area)
+        print(f"同时计算结果: {both_result['success']}")
+        if both_result['success']:
+            print(f"  - 成功获取籽粒和秸秆移除数据")
+        else:
+            print(f"  - 错误信息: {both_result['message']}")
+    except Exception as e:
+        print(f"  - 测试失败: {str(e)}")
+    
+    print(f"\n=== 测试完成 ===")
+
+if __name__ == "__main__":
+    test_cd_flux_removal_service()