cd_prediction_service.py 64 KB


  1. """
  2. Cd预测服务类
  3. @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import os
  8. import logging
  9. import asyncio
  10. from datetime import datetime
  11. from typing import Dict, Any, Optional, List, Tuple
  12. import glob
  13. import shutil
  14. import zipfile
  15. import tempfile
  16. import pandas as pd
  17. import io
  18. from ..config.cd_prediction_config import cd_config
  19. from ..utils.cd_prediction_wrapper import CdPredictionWrapper
  20. from ..database import SessionLocal
  21. from .admin_boundary_service import get_boundary_geojson_by_name
  22. import json
  23. class CdPredictionService:
  24. """
  25. Cd预测服务类
  26. @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
  27. @example
  28. >>> service = CdPredictionService()
  29. >>> result = await service.generate_crop_cd_prediction()
  30. """
  31. def __init__(self):
  32. """
  33. 初始化Cd预测服务
  34. @description: 设置Cd预测系统的路径和配置
  35. """
  36. # 设置日志
  37. self.logger = logging.getLogger(__name__)
  38. # 获取配置
  39. self.config = cd_config
  40. # 初始化包装器
  41. cd_system_path = self.config.get_cd_system_path()
  42. self.wrapper = CdPredictionWrapper(cd_system_path)
  43. # 输出目录
  44. self.output_figures_dir = self.config.get_output_dir("figures")
  45. self.output_raster_dir = self.config.get_output_dir("raster")
  46. self.output_data_dir = self.config.get_output_dir("data")
  47. # 支持的县市配置
  48. self.supported_counties = self._load_supported_counties()
  49. self.logger.info("Cd预测服务初始化完成")
  50. def _load_supported_counties(self) -> Dict[str, Dict]:
  51. """
  52. 加载支持的县市配置
  53. @returns {Dict[str, Dict]} 支持的县市配置信息
  54. """
  55. # 获取Cd预测系统的基础路径
  56. cd_system_base = self.config.get_cd_system_path()
  57. return {
  58. "乐昌市": {
  59. "boundary_file": os.path.join(cd_system_base, "output/raster/lechang.shp"),
  60. "template_file": os.path.join(cd_system_base, "output/raster/meanTemp.tif"),
  61. "coordinate_file": os.path.join(cd_system_base, "data/coordinates/坐标.csv"),
  62. "region_code": "440282",
  63. "display_name": "乐昌市",
  64. "province": "广东省"
  65. },
  66. # 可扩展添加更多县市
  67. # "韶关市": {
  68. # "boundary_file": os.path.join(cd_system_base, "output/raster/shaoguan.shp"),
  69. # "template_file": os.path.join(cd_system_base, "output/raster/shaoguan_template.tif"),
  70. # "coordinate_file": os.path.join(cd_system_base, "data/coordinates/韶关_坐标.csv"),
  71. # "region_code": "440200",
  72. # "display_name": "韶关市",
  73. # "province": "广东省"
  74. # }
  75. }
  76. def is_county_supported(self, county_name: str) -> bool:
  77. """
  78. 检查县市是否被支持
  79. @param {str} county_name - 县市名称
  80. @returns {bool} 是否支持该县市
  81. """
  82. return county_name in self.supported_counties
  83. def get_supported_counties(self) -> List[str]:
  84. """
  85. 获取支持的县市名称列表
  86. @returns {List[str]} 支持的县市名称列表
  87. """
  88. return list(self.supported_counties.keys())
  89. def get_supported_counties_info(self) -> List[Dict[str, Any]]:
  90. """
  91. 获取支持的县市详细信息
  92. @returns {List[Dict[str, Any]]} 支持的县市详细信息列表
  93. """
  94. counties_info = []
  95. for county_name, config in self.supported_counties.items():
  96. counties_info.append({
  97. "name": county_name,
  98. "display_name": config.get("display_name", county_name),
  99. "province": config.get("province", ""),
  100. "region_code": config.get("region_code", ""),
  101. "has_boundary": os.path.exists(config.get("boundary_file", "")),
  102. "has_template": os.path.exists(config.get("template_file", "")),
  103. "has_coordinates": os.path.exists(config.get("coordinate_file", ""))
  104. })
  105. return counties_info
  106. def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
  107. """
  108. 验证输入数据格式
  109. @param {pd.DataFrame} df - 输入数据
  110. @param {str} county_name - 县市名称
  111. @returns {Dict[str, Any]} 验证结果
  112. """
  113. # 基本要求:前两列为坐标,至少需要3列数据
  114. if df.shape[1] < 3:
  115. return {
  116. "valid": False,
  117. "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
  118. "warnings": [],
  119. "data_shape": df.shape,
  120. "county_supported": self.is_county_supported(county_name),
  121. "null_summary": {}
  122. }
  123. # 坐标列验证(假设前两列是经纬度)
  124. coordinate_issues = []
  125. try:
  126. # 检查前两列是否为数值型
  127. if not pd.api.types.is_numeric_dtype(df.iloc[:, 0]):
  128. coordinate_issues.append("第一列(经度)不是数值型数据")
  129. elif not df.iloc[:, 0].between(70, 140).all():
  130. coordinate_issues.append("经度值超出合理范围(70-140度)")
  131. if not pd.api.types.is_numeric_dtype(df.iloc[:, 1]):
  132. coordinate_issues.append("第二列(纬度)不是数值型数据")
  133. elif not df.iloc[:, 1].between(15, 55).all():
  134. coordinate_issues.append("纬度值超出合理范围(15-55度)")
  135. except Exception as e:
  136. coordinate_issues.append(f"坐标数据验证失败: {str(e)}")
  137. # 检查数据完整性
  138. null_counts = df.isnull().sum()
  139. high_null_columns = [col for col, count in null_counts.items()
  140. if count > len(df) * 0.1] # 空值超过10%的列
  141. # 检查环境因子列是否为数值型
  142. non_numeric_columns = []
  143. for i in range(2, df.shape[1]): # 从第三列开始检查
  144. col_name = df.columns[i]
  145. if not pd.api.types.is_numeric_dtype(df.iloc[:, i]):
  146. non_numeric_columns.append(col_name)
  147. warnings = []
  148. if high_null_columns:
  149. warnings.append(f"以下列空值较多: {', '.join(high_null_columns)}")
  150. if coordinate_issues:
  151. warnings.extend(coordinate_issues)
  152. if non_numeric_columns:
  153. warnings.append(f"以下列不是数值型: {', '.join(non_numeric_columns)}")
  154. # 如果有严重的坐标问题,标记为无效
  155. critical_errors = []
  156. if any("不是数值型数据" in issue for issue in coordinate_issues):
  157. critical_errors.extend([issue for issue in coordinate_issues if "不是数值型数据" in issue])
  158. return {
  159. "valid": len(critical_errors) == 0,
  160. "errors": critical_errors,
  161. "warnings": warnings,
  162. "data_shape": df.shape,
  163. "county_supported": self.is_county_supported(county_name),
  164. "null_summary": null_counts.to_dict()
  165. }
  166. def save_uploaded_data(self, df: pd.DataFrame, county_name: str,
  167. description: Optional[str] = None) -> str:
  168. """
  169. 保存上传的数据文件
  170. @param {pd.DataFrame} df - 数据
  171. @param {str} county_name - 县市名称
  172. @param {Optional[str]} description - 数据描述
  173. @returns {str} 保存的文件路径
  174. """
  175. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  176. filename = f"{county_name}_uploaded_data_{timestamp}.csv"
  177. file_path = os.path.join(self.output_data_dir, "uploaded", filename)
  178. # 确保目录存在
  179. os.makedirs(os.path.dirname(file_path), exist_ok=True)
  180. # 保存数据
  181. df.to_csv(file_path, index=False, encoding='utf-8-sig')
  182. # 保存元信息
  183. meta_info = {
  184. "county_name": county_name,
  185. "description": description,
  186. "upload_time": datetime.now().isoformat(),
  187. "data_shape": df.shape,
  188. "columns": df.columns.tolist()
  189. }
  190. meta_path = file_path.replace('.csv', '_meta.json')
  191. import json
  192. with open(meta_path, 'w', encoding='utf-8') as f:
  193. json.dump(meta_info, f, ensure_ascii=False, indent=2)
  194. self.logger.info(f"数据文件已保存: {file_path}")
  195. return file_path
  196. def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
  197. """
  198. 保存临时数据文件
  199. @param {pd.DataFrame} df - 数据
  200. @param {str} county_name - 县市名称
  201. @returns {str} 临时文件路径
  202. """
  203. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  204. filename = f"{county_name}_temp_data_{timestamp}.csv"
  205. file_path = os.path.join(self.output_data_dir, "temp", filename)
  206. # 确保目录存在
  207. os.makedirs(os.path.dirname(file_path), exist_ok=True)
  208. # 保存数据
  209. df.to_csv(file_path, index=False, encoding='utf-8-sig')
  210. # 记录详细的文件信息
  211. file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
  212. self.logger.info(f"📁 临时数据文件已保存:")
  213. self.logger.info(f" 路径: {file_path}")
  214. self.logger.info(f" 大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
  215. self.logger.info(f" 数据形状: {df.shape[0]} 行 × {df.shape[1]} 列")
  216. # 清理旧的临时文件,只保留最新的5个
  217. self._cleanup_temp_files(county_name)
  218. return file_path
  219. async def add_county_support(self, county_name: str, boundary_file,
  220. coordinate_file) -> Dict[str, Any]:
  221. """
  222. 添加新县市支持
  223. @param {str} county_name - 县市名称
  224. @param boundary_file - 边界文件(Shapefile压缩包)
  225. @param coordinate_file - 坐标文件
  226. @returns {Dict[str, Any]} 添加结果
  227. """
  228. try:
  229. # 创建县市专用目录
  230. county_dir = os.path.join(self.output_data_dir, "counties", county_name)
  231. os.makedirs(county_dir, exist_ok=True)
  232. # 处理边界文件
  233. if boundary_file.filename.endswith('.zip'):
  234. boundary_content = await boundary_file.read()
  235. boundary_zip_path = os.path.join(county_dir, "boundary.zip")
  236. with open(boundary_zip_path, 'wb') as f:
  237. f.write(boundary_content)
  238. # 解压Shapefile
  239. with zipfile.ZipFile(boundary_zip_path, 'r') as zip_ref:
  240. zip_ref.extractall(os.path.join(county_dir, "boundary"))
  241. # 处理坐标文件
  242. coordinate_content = await coordinate_file.read()
  243. coordinate_path = os.path.join(county_dir, "coordinates.csv")
  244. df_coords = pd.read_csv(io.StringIO(coordinate_content.decode('utf-8')))
  245. df_coords.to_csv(coordinate_path, index=False, encoding='utf-8-sig')
  246. # 更新支持的县市配置
  247. self.supported_counties[county_name] = {
  248. "boundary_file": os.path.join(county_dir, "boundary"),
  249. "coordinate_file": coordinate_path,
  250. "template_file": "", # 需要后续生成
  251. "region_code": "",
  252. "display_name": county_name,
  253. "province": ""
  254. }
  255. return {
  256. "county_name": county_name,
  257. "boundary_path": os.path.join(county_dir, "boundary"),
  258. "coordinate_path": coordinate_path,
  259. "status": "success"
  260. }
  261. except Exception as e:
  262. self.logger.error(f"添加县市支持失败: {str(e)}")
  263. raise
  264. async def generate_crop_cd_prediction_for_county(
  265. self,
  266. county_name: str,
  267. data_file: Optional[str] = None,
  268. raster_config_override: Optional[Dict[str, Any]] = None
  269. ) -> Dict[str, Any]:
  270. """
  271. 为指定县市生成作物Cd预测
  272. @param {str} county_name - 县市名称
  273. @param {Optional[str]} data_file - 可选的数据文件路径
  274. @returns {Dict[str, Any]} 预测结果信息
  275. """
  276. if not self.is_county_supported(county_name):
  277. raise ValueError(f"不支持的县市: {county_name}")
  278. try:
  279. # 获取县市配置
  280. county_config = self.supported_counties[county_name]
  281. # 如果提供了自定义数据文件,使用它替换默认数据
  282. if data_file:
  283. # 准备作物Cd模型的自定义数据
  284. self._prepare_crop_cd_custom_data(data_file, county_name)
  285. # 在线程池中运行CPU密集型任务
  286. loop = asyncio.get_event_loop()
  287. result = await loop.run_in_executor(
  288. None,
  289. self._run_crop_cd_prediction_with_county,
  290. county_name, county_config, raster_config_override
  291. )
  292. return result
  293. except Exception as e:
  294. self.logger.error(f"为{county_name}生成作物Cd预测失败: {str(e)}")
  295. raise
  296. async def generate_effective_cd_prediction_for_county(
  297. self,
  298. county_name: str,
  299. data_file: Optional[str] = None,
  300. raster_config_override: Optional[Dict[str, Any]] = None
  301. ) -> Dict[str, Any]:
  302. """
  303. 为指定县市生成有效态Cd预测
  304. @param {str} county_name - 县市名称
  305. @param {Optional[str]} data_file - 可选的数据文件路径
  306. @returns {Dict[str, Any]} 预测结果信息
  307. """
  308. if not self.is_county_supported(county_name):
  309. raise ValueError(f"不支持的县市: {county_name}")
  310. try:
  311. # 获取县市配置
  312. county_config = self.supported_counties[county_name]
  313. # 如果提供了自定义数据文件,使用它替换默认数据
  314. if data_file:
  315. # 准备有效态Cd模型的自定义数据
  316. self._prepare_effective_cd_custom_data(data_file, county_name)
  317. # 在线程池中运行CPU密集型任务
  318. loop = asyncio.get_event_loop()
  319. result = await loop.run_in_executor(
  320. None,
  321. self._run_effective_cd_prediction_with_county,
  322. county_name, county_config, raster_config_override
  323. )
  324. return result
  325. except Exception as e:
  326. self.logger.error(f"为{county_name}生成有效态Cd预测失败: {str(e)}")
  327. raise
  328. def _prepare_crop_cd_custom_data(self, data_file: str, county_name: str):
  329. """
  330. 准备作物Cd模型的自定义数据文件
  331. @param {str} data_file - 数据文件路径
  332. @param {str} county_name - 县市名称
  333. """
  334. try:
  335. import pandas as pd
  336. # 读取用户上传的CSV文件
  337. df = pd.read_csv(data_file, encoding='utf-8')
  338. # 获取Cd预测系统的数据目录
  339. cd_system_path = self.config.get_cd_system_path()
  340. # 1. 提取坐标信息并保存为独立的坐标文件
  341. coordinates_df = pd.DataFrame({
  342. 'longitude': df.iloc[:, 0], # 第一列为经度
  343. 'latitude': df.iloc[:, 1] # 第二列为纬度
  344. })
  345. # 保存坐标文件到系统数据目录
  346. coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
  347. os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
  348. coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
  349. # 记录坐标文件详细信息
  350. coord_file_size = os.path.getsize(coord_file_path) if os.path.exists(coord_file_path) else 0
  351. self.logger.info(f"🗺️ 坐标文件已保存:")
  352. self.logger.info(f" 路径: {coord_file_path}")
  353. self.logger.info(f" 大小: {coord_file_size:,} bytes ({coord_file_size/1024:.1f} KB)")
  354. self.logger.info(f" 坐标点数: {coordinates_df.shape[0]} 个")
  355. # 2. 准备作物Cd模型的训练数据
  356. crop_cd_data_dir = os.path.join(cd_system_path, "models", "crop_cd_model", "data")
  357. crop_target_file = os.path.join(crop_cd_data_dir, "areatest.csv")
  358. # 不再创建备份文件,因为此文件每次都会被用户数据完全覆盖
  359. if os.path.exists(crop_target_file):
  360. original_size = os.path.getsize(crop_target_file)
  361. self.logger.info(f"🔄 准备覆盖作物Cd模型数据文件:")
  362. self.logger.info(f" 文件路径: {crop_target_file}")
  363. self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size/1024:.1f} KB)")
  364. # 清理现有的备份文件(如果存在)
  365. self._cleanup_backup_files(crop_target_file, max_backups=0)
  366. # 提取环境因子数据(去掉前两列的经纬度)
  367. environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
  368. # 保存环境因子数据到作物Cd模型目录(不包含坐标)
  369. environmental_data.to_csv(crop_target_file, index=False, encoding='utf-8-sig')
  370. # 记录作物Cd模型数据文件详细信息
  371. model_file_size = os.path.getsize(crop_target_file) if os.path.exists(crop_target_file) else 0
  372. self.logger.info(f"🌾 作物Cd模型数据文件已保存:")
  373. self.logger.info(f" 路径: {crop_target_file}")
  374. self.logger.info(f" 大小: {model_file_size:,} bytes ({model_file_size/1024:.1f} KB)")
  375. self.logger.info(f" 数据形状: {environmental_data.shape[0]} 行 × {environmental_data.shape[1]} 列")
  376. self.logger.info(f" 环境因子列数: {environmental_data.shape[1]}")
  377. self.logger.info(f"✅ 作物Cd模型自定义数据文件已准备完成,县市: {county_name}")
  378. except Exception as e:
  379. self.logger.error(f"准备作物Cd模型自定义数据文件失败: {str(e)}")
  380. raise
  381. def _prepare_effective_cd_custom_data(self, data_file: str, county_name: str):
  382. """
  383. 准备有效态Cd模型的自定义数据文件
  384. @param {str} data_file - 数据文件路径
  385. @param {str} county_name - 县市名称
  386. """
  387. try:
  388. import pandas as pd
  389. # 读取用户上传的CSV文件
  390. df = pd.read_csv(data_file, encoding='utf-8')
  391. # 获取Cd预测系统的数据目录
  392. cd_system_path = self.config.get_cd_system_path()
  393. # 1. 提取坐标信息并保存为独立的坐标文件
  394. coordinates_df = pd.DataFrame({
  395. 'longitude': df.iloc[:, 0], # 第一列为经度
  396. 'latitude': df.iloc[:, 1] # 第二列为纬度
  397. })
  398. # 保存坐标文件到系统数据目录
  399. coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
  400. os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
  401. coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
  402. self.logger.info(f"坐标文件已保存: {coord_file_path}")
  403. # 2. 准备有效态Cd模型的训练数据
  404. effective_cd_data_dir = os.path.join(cd_system_path, "models", "effective_cd_model", "data")
  405. effective_target_file = os.path.join(effective_cd_data_dir, "areatest.csv")
  406. # 创建备份文件路径
  407. backup_dir = os.path.join(cd_system_path, "backups")
  408. os.makedirs(backup_dir, exist_ok=True)
  409. backup_file = os.path.join(backup_dir, f"areatest_backup_{datetime.now().strftime('%Y%m%d')}.csv")
  410. # 如果目标文件存在,创建备份
  411. if os.path.exists(effective_target_file):
  412. # 创建备份
  413. shutil.copy2(effective_target_file, backup_file)
  414. self.logger.info(f"已创建备份文件: {backup_file}")
  415. original_size = os.path.getsize(effective_target_file)
  416. self.logger.info(f"🔄 准备覆盖有效态Cd模型数据文件:")
  417. self.logger.info(f" 文件路径: {effective_target_file}")
  418. self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size / 1024:.1f} KB)")
  419. # 清理现有的备份文件(如果存在)
  420. self._cleanup_backup_files(effective_target_file, max_backups=0)
  421. # 检查用户数据是否包含足够的环境因子列
  422. user_env_columns = df.shape[1] - 2 # 减去经纬度列
  423. if user_env_columns >= 21:
  424. # 用户数据包含足够的环境因子列,使用用户数据
  425. # 提取环境因子数据(去掉前两列的经纬度)
  426. user_environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
  427. # 如果用户数据列数超过21列,只取前21列
  428. if user_environmental_data.shape[1] > 21:
  429. user_environmental_data = user_environmental_data.iloc[:, :21]
  430. self.logger.info(f"用户数据有{df.shape[1] - 2}列环境因子,取前21列用于有效态Cd模型")
  431. # 保存用户的环境因子数据到有效态Cd模型目录
  432. user_environmental_data.to_csv(effective_target_file, index=False, encoding='utf-8-sig')
  433. self.logger.info(f"有效态Cd模型使用用户数据,形状: {user_environmental_data.shape}")
  434. else:
  435. # 用户数据环境因子列数不足,恢复使用备份数据
  436. if os.path.exists(backup_file):
  437. shutil.copy2(backup_file, effective_target_file)
  438. self.logger.info(f"用户数据环境因子列数不足({user_env_columns} < 21),有效态Cd模型恢复使用备份数据")
  439. else:
  440. # 如果没有备份文件,使用原始数据但记录警告
  441. self.logger.warning(
  442. f"用户数据环境因子列数不足({user_env_columns} < 21)且无备份文件,继续使用当前数据")
  443. self.logger.info(f"有效态Cd模型自定义数据文件已准备完成,县市: {county_name}")
  444. except Exception as e:
  445. self.logger.error(f"准备有效态Cd模型自定义数据文件失败: {str(e)}")
  446. raise
  447. def _run_crop_cd_prediction_with_county(self, county_name: str,
  448. county_config: Dict[str, Any],
  449. raster_config_override: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
  450. """
  451. 执行指定县市的作物Cd预测
  452. @param {str} county_name - 县市名称
  453. @param {Dict[str, Any]} county_config - 县市配置
  454. @returns {Dict[str, Any]} 预测结果信息
  455. """
  456. try:
  457. # 用数据库边界覆盖环境变量给集成系统
  458. tmp_geojson = None
  459. try:
  460. db = SessionLocal()
  461. feature = get_boundary_geojson_by_name(db, county_name, level="auto")
  462. fc = {"type": "FeatureCollection", "features": [feature]}
  463. tmp_dir = tempfile.mkdtemp()
  464. tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
  465. with open(tmp_geojson, 'w', encoding='utf-8') as f:
  466. json.dump(fc, f, ensure_ascii=False)
  467. os.environ['CD_BOUNDARY_FILE'] = tmp_geojson
  468. except Exception as _e:
  469. self.logger.warning(f"从数据库获取边界失败,回退到默认配置: {str(_e)}")
  470. finally:
  471. try:
  472. db.close()
  473. except Exception:
  474. pass
  475. # 运行作物Cd预测
  476. self.logger.info(f"为{county_name}执行作物Cd预测")
  477. prediction_result = self.wrapper.run_prediction_script("crop", raster_config_override)
  478. # 获取输出文件(指定作物Cd模型类型)
  479. latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
  480. # 复制文件到API输出目录
  481. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  482. model_type = f"crop_cd_{county_name}"
  483. copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
  484. # 清理旧文件
  485. self._cleanup_old_files(model_type)
  486. result_obj = {
  487. 'map_path': copied_files.get('map_path'),
  488. 'histogram_path': copied_files.get('histogram_path'),
  489. 'raster_path': copied_files.get('raster_path'),
  490. 'model_type': model_type,
  491. 'county_name': county_name,
  492. 'timestamp': timestamp,
  493. 'stats': self._get_file_stats(copied_files.get('map_path'))
  494. }
  495. # 清理临时边界
  496. try:
  497. if tmp_geojson and os.path.exists(tmp_geojson):
  498. import shutil
  499. shutil.rmtree(os.path.dirname(tmp_geojson), ignore_errors=True)
  500. if 'CD_BOUNDARY_FILE' in os.environ:
  501. del os.environ['CD_BOUNDARY_FILE']
  502. except Exception:
  503. pass
  504. return result_obj
  505. except Exception as e:
  506. self.logger.error(f"为{county_name}执行作物Cd预测失败: {str(e)}")
  507. raise
  508. def _run_effective_cd_prediction_with_county(self, county_name: str,
  509. county_config: Dict[str, Any],
  510. raster_config_override: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
  511. """
  512. 执行指定县市的有效态Cd预测
  513. @param {str} county_name - 县市名称
  514. @param {Dict[str, Any]} county_config - 县市配置
  515. @returns {Dict[str, Any]} 预测结果信息
  516. """
  517. try:
  518. # 用数据库边界覆盖环境变量给集成系统
  519. tmp_geojson = None
  520. try:
  521. db = SessionLocal()
  522. feature = get_boundary_geojson_by_name(db, county_name, level="auto")
  523. fc = {"type": "FeatureCollection", "features": [feature]}
  524. tmp_dir = tempfile.mkdtemp()
  525. tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
  526. with open(tmp_geojson, 'w', encoding='utf-8') as f:
  527. json.dump(fc, f, ensure_ascii=False)
  528. os.environ['CD_BOUNDARY_FILE'] = tmp_geojson
  529. except Exception as _e:
  530. self.logger.warning(f"从数据库获取边界失败,回退到默认配置: {str(_e)}")
  531. finally:
  532. try:
  533. db.close()
  534. except Exception:
  535. pass
  536. # 运行有效态Cd预测
  537. self.logger.info(f"为{county_name}执行有效态Cd预测")
  538. prediction_result = self.wrapper.run_prediction_script("effective", raster_config_override)
  539. # 获取输出文件(指定有效态Cd模型类型)
  540. latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
  541. # 复制文件到API输出目录
  542. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  543. model_type = f"effective_cd_{county_name}"
  544. copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
  545. # 清理旧文件
  546. self._cleanup_old_files(model_type)
  547. result_obj = {
  548. 'map_path': copied_files.get('map_path'),
  549. 'histogram_path': copied_files.get('histogram_path'),
  550. 'raster_path': copied_files.get('raster_path'),
  551. 'model_type': model_type,
  552. 'county_name': county_name,
  553. 'timestamp': timestamp,
  554. 'stats': self._get_file_stats(copied_files.get('map_path'))
  555. }
  556. # 清理临时边界
  557. try:
  558. if tmp_geojson and os.path.exists(tmp_geojson):
  559. import shutil
  560. shutil.rmtree(os.path.dirname(tmp_geojson), ignore_errors=True)
  561. if 'CD_BOUNDARY_FILE' in os.environ:
  562. del os.environ['CD_BOUNDARY_FILE']
  563. except Exception:
  564. pass
  565. return result_obj
  566. except Exception as e:
  567. self.logger.error(f"为{county_name}执行有效态Cd预测失败: {str(e)}")
  568. raise
  569. async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
  570. """
  571. 生成作物Cd预测结果和可视化
  572. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  573. @throws {Exception} 当预测过程发生错误时抛出
  574. @example
  575. >>> service = CdPredictionService()
  576. >>> result = await service.generate_crop_cd_prediction()
  577. >>> print(result['map_path'])
  578. """
  579. try:
  580. self.logger.info("开始作物Cd模型预测流程")
  581. # 在线程池中运行CPU密集型任务
  582. loop = asyncio.get_event_loop()
  583. result = await loop.run_in_executor(
  584. None,
  585. self._run_crop_cd_prediction
  586. )
  587. return result
  588. except Exception as e:
  589. self.logger.error(f"作物Cd预测流程失败: {str(e)}")
  590. raise
  591. async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
  592. """
  593. 生成有效态Cd预测结果和可视化
  594. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  595. @throws {Exception} 当预测过程发生错误时抛出
  596. @example
  597. >>> service = CdPredictionService()
  598. >>> result = await service.generate_effective_cd_prediction()
  599. >>> print(result['map_path'])
  600. """
  601. try:
  602. self.logger.info("开始有效态Cd模型预测流程")
  603. # 在线程池中运行CPU密集型任务
  604. loop = asyncio.get_event_loop()
  605. result = await loop.run_in_executor(
  606. None,
  607. self._run_effective_cd_prediction
  608. )
  609. return result
  610. except Exception as e:
  611. self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
  612. raise
  613. def _run_crop_cd_prediction(self) -> Dict[str, Any]:
  614. """
  615. 执行作物Cd预测的同步逻辑
  616. @returns {Dict[str, Any]} 预测结果信息
  617. @throws {Exception} 当预测过程发生错误时抛出
  618. """
  619. try:
  620. # 运行作物Cd预测
  621. self.logger.info("执行作物Cd预测")
  622. prediction_result = self.wrapper.run_prediction_script("crop")
  623. # 获取输出文件(指定作物Cd模型类型)
  624. latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
  625. # 复制文件到API输出目录
  626. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  627. copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
  628. # 清理旧文件
  629. self._cleanup_old_files("crop_cd")
  630. return {
  631. 'map_path': copied_files.get('map_path'),
  632. 'histogram_path': copied_files.get('histogram_path'),
  633. 'raster_path': copied_files.get('raster_path'),
  634. 'model_type': 'crop_cd',
  635. 'timestamp': timestamp,
  636. 'stats': self._get_file_stats(copied_files.get('map_path'))
  637. }
  638. except Exception as e:
  639. self.logger.error(f"作物Cd预测执行失败: {str(e)}")
  640. raise
  641. def _run_effective_cd_prediction(self) -> Dict[str, Any]:
  642. """
  643. 执行有效态Cd预测的同步逻辑
  644. @returns {Dict[str, Any]} 预测结果信息
  645. @throws {Exception} 当预测过程发生错误时抛出
  646. """
  647. try:
  648. # 运行有效态Cd预测
  649. self.logger.info("执行有效态Cd预测")
  650. prediction_result = self.wrapper.run_prediction_script("effective")
  651. # 获取输出文件(指定有效态Cd模型类型)
  652. latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
  653. # 复制文件到API输出目录
  654. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  655. copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
  656. # 清理旧文件
  657. self._cleanup_old_files("effective_cd")
  658. return {
  659. 'map_path': copied_files.get('map_path'),
  660. 'histogram_path': copied_files.get('histogram_path'),
  661. 'raster_path': copied_files.get('raster_path'),
  662. 'model_type': 'effective_cd',
  663. 'timestamp': timestamp,
  664. 'stats': self._get_file_stats(copied_files.get('map_path'))
  665. }
  666. except Exception as e:
  667. self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
  668. raise
  669. def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
  670. model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
  671. """
  672. 复制输出文件到API目录
  673. @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
  674. @param {str} model_type - 模型类型
  675. @param {str} timestamp - 时间戳
  676. @returns {Dict[str, Optional[str]]} 复制后的文件路径
  677. """
  678. copied_files = {}
  679. try:
  680. self.logger.info(f"🔄 开始复制输出文件 (模型类型: {model_type})")
  681. total_size = 0
  682. # 复制地图文件
  683. if latest_outputs.get('latest_map'):
  684. src_map = latest_outputs['latest_map']
  685. dst_map = os.path.join(
  686. self.output_figures_dir,
  687. f"{model_type}_prediction_map_{timestamp}.jpg"
  688. )
  689. shutil.copy2(src_map, dst_map)
  690. copied_files['map_path'] = dst_map
  691. # 记录详细信息
  692. src_size = os.path.getsize(src_map) if os.path.exists(src_map) else 0
  693. dst_size = os.path.getsize(dst_map) if os.path.exists(dst_map) else 0
  694. total_size += dst_size
  695. self.logger.info(f"🗺️ 地图文件已复制:")
  696. self.logger.info(f" 源文件: {src_map}")
  697. self.logger.info(f" 目标文件: {dst_map}")
  698. self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
  699. # 激进清理:立即删除源文件
  700. try:
  701. os.remove(src_map)
  702. self.logger.info(f"🗑️ 已删除源地图文件: {os.path.basename(src_map)}")
  703. except Exception as e:
  704. self.logger.warning(f"删除源地图文件失败: {str(e)}")
  705. # 复制直方图文件
  706. if latest_outputs.get('latest_histogram'):
  707. src_histogram = latest_outputs['latest_histogram']
  708. dst_histogram = os.path.join(
  709. self.output_figures_dir,
  710. f"{model_type}_prediction_histogram_{timestamp}.jpg"
  711. )
  712. shutil.copy2(src_histogram, dst_histogram)
  713. copied_files['histogram_path'] = dst_histogram
  714. # 记录详细信息
  715. src_size = os.path.getsize(src_histogram) if os.path.exists(src_histogram) else 0
  716. dst_size = os.path.getsize(dst_histogram) if os.path.exists(dst_histogram) else 0
  717. total_size += dst_size
  718. self.logger.info(f"📊 直方图文件已复制:")
  719. self.logger.info(f" 源文件: {src_histogram}")
  720. self.logger.info(f" 目标文件: {dst_histogram}")
  721. self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
  722. # 激进清理:立即删除源文件
  723. try:
  724. os.remove(src_histogram)
  725. self.logger.info(f"🗑️ 已删除源直方图文件: {os.path.basename(src_histogram)}")
  726. except Exception as e:
  727. self.logger.warning(f"删除源直方图文件失败: {str(e)}")
  728. # 处理栅格文件(不复制,直接删除源文件以节省空间)
  729. if latest_outputs.get('latest_raster'):
  730. src_raster = latest_outputs['latest_raster']
  731. # 记录栅格文件信息但不复制
  732. src_size = os.path.getsize(src_raster) if os.path.exists(src_raster) else 0
  733. self.logger.info(f"🌐 栅格文件处理:")
  734. self.logger.info(f" 源文件: {src_raster}")
  735. self.logger.info(f" 文件大小: {src_size:,} bytes ({src_size/1024:.1f} KB)")
  736. self.logger.info(f" 处理方式: 跳过复制,直接删除(栅格文件为中间文件)")
  737. # 激进清理:直接删除源文件,不进行复制
  738. try:
  739. os.remove(src_raster)
  740. self.logger.info(f"🗑️ 已删除中间栅格文件: {os.path.basename(src_raster)}")
  741. self.logger.info(f" 节省空间: {src_size:,} bytes ({src_size/1024:.1f} KB)")
  742. except Exception as e:
  743. self.logger.warning(f"删除源栅格文件失败: {str(e)}")
  744. # 不设置raster_path,因为不需要保留栅格文件
  745. copied_files['raster_path'] = None
  746. # 记录总体信息
  747. self.logger.info(f"✅ 文件复制完成,总大小: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)")
  748. # 清理Cd预测系统原始输出文件
  749. self._cleanup_cd_system_outputs()
  750. # 清理中间数据文件
  751. self._cleanup_intermediate_data_files()
  752. except Exception as e:
  753. self.logger.error(f"复制输出文件失败: {str(e)}")
  754. return copied_files
  755. def _cleanup_old_files(self, model_type: str):
  756. """
  757. 清理旧的预测文件
  758. @param {str} model_type - 模型类型
  759. """
  760. try:
  761. max_files = self.config.get_api_config().get("max_prediction_files", 10)
  762. # 清理地图文件
  763. map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
  764. self._cleanup_files_by_pattern(map_pattern, max_files)
  765. # 清理直方图文件
  766. histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
  767. self._cleanup_files_by_pattern(histogram_pattern, max_files)
  768. # 不再需要清理API栅格文件,因为我们不再保留栅格文件
  769. # raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
  770. # self._cleanup_files_by_pattern(raster_pattern, max_files)
  771. self.logger.info(f"跳过栅格文件清理(栅格文件已不再保留)")
  772. except Exception as e:
  773. self.logger.warning(f"清理旧文件失败: {str(e)}")
  774. def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
  775. """
  776. 按模式清理文件
  777. @param {str} pattern - 文件模式
  778. @param {int} max_files - 最大保留文件数
  779. """
  780. try:
  781. files = glob.glob(pattern)
  782. if len(files) > max_files:
  783. # 按修改时间排序,删除最旧的文件
  784. files.sort(key=os.path.getmtime)
  785. for file_to_delete in files[:-max_files]:
  786. os.remove(file_to_delete)
  787. self.logger.info(f"已删除旧文件: {file_to_delete}")
  788. except Exception as e:
  789. self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
  790. def _cleanup_temp_files(self, county_name: str, max_files: int = 5):
  791. """
  792. 清理临时数据文件,只保留指定县市的最新文件
  793. @param {str} county_name - 县市名称
  794. @param {int} max_files - 最大保留文件数,默认5个
  795. """
  796. try:
  797. temp_dir = os.path.join(self.output_data_dir, "temp")
  798. temp_pattern = os.path.join(temp_dir, f"{county_name}_temp_data_*.csv")
  799. # 获取该县市的所有临时文件
  800. temp_files = glob.glob(temp_pattern)
  801. if len(temp_files) > max_files:
  802. # 按修改时间排序,删除最旧的文件
  803. temp_files.sort(key=os.path.getmtime)
  804. files_to_delete = temp_files[:-max_files]
  805. for file_to_delete in files_to_delete:
  806. os.remove(file_to_delete)
  807. self.logger.info(f"已删除旧临时文件: {os.path.basename(file_to_delete)}")
  808. self.logger.info(f"已清理{county_name}的临时文件,保留最新{max_files}个文件")
  809. except Exception as e:
  810. self.logger.warning(f"清理{county_name}临时文件失败: {str(e)}")
  811. def _cleanup_backup_files(self, target_file: str, max_backups: int = 3):
  812. """
  813. 清理备份文件,只保留最新的备份
  814. @param {str} target_file - 目标文件路径
  815. @param {int} max_backups - 最大保留备份数,默认3个
  816. """
  817. try:
  818. # 构建备份文件匹配模式
  819. backup_pattern = f"{target_file}.backup_*"
  820. backup_files = glob.glob(backup_pattern)
  821. if len(backup_files) > max_backups:
  822. # 按修改时间排序,删除最旧的备份文件
  823. backup_files.sort(key=os.path.getmtime)
  824. files_to_delete = backup_files[:-max_backups]
  825. for file_to_delete in files_to_delete:
  826. os.remove(file_to_delete)
  827. self.logger.info(f"已删除旧备份文件: {os.path.basename(file_to_delete)}")
  828. self.logger.info(f"已清理备份文件,保留最新{max_backups}个备份")
  829. except Exception as e:
  830. self.logger.warning(f"清理备份文件失败: {str(e)}")
  831. def _delete_all_files_by_pattern(self, pattern: str) -> Tuple[int, int]:
  832. """
  833. 删除所有匹配模式的文件(激进清理模式)
  834. @param {str} pattern - 文件匹配模式
  835. @returns {Tuple[int, int]} 返回删除的文件数量和释放的字节数
  836. """
  837. deleted_count = 0
  838. total_size_freed = 0
  839. try:
  840. files = glob.glob(pattern)
  841. for file_path in files:
  842. try:
  843. # 获取文件大小
  844. file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
  845. # 删除文件
  846. os.remove(file_path)
  847. deleted_count += 1
  848. total_size_freed += file_size
  849. self.logger.info(f"🗑️ 已删除中间文件: {os.path.basename(file_path)} ({file_size:,} bytes)")
  850. except Exception as e:
  851. self.logger.warning(f"删除文件失败 {file_path}: {str(e)}")
  852. if deleted_count > 0:
  853. self.logger.info(f"🧹 模式 '{os.path.basename(pattern)}' 清理完成: {deleted_count} 个文件, {total_size_freed:,} bytes")
  854. except Exception as e:
  855. self.logger.warning(f"按模式删除文件失败 {pattern}: {str(e)}")
  856. return deleted_count, total_size_freed
  857. def _cleanup_cd_system_outputs(self, aggressive_mode: bool = True):
  858. """
  859. 清理Cd预测系统原始输出文件(激进模式:删除所有中间文件)
  860. @param {bool} aggressive_mode - 激进模式,默认True(删除所有中间文件)
  861. """
  862. try:
  863. cd_system_path = self.config.get_cd_system_path()
  864. output_figures_dir = os.path.join(cd_system_path, "output", "figures")
  865. output_raster_dir = os.path.join(cd_system_path, "output", "raster")
  866. total_deleted = 0
  867. total_size_freed = 0
  868. # 清理figures目录下的所有预测输出文件
  869. if os.path.exists(output_figures_dir):
  870. # 立即删除所有地图文件
  871. map_patterns = [
  872. "Prediction_results_*.jpg",
  873. "Prediction_results_*.png"
  874. ]
  875. for pattern in map_patterns:
  876. full_pattern = os.path.join(output_figures_dir, pattern)
  877. deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
  878. total_deleted += deleted
  879. total_size_freed += size_freed
  880. # 立即删除所有直方图文件
  881. histogram_patterns = [
  882. "Prediction_frequency_*.jpg",
  883. "Prediction_frequency_*.png"
  884. ]
  885. for pattern in histogram_patterns:
  886. full_pattern = os.path.join(output_figures_dir, pattern)
  887. deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
  888. total_deleted += deleted
  889. total_size_freed += size_freed
  890. # 清理raster目录下的所有文件
  891. if os.path.exists(output_raster_dir):
  892. # 立即删除所有栅格相关文件
  893. raster_patterns = [
  894. "output_*.tif",
  895. "points_*.shp",
  896. "points_*.dbf",
  897. "points_*.prj",
  898. "points_*.shx",
  899. "points_*.cpg"
  900. ]
  901. for pattern in raster_patterns:
  902. full_pattern = os.path.join(output_raster_dir, pattern)
  903. deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
  904. total_deleted += deleted
  905. total_size_freed += size_freed
  906. self.logger.info(f"🧹 激进清理完成:")
  907. self.logger.info(f" 删除文件数: {total_deleted} 个")
  908. self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024/1024:.2f} MB)")
  909. except Exception as e:
  910. self.logger.warning(f"激进清理Cd预测系统输出文件失败: {str(e)}")
  911. def _cleanup_intermediate_data_files(self):
  912. """
  913. 清理中间数据文件
  914. 包括:坐标.csv 和 combined_pH.csv, pHcombined.csv
  915. """
  916. try:
  917. cd_system_path = self.config.get_cd_system_path()
  918. deleted_count = 0
  919. total_size_freed = 0
  920. # 要清理的中间数据文件
  921. intermediate_files = [
  922. os.path.join(cd_system_path, "data", "coordinates", "坐标.csv"),
  923. os.path.join(cd_system_path, "data", "predictions", "combined_pH.csv"),
  924. os.path.join(cd_system_path, "data", "predictions", "pHcombined.csv")
  925. ]
  926. self.logger.info("🧹 开始清理中间数据文件...")
  927. for file_path in intermediate_files:
  928. if os.path.exists(file_path):
  929. try:
  930. file_size = os.path.getsize(file_path)
  931. os.remove(file_path)
  932. deleted_count += 1
  933. total_size_freed += file_size
  934. self.logger.info(f"🗑️ 已删除中间数据文件: {os.path.basename(file_path)}")
  935. self.logger.info(f" 文件路径: {file_path}")
  936. self.logger.info(f" 文件大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
  937. except Exception as e:
  938. self.logger.warning(f"删除中间数据文件失败 {file_path}: {str(e)}")
  939. else:
  940. self.logger.debug(f"中间数据文件不存在,跳过: {os.path.basename(file_path)}")
  941. if deleted_count > 0:
  942. self.logger.info(f"✅ 中间数据文件清理完成:")
  943. self.logger.info(f" 删除文件数: {deleted_count} 个")
  944. self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024:.1f} KB)")
  945. else:
  946. self.logger.info("ℹ️ 没有找到需要清理的中间数据文件")
  947. except Exception as e:
  948. self.logger.warning(f"清理中间数据文件失败: {str(e)}")
  949. def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
  950. """
  951. 获取文件统计信息
  952. @param {Optional[str]} file_path - 文件路径
  953. @returns {Dict[str, Any]} 文件统计信息
  954. """
  955. if not file_path or not os.path.exists(file_path):
  956. return {}
  957. try:
  958. stat = os.stat(file_path)
  959. return {
  960. 'file_size': stat.st_size,
  961. 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
  962. 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
  963. }
  964. except Exception:
  965. return {}
  966. def get_latest_crop_cd_map(self) -> Optional[str]:
  967. """
  968. 获取最新的作物Cd预测地图文件路径
  969. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  970. """
  971. try:
  972. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
  973. files = glob.glob(pattern)
  974. if files:
  975. return max(files, key=os.path.getctime)
  976. return None
  977. except Exception:
  978. return None
  979. def get_latest_effective_cd_map(self) -> Optional[str]:
  980. """
  981. 获取最新的有效态Cd预测地图文件路径
  982. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  983. """
  984. try:
  985. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
  986. files = glob.glob(pattern)
  987. if files:
  988. return max(files, key=os.path.getctime)
  989. return None
  990. except Exception:
  991. return None
  992. def get_latest_crop_cd_histogram(self) -> Optional[str]:
  993. """
  994. 获取最新的作物Cd预测直方图文件路径
  995. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  996. """
  997. try:
  998. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
  999. files = glob.glob(pattern)
  1000. if files:
  1001. return max(files, key=os.path.getctime)
  1002. return None
  1003. except Exception:
  1004. return None
  1005. def get_latest_effective_cd_histogram(self) -> Optional[str]:
  1006. """
  1007. 获取最新的有效态Cd预测直方图文件路径
  1008. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  1009. """
  1010. try:
  1011. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
  1012. files = glob.glob(pattern)
  1013. if files:
  1014. return max(files, key=os.path.getctime)
  1015. return None
  1016. except Exception:
  1017. return None
  1018. # =============================================================================
  1019. # 统计信息方法
  1020. # =============================================================================
  1021. def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
  1022. """
  1023. 获取作物Cd预测结果的统计信息
  1024. @param {str} county_name - 县市名称
  1025. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  1026. """
  1027. try:
  1028. # 查找最新的预测结果文件
  1029. cd_system_path = self.config.get_cd_system_path()
  1030. final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_crop_cd.csv")
  1031. if not os.path.exists(final_data_path):
  1032. self.logger.warning(f"未找到作物Cd预测结果文件: {final_data_path}")
  1033. return None
  1034. # 读取预测数据
  1035. df = pd.read_csv(final_data_path)
  1036. if 'Prediction' not in df.columns:
  1037. self.logger.warning("预测结果文件中缺少'Prediction'列")
  1038. return None
  1039. predictions = df['Prediction']
  1040. # 计算基础统计信息
  1041. basic_stats = {
  1042. "数据点总数": len(predictions),
  1043. "均值": float(predictions.mean()),
  1044. "中位数": float(predictions.median()),
  1045. "标准差": float(predictions.std()),
  1046. "最小值": float(predictions.min()),
  1047. "最大值": float(predictions.max()),
  1048. "25%分位数": float(predictions.quantile(0.25)),
  1049. "75%分位数": float(predictions.quantile(0.75)),
  1050. "偏度": float(predictions.skew()),
  1051. "峰度": float(predictions.kurtosis())
  1052. }
  1053. # 计算分布直方图数据
  1054. histogram_data = self._calculate_histogram_data(predictions)
  1055. # 计算空间统计信息(如果有坐标信息)
  1056. spatial_stats = None
  1057. if 'longitude' in df.columns and 'latitude' in df.columns:
  1058. spatial_stats = self._calculate_spatial_statistics(df)
  1059. return {
  1060. "模型类型": "作物Cd模型",
  1061. "县市名称": county_name,
  1062. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
  1063. "基础统计": basic_stats,
  1064. "分布直方图": histogram_data,
  1065. "空间统计": spatial_stats
  1066. }
  1067. except Exception as e:
  1068. self.logger.error(f"获取作物Cd统计信息失败: {str(e)}")
  1069. return None
  1070. def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
  1071. """
  1072. 获取有效态Cd预测结果的统计信息
  1073. @param {str} county_name - 县市名称
  1074. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  1075. """
  1076. try:
  1077. # 查找最新的预测结果文件
  1078. cd_system_path = self.config.get_cd_system_path()
  1079. final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_effective_cd.csv")
  1080. if not os.path.exists(final_data_path):
  1081. self.logger.warning(f"未找到有效态Cd预测结果文件: {final_data_path}")
  1082. return None
  1083. # 读取预测数据
  1084. df = pd.read_csv(final_data_path)
  1085. if 'Prediction' not in df.columns:
  1086. self.logger.warning("预测结果文件中缺少'Prediction'列")
  1087. return None
  1088. predictions = df['Prediction']
  1089. # 计算基础统计信息
  1090. basic_stats = {
  1091. "数据点总数": len(predictions),
  1092. "均值": float(predictions.mean()),
  1093. "中位数": float(predictions.median()),
  1094. "标准差": float(predictions.std()),
  1095. "最小值": float(predictions.min()),
  1096. "最大值": float(predictions.max()),
  1097. "25%分位数": float(predictions.quantile(0.25)),
  1098. "75%分位数": float(predictions.quantile(0.75)),
  1099. "偏度": float(predictions.skew()),
  1100. "峰度": float(predictions.kurtosis())
  1101. }
  1102. # 计算分布直方图数据
  1103. histogram_data = self._calculate_histogram_data(predictions)
  1104. # 计算空间统计信息(如果有坐标信息)
  1105. spatial_stats = None
  1106. if 'longitude' in df.columns and 'latitude' in df.columns:
  1107. spatial_stats = self._calculate_spatial_statistics(df)
  1108. return {
  1109. "模型类型": "有效态Cd模型",
  1110. "县市名称": county_name,
  1111. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
  1112. "基础统计": basic_stats,
  1113. "分布直方图": histogram_data,
  1114. "空间统计": spatial_stats
  1115. }
  1116. except Exception as e:
  1117. self.logger.error(f"获取有效态Cd统计信息失败: {str(e)}")
  1118. return None
  1119. def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
  1120. """
  1121. 获取综合预测统计信息
  1122. @param {str} county_name - 县市名称
  1123. @returns {Optional[Dict[str, Any]]} 综合统计信息,如果没有数据则返回None
  1124. """
  1125. try:
  1126. crop_stats = self.get_crop_cd_statistics(county_name)
  1127. effective_stats = self.get_effective_cd_statistics(county_name)
  1128. if not crop_stats and not effective_stats:
  1129. return None
  1130. return {
  1131. "县市名称": county_name,
  1132. "作物Cd统计": crop_stats,
  1133. "有效态Cd统计": effective_stats,
  1134. "生成时间": datetime.now().isoformat()
  1135. }
  1136. except Exception as e:
  1137. self.logger.error(f"获取综合统计信息失败: {str(e)}")
  1138. return None
  1139. def get_all_counties_statistics(self) -> Dict[str, Any]:
  1140. """
  1141. 获取所有支持县市的统计概览
  1142. @returns {Dict[str, Any]} 所有县市的统计概览
  1143. """
  1144. try:
  1145. all_stats = {
  1146. "支持县市总数": len(self.supported_counties),
  1147. "统计生成时间": datetime.now().isoformat(),
  1148. "县市统计": {},
  1149. "汇总信息": {
  1150. "有作物Cd数据的县市": 0,
  1151. "有有效态Cd数据的县市": 0,
  1152. "数据完整的县市": 0
  1153. }
  1154. }
  1155. for county_name in self.supported_counties.keys():
  1156. county_stats = {
  1157. "县市名称": county_name,
  1158. "有作物Cd数据": False,
  1159. "有有效态Cd数据": False,
  1160. "数据完整": False,
  1161. "最新更新时间": None
  1162. }
  1163. # 检查作物Cd数据
  1164. crop_stats = self.get_crop_cd_statistics(county_name)
  1165. if crop_stats:
  1166. county_stats["有作物Cd数据"] = True
  1167. county_stats["作物Cd概要"] = {
  1168. "数据点数": crop_stats["基础统计"]["数据点总数"],
  1169. "均值": crop_stats["基础统计"]["均值"],
  1170. "最大值": crop_stats["基础统计"]["最大值"]
  1171. }
  1172. all_stats["汇总信息"]["有作物Cd数据的县市"] += 1
  1173. # 检查有效态Cd数据
  1174. effective_stats = self.get_effective_cd_statistics(county_name)
  1175. if effective_stats:
  1176. county_stats["有有效态Cd数据"] = True
  1177. county_stats["有效态Cd概要"] = {
  1178. "数据点数": effective_stats["基础统计"]["数据点总数"],
  1179. "均值": effective_stats["基础统计"]["均值"],
  1180. "最大值": effective_stats["基础统计"]["最大值"]
  1181. }
  1182. all_stats["汇总信息"]["有有效态Cd数据的县市"] += 1
  1183. # 检查数据完整性
  1184. if county_stats["有作物Cd数据"] and county_stats["有有效态Cd数据"]:
  1185. county_stats["数据完整"] = True
  1186. all_stats["汇总信息"]["数据完整的县市"] += 1
  1187. all_stats["县市统计"][county_name] = county_stats
  1188. return all_stats
  1189. except Exception as e:
  1190. self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
  1191. return {
  1192. "error": f"获取统计概览失败: {str(e)}",
  1193. "支持县市总数": len(self.supported_counties),
  1194. "统计生成时间": datetime.now().isoformat()
  1195. }
  1196. # =============================================================================
  1197. # 辅助统计方法
  1198. # =============================================================================
  1199. def _calculate_histogram_data(self, predictions: pd.Series, bins: int = 20) -> Dict[str, Any]:
  1200. """
  1201. 计算分布直方图数据
  1202. @param {pd.Series} predictions - 预测值
  1203. @param {int} bins - 直方图区间数
  1204. @returns {Dict[str, Any]} 直方图数据
  1205. """
  1206. try:
  1207. import numpy as np
  1208. hist, bin_edges = np.histogram(predictions, bins=bins)
  1209. # 计算区间中心点
  1210. bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
  1211. return {
  1212. "区间数": int(bins),
  1213. "频次": [int(count) for count in hist],
  1214. "区间中心": [float(center) for center in bin_centers],
  1215. "区间边界": [float(edge) for edge in bin_edges],
  1216. "总频次": int(hist.sum())
  1217. }
  1218. except Exception as e:
  1219. self.logger.error(f"计算直方图数据失败: {str(e)}")
  1220. return {}
  1221. def _calculate_spatial_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
  1222. """
  1223. 计算空间统计信息
  1224. @param {pd.DataFrame} df - 包含坐标和预测值的数据框
  1225. @returns {Dict[str, Any]} 空间统计信息
  1226. """
  1227. try:
  1228. spatial_stats = {
  1229. "经度范围": {
  1230. "最小值": float(df['longitude'].min()),
  1231. "最大值": float(df['longitude'].max()),
  1232. "跨度": float(df['longitude'].max() - df['longitude'].min())
  1233. },
  1234. "纬度范围": {
  1235. "最小值": float(df['latitude'].min()),
  1236. "最大值": float(df['latitude'].max()),
  1237. "跨度": float(df['latitude'].max() - df['latitude'].min())
  1238. }
  1239. }
  1240. return spatial_stats
  1241. except Exception as e:
  1242. self.logger.error(f"计算空间统计信息失败: {str(e)}")
  1243. return {}