cd_flux_removal_service.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. """
  2. Cd通量移除计算服务
  3. @description: 提供籽粒移除和秸秆移除的Cd通量计算功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import logging
  8. import math
  9. from typing import Dict, Any, List, Optional
  10. from sqlalchemy.orm import sessionmaker, Session
  11. from sqlalchemy import create_engine, and_
  12. from ..database import SessionLocal, engine
  13. from ..models.parameters import Parameters
  14. from ..models.CropCd_output import CropCdOutputData
  15. class CdFluxRemovalService:
  16. """
  17. Cd通量移除计算服务类
  18. @description: 提供基于CropCd_output_data和Parameters表数据的籽粒移除和秸秆移除Cd通量计算功能
  19. """
  20. def __init__(self):
  21. """
  22. 初始化Cd通量移除服务
  23. """
  24. self.logger = logging.getLogger(__name__)
  25. def calculate_grain_removal_by_area(self, area: str) -> Dict[str, Any]:
  26. """
  27. 根据地区计算籽粒移除Cd通量
  28. @param area: 地区名称
  29. @returns: 计算结果字典
  30. 计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
  31. """
  32. try:
  33. with SessionLocal() as db:
  34. # 查询指定地区的参数
  35. parameter = db.query(Parameters).filter(Parameters.area == area).first()
  36. if not parameter:
  37. return {
  38. "success": False,
  39. "message": f"未找到地区 '{area}' 的参数数据",
  40. "data": None
  41. }
  42. # 查询CropCd输出数据
  43. crop_cd_outputs = db.query(CropCdOutputData).all()
  44. if not crop_cd_outputs:
  45. return {
  46. "success": False,
  47. "message": f"未找到CropCd输出数据",
  48. "data": None
  49. }
  50. # 计算每个样点的籽粒移除Cd通量
  51. results = []
  52. for output in crop_cd_outputs:
  53. crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd)
  54. grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000
  55. results.append({
  56. "farmland_id": output.farmland_id,
  57. "sample_id": output.sample_id,
  58. "ln_crop_cd": output.ln_crop_cd,
  59. "crop_cd_value": crop_cd_value,
  60. "f11_yield": parameter.f11,
  61. "grain_removal_flux": grain_removal
  62. })
  63. # 计算统计信息
  64. flux_values = [r["grain_removal_flux"] for r in results]
  65. statistics = {
  66. "total_samples": len(results),
  67. "mean_flux": sum(flux_values) / len(flux_values),
  68. "max_flux": max(flux_values),
  69. "min_flux": min(flux_values)
  70. }
  71. return {
  72. "success": True,
  73. "message": f"地区 '{area}' 的籽粒移除Cd通量计算成功",
  74. "data": {
  75. "area": area,
  76. "calculation_type": "grain_removal",
  77. "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000",
  78. "unit": "g/ha/a",
  79. "results": results,
  80. "statistics": statistics
  81. }
  82. }
  83. except Exception as e:
  84. self.logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}")
  85. return {
  86. "success": False,
  87. "message": f"计算失败: {str(e)}",
  88. "data": None
  89. }
  90. def calculate_straw_removal_by_area(self, area: str) -> Dict[str, Any]:
  91. """
  92. 根据地区计算秸秆移除Cd通量
  93. @param area: 地区名称
  94. @returns: 计算结果字典
  95. 计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
  96. """
  97. try:
  98. with SessionLocal() as db:
  99. # 查询指定地区的参数
  100. parameter = db.query(Parameters).filter(Parameters.area == area).first()
  101. if not parameter:
  102. return {
  103. "success": False,
  104. "message": f"未找到地区 '{area}' 的参数数据",
  105. "data": None
  106. }
  107. # 查询CropCd输出数据
  108. crop_cd_outputs = db.query(CropCdOutputData).all()
  109. if not crop_cd_outputs:
  110. return {
  111. "success": False,
  112. "message": f"未找到CropCd输出数据",
  113. "data": None
  114. }
  115. # 计算每个样点的秸秆移除Cd通量
  116. results = []
  117. for output in crop_cd_outputs:
  118. crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd)
  119. # 计算分母:EXP(LnCropCd)*0.76-0.0034
  120. denominator = crop_cd_value * 0.76 - 0.0034
  121. # 检查分母是否为零或负数,避免除零错误
  122. if denominator <= 0:
  123. self.logger.warning(f"样点 {output.farmland_id}-{output.sample_id} 的分母值为 {denominator},跳过计算")
  124. continue
  125. # 计算秸秆移除Cd通量
  126. straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000
  127. results.append({
  128. "farmland_id": output.farmland_id,
  129. "sample_id": output.sample_id,
  130. "ln_crop_cd": output.ln_crop_cd,
  131. "crop_cd_value": crop_cd_value,
  132. "denominator": denominator,
  133. "f11_yield": parameter.f11,
  134. "straw_removal_flux": straw_removal
  135. })
  136. if not results:
  137. return {
  138. "success": False,
  139. "message": "所有样点的计算都因分母值无效而失败",
  140. "data": None
  141. }
  142. # 计算统计信息
  143. flux_values = [r["straw_removal_flux"] for r in results]
  144. statistics = {
  145. "total_samples": len(results),
  146. "mean_flux": sum(flux_values) / len(flux_values),
  147. "max_flux": max(flux_values),
  148. "min_flux": min(flux_values)
  149. }
  150. return {
  151. "success": True,
  152. "message": f"地区 '{area}' 的秸秆移除Cd通量计算成功",
  153. "data": {
  154. "area": area,
  155. "calculation_type": "straw_removal",
  156. "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000",
  157. "unit": "g/ha/a",
  158. "results": results,
  159. "statistics": statistics
  160. }
  161. }
  162. except Exception as e:
  163. self.logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}")
  164. return {
  165. "success": False,
  166. "message": f"计算失败: {str(e)}",
  167. "data": None
  168. }