""" Cd通量移除计算服务 @description: 提供籽粒移除和秸秆移除的Cd通量计算功能 @author: AcidMap Team @version: 1.0.0 """ import logging import math from typing import Dict, Any, List, Optional from sqlalchemy.orm import sessionmaker, Session from sqlalchemy import create_engine, and_ from ..database import SessionLocal, engine from ..models.parameters import Parameters from ..models.CropCd_output import CropCdOutputData class CdFluxRemovalService: """ Cd通量移除计算服务类 @description: 提供基于CropCd_output_data和Parameters表数据的籽粒移除和秸秆移除Cd通量计算功能 """ def __init__(self): """ 初始化Cd通量移除服务 """ self.logger = logging.getLogger(__name__) def calculate_grain_removal_by_area(self, area: str) -> Dict[str, Any]: """ 根据地区计算籽粒移除Cd通量 @param area: 地区名称 @returns: 计算结果字典 计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000 """ try: with SessionLocal() as db: # 查询指定地区的参数 parameter = db.query(Parameters).filter(Parameters.area == area).first() if not parameter: return { "success": False, "message": f"未找到地区 '{area}' 的参数数据", "data": None } # 查询CropCd输出数据 crop_cd_outputs = db.query(CropCdOutputData).all() if not crop_cd_outputs: return { "success": False, "message": f"未找到CropCd输出数据", "data": None } # 计算每个样点的籽粒移除Cd通量 results = [] for output in crop_cd_outputs: crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd) grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000 results.append({ "farmland_id": output.farmland_id, "sample_id": output.sample_id, "ln_crop_cd": output.ln_crop_cd, "crop_cd_value": crop_cd_value, "f11_yield": parameter.f11, "grain_removal_flux": grain_removal }) # 计算统计信息 flux_values = [r["grain_removal_flux"] for r in results] statistics = { "total_samples": len(results), "mean_flux": sum(flux_values) / len(flux_values), "max_flux": max(flux_values), "min_flux": min(flux_values) } return { "success": True, "message": f"地区 '{area}' 的籽粒移除Cd通量计算成功", "data": { "area": area, "calculation_type": "grain_removal", "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000", "unit": "g/ha/a", "results": results, "statistics": statistics } } except Exception as e: self.logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}") return { "success": False, "message": f"计算失败: {str(e)}", "data": None } def calculate_straw_removal_by_area(self, area: str) -> Dict[str, Any]: """ 根据地区计算秸秆移除Cd通量 @param area: 地区名称 @returns: 计算结果字典 计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000 """ try: with SessionLocal() as db: # 查询指定地区的参数 parameter = db.query(Parameters).filter(Parameters.area == area).first() if not parameter: return { "success": False, "message": f"未找到地区 '{area}' 的参数数据", "data": None } # 查询CropCd输出数据 crop_cd_outputs = db.query(CropCdOutputData).all() if not crop_cd_outputs: return { "success": False, "message": f"未找到CropCd输出数据", "data": None } # 计算每个样点的秸秆移除Cd通量 results = [] for output in crop_cd_outputs: crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd) # 计算分母:EXP(LnCropCd)*0.76-0.0034 denominator = crop_cd_value * 0.76 - 0.0034 # 检查分母是否为零或负数,避免除零错误 if denominator <= 0: self.logger.warning(f"样点 {output.farmland_id}-{output.sample_id} 的分母值为 {denominator},跳过计算") continue # 计算秸秆移除Cd通量 straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000 results.append({ "farmland_id": output.farmland_id, "sample_id": output.sample_id, "ln_crop_cd": output.ln_crop_cd, "crop_cd_value": crop_cd_value, "denominator": denominator, "f11_yield": parameter.f11, "straw_removal_flux": straw_removal }) if not results: return { "success": False, "message": "所有样点的计算都因分母值无效而失败", "data": None } # 计算统计信息 flux_values = [r["straw_removal_flux"] for r in results] statistics = { "total_samples": len(results), "mean_flux": sum(flux_values) / len(flux_values), "max_flux": max(flux_values), "min_flux": min(flux_values) } return { "success": True, "message": f"地区 '{area}' 的秸秆移除Cd通量计算成功", "data": { "area": area, "calculation_type": "straw_removal", "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000", "unit": "g/ha/a", "results": results, "statistics": statistics } } except Exception as e: self.logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}") return { "success": False, "message": f"计算失败: {str(e)}", "data": None } def calculate_both_removals_by_area(self, area: str) -> Dict[str, Any]: """ 根据地区同时计算籽粒移除和秸秆移除Cd通量 @param area: 地区名称 @returns: 计算结果字典 """ try: # 分别计算籽粒移除和秸秆移除 grain_result = self.calculate_grain_removal_by_area(area) straw_result = self.calculate_straw_removal_by_area(area) if not grain_result["success"] or not straw_result["success"]: error_messages = [] if not grain_result["success"]: error_messages.append(f"籽粒移除计算失败: {grain_result['message']}") if not straw_result["success"]: error_messages.append(f"秸秆移除计算失败: {straw_result['message']}") return { "success": False, "message": "; ".join(error_messages), "data": None } return { "success": True, "message": f"地区 '{area}' 的Cd通量移除计算成功", "data": { "area": area, "grain_removal": grain_result["data"], "straw_removal": straw_result["data"] } } except Exception as e: self.logger.error(f"计算地区 '{area}' 的Cd通量移除失败: {str(e)}") return { "success": False, "message": f"计算失败: {str(e)}", "data": None } def calculate_removal_by_sample(self, farmland_id: int, sample_id: int, area: str) -> Dict[str, Any]: """ 根据特定样点计算籽粒移除和秸秆移除Cd通量 @param farmland_id: 农地ID @param sample_id: 样点ID @param area: 地区名称 @returns: 计算结果字典 """ try: with SessionLocal() as db: # 查询指定地区的参数 parameter = db.query(Parameters).filter(Parameters.area == area).first() if not parameter: return { "success": False, "message": f"未找到地区 '{area}' 的参数数据", "data": None } # 查询特定样点的CropCd输出数据 crop_cd_output = db.query(CropCdOutputData).filter( and_( CropCdOutputData.farmland_id == farmland_id, CropCdOutputData.sample_id == sample_id ) ).first() if not crop_cd_output: return { "success": False, "message": f"未找到样点 {farmland_id}-{sample_id} 的CropCd输出数据", "data": None } crop_cd_value = math.exp(crop_cd_output.ln_crop_cd) # EXP(LnCropCd) # 计算籽粒移除 grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000 # 计算秸秆移除 denominator = crop_cd_value * 0.76 - 0.0034 straw_removal = None straw_calculation_valid = True if denominator <= 0: straw_calculation_valid = False self.logger.warning(f"样点 {farmland_id}-{sample_id} 的分母值为 {denominator},秸秆移除计算无效") else: straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000 return { "success": True, "message": f"样点 {farmland_id}-{sample_id} 的Cd通量移除计算成功", "data": { "farmland_id": farmland_id, "sample_id": sample_id, "area": area, "ln_crop_cd": crop_cd_output.ln_crop_cd, "crop_cd_value": crop_cd_value, "f11_yield": parameter.f11, "grain_removal": { "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000", "value": grain_removal, "unit": "g/ha/a" }, "straw_removal": { "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000", "value": straw_removal, "unit": "g/ha/a", "calculation_valid": straw_calculation_valid, "denominator": denominator if straw_calculation_valid else None } } } except Exception as e: self.logger.error(f"计算样点 {farmland_id}-{sample_id} 的Cd通量移除失败: {str(e)}") return { "success": False, "message": f"计算失败: {str(e)}", "data": None }