# cd_prediction_service.py (35 KB)
  1. """
  2. Cd预测服务类
  3. @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import os
  8. import logging
  9. import asyncio
  10. from datetime import datetime
  11. from typing import Dict, Any, Optional, List
  12. import glob
  13. import shutil
  14. import zipfile
  15. import tempfile
  16. import pandas as pd
  17. import io
  18. from ..config.cd_prediction_config import cd_config
  19. from ..utils.cd_prediction_wrapper import CdPredictionWrapper
  20. class CdPredictionService:
  21. """
  22. Cd预测服务类
  23. @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
  24. @example
  25. >>> service = CdPredictionService()
  26. >>> result = await service.generate_crop_cd_prediction()
  27. """
  def __init__(self):
      """
      Initialise the Cd prediction service.
      @description: Binds the shared configuration, creates the prediction
                    wrapper, and caches the output directories and the
                    supported-county table.
      """
      self.logger = logging.getLogger(__name__)
      self.config = cd_config
      # The wrapper is rooted at the Cd prediction system path from config.
      self.wrapper = CdPredictionWrapper(self.config.get_cd_system_path())
      # Populates output_figures_dir, output_raster_dir and output_data_dir.
      for kind in ("figures", "raster", "data"):
          setattr(self, f"output_{kind}_dir", self.config.get_output_dir(kind))
      self.supported_counties = self._load_supported_counties()
      self.logger.info("Cd预测服务初始化完成")
  47. def _load_supported_counties(self) -> Dict[str, Dict]:
  48. """
  49. 加载支持的县市配置
  50. @returns {Dict[str, Dict]} 支持的县市配置信息
  51. """
  52. # 获取Cd预测系统的基础路径
  53. cd_system_base = self.config.get_cd_system_path()
  54. return {
  55. "乐昌市": {
  56. "boundary_file": os.path.join(cd_system_base, "output/raster/lechang.shp"),
  57. "template_file": os.path.join(cd_system_base, "output/raster/meanTemp.tif"),
  58. "coordinate_file": os.path.join(cd_system_base, "data/coordinates/坐标.csv"),
  59. "region_code": "440282",
  60. "display_name": "乐昌市",
  61. "province": "广东省"
  62. },
  63. # 可扩展添加更多县市
  64. # "韶关市": {
  65. # "boundary_file": os.path.join(cd_system_base, "output/raster/shaoguan.shp"),
  66. # "template_file": os.path.join(cd_system_base, "output/raster/shaoguan_template.tif"),
  67. # "coordinate_file": os.path.join(cd_system_base, "data/coordinates/韶关_坐标.csv"),
  68. # "region_code": "440200",
  69. # "display_name": "韶关市",
  70. # "province": "广东省"
  71. # }
  72. }
  73. def is_county_supported(self, county_name: str) -> bool:
  74. """
  75. 检查县市是否被支持
  76. @param {str} county_name - 县市名称
  77. @returns {bool} 是否支持该县市
  78. """
  79. return county_name in self.supported_counties
  80. def get_supported_counties(self) -> List[str]:
  81. """
  82. 获取支持的县市名称列表
  83. @returns {List[str]} 支持的县市名称列表
  84. """
  85. return list(self.supported_counties.keys())
  86. def get_supported_counties_info(self) -> List[Dict[str, Any]]:
  87. """
  88. 获取支持的县市详细信息
  89. @returns {List[Dict[str, Any]]} 支持的县市详细信息列表
  90. """
  91. counties_info = []
  92. for county_name, config in self.supported_counties.items():
  93. counties_info.append({
  94. "name": county_name,
  95. "display_name": config.get("display_name", county_name),
  96. "province": config.get("province", ""),
  97. "region_code": config.get("region_code", ""),
  98. "has_boundary": os.path.exists(config.get("boundary_file", "")),
  99. "has_template": os.path.exists(config.get("template_file", "")),
  100. "has_coordinates": os.path.exists(config.get("coordinate_file", ""))
  101. })
  102. return counties_info
  103. def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
  104. """
  105. 验证输入数据格式
  106. @param {pd.DataFrame} df - 输入数据
  107. @param {str} county_name - 县市名称
  108. @returns {Dict[str, Any]} 验证结果
  109. """
  110. # 基本要求:前两列为坐标,至少需要3列数据
  111. if df.shape[1] < 3:
  112. return {
  113. "valid": False,
  114. "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
  115. "warnings": [],
  116. "data_shape": df.shape,
  117. "county_supported": self.is_county_supported(county_name),
  118. "null_summary": {}
  119. }
  120. # 坐标列验证(假设前两列是经纬度)
  121. coordinate_issues = []
  122. try:
  123. # 检查前两列是否为数值型
  124. if not pd.api.types.is_numeric_dtype(df.iloc[:, 0]):
  125. coordinate_issues.append("第一列(经度)不是数值型数据")
  126. elif not df.iloc[:, 0].between(70, 140).all():
  127. coordinate_issues.append("经度值超出合理范围(70-140度)")
  128. if not pd.api.types.is_numeric_dtype(df.iloc[:, 1]):
  129. coordinate_issues.append("第二列(纬度)不是数值型数据")
  130. elif not df.iloc[:, 1].between(15, 55).all():
  131. coordinate_issues.append("纬度值超出合理范围(15-55度)")
  132. except Exception as e:
  133. coordinate_issues.append(f"坐标数据验证失败: {str(e)}")
  134. # 检查数据完整性
  135. null_counts = df.isnull().sum()
  136. high_null_columns = [col for col, count in null_counts.items()
  137. if count > len(df) * 0.1] # 空值超过10%的列
  138. # 检查环境因子列是否为数值型
  139. non_numeric_columns = []
  140. for i in range(2, df.shape[1]): # 从第三列开始检查
  141. col_name = df.columns[i]
  142. if not pd.api.types.is_numeric_dtype(df.iloc[:, i]):
  143. non_numeric_columns.append(col_name)
  144. warnings = []
  145. if high_null_columns:
  146. warnings.append(f"以下列空值较多: {', '.join(high_null_columns)}")
  147. if coordinate_issues:
  148. warnings.extend(coordinate_issues)
  149. if non_numeric_columns:
  150. warnings.append(f"以下列不是数值型: {', '.join(non_numeric_columns)}")
  151. # 如果有严重的坐标问题,标记为无效
  152. critical_errors = []
  153. if any("不是数值型数据" in issue for issue in coordinate_issues):
  154. critical_errors.extend([issue for issue in coordinate_issues if "不是数值型数据" in issue])
  155. return {
  156. "valid": len(critical_errors) == 0,
  157. "errors": critical_errors,
  158. "warnings": warnings,
  159. "data_shape": df.shape,
  160. "county_supported": self.is_county_supported(county_name),
  161. "null_summary": null_counts.to_dict()
  162. }
  163. def save_uploaded_data(self, df: pd.DataFrame, county_name: str,
  164. description: Optional[str] = None) -> str:
  165. """
  166. 保存上传的数据文件
  167. @param {pd.DataFrame} df - 数据
  168. @param {str} county_name - 县市名称
  169. @param {Optional[str]} description - 数据描述
  170. @returns {str} 保存的文件路径
  171. """
  172. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  173. filename = f"{county_name}_uploaded_data_{timestamp}.csv"
  174. file_path = os.path.join(self.output_data_dir, "uploaded", filename)
  175. # 确保目录存在
  176. os.makedirs(os.path.dirname(file_path), exist_ok=True)
  177. # 保存数据
  178. df.to_csv(file_path, index=False, encoding='utf-8-sig')
  179. # 保存元信息
  180. meta_info = {
  181. "county_name": county_name,
  182. "description": description,
  183. "upload_time": datetime.now().isoformat(),
  184. "data_shape": df.shape,
  185. "columns": df.columns.tolist()
  186. }
  187. meta_path = file_path.replace('.csv', '_meta.json')
  188. import json
  189. with open(meta_path, 'w', encoding='utf-8') as f:
  190. json.dump(meta_info, f, ensure_ascii=False, indent=2)
  191. self.logger.info(f"数据文件已保存: {file_path}")
  192. return file_path
  193. def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
  194. """
  195. 保存临时数据文件
  196. @param {pd.DataFrame} df - 数据
  197. @param {str} county_name - 县市名称
  198. @returns {str} 临时文件路径
  199. """
  200. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  201. filename = f"{county_name}_temp_data_{timestamp}.csv"
  202. file_path = os.path.join(self.output_data_dir, "temp", filename)
  203. # 确保目录存在
  204. os.makedirs(os.path.dirname(file_path), exist_ok=True)
  205. # 保存数据
  206. df.to_csv(file_path, index=False, encoding='utf-8-sig')
  207. self.logger.info(f"临时数据文件已保存: {file_path}")
  208. return file_path
  209. async def add_county_support(self, county_name: str, boundary_file,
  210. coordinate_file) -> Dict[str, Any]:
  211. """
  212. 添加新县市支持
  213. @param {str} county_name - 县市名称
  214. @param boundary_file - 边界文件(Shapefile压缩包)
  215. @param coordinate_file - 坐标文件
  216. @returns {Dict[str, Any]} 添加结果
  217. """
  218. try:
  219. # 创建县市专用目录
  220. county_dir = os.path.join(self.output_data_dir, "counties", county_name)
  221. os.makedirs(county_dir, exist_ok=True)
  222. # 处理边界文件
  223. if boundary_file.filename.endswith('.zip'):
  224. boundary_content = await boundary_file.read()
  225. boundary_zip_path = os.path.join(county_dir, "boundary.zip")
  226. with open(boundary_zip_path, 'wb') as f:
  227. f.write(boundary_content)
  228. # 解压Shapefile
  229. with zipfile.ZipFile(boundary_zip_path, 'r') as zip_ref:
  230. zip_ref.extractall(os.path.join(county_dir, "boundary"))
  231. # 处理坐标文件
  232. coordinate_content = await coordinate_file.read()
  233. coordinate_path = os.path.join(county_dir, "coordinates.csv")
  234. df_coords = pd.read_csv(io.StringIO(coordinate_content.decode('utf-8')))
  235. df_coords.to_csv(coordinate_path, index=False, encoding='utf-8-sig')
  236. # 更新支持的县市配置
  237. self.supported_counties[county_name] = {
  238. "boundary_file": os.path.join(county_dir, "boundary"),
  239. "coordinate_file": coordinate_path,
  240. "template_file": "", # 需要后续生成
  241. "region_code": "",
  242. "display_name": county_name,
  243. "province": ""
  244. }
  245. return {
  246. "county_name": county_name,
  247. "boundary_path": os.path.join(county_dir, "boundary"),
  248. "coordinate_path": coordinate_path,
  249. "status": "success"
  250. }
  251. except Exception as e:
  252. self.logger.error(f"添加县市支持失败: {str(e)}")
  253. raise
  254. async def generate_crop_cd_prediction_for_county(
  255. self,
  256. county_name: str,
  257. data_file: Optional[str] = None
  258. ) -> Dict[str, Any]:
  259. """
  260. 为指定县市生成作物Cd预测
  261. @param {str} county_name - 县市名称
  262. @param {Optional[str]} data_file - 可选的数据文件路径
  263. @returns {Dict[str, Any]} 预测结果信息
  264. """
  265. if not self.is_county_supported(county_name):
  266. raise ValueError(f"不支持的县市: {county_name}")
  267. try:
  268. # 获取县市配置
  269. county_config = self.supported_counties[county_name]
  270. # 如果提供了自定义数据文件,使用它替换默认数据
  271. if data_file:
  272. # 准备作物Cd模型的自定义数据
  273. self._prepare_crop_cd_custom_data(data_file, county_name)
  274. # 在线程池中运行CPU密集型任务
  275. loop = asyncio.get_event_loop()
  276. result = await loop.run_in_executor(
  277. None,
  278. self._run_crop_cd_prediction_with_county,
  279. county_name, county_config
  280. )
  281. return result
  282. except Exception as e:
  283. self.logger.error(f"为{county_name}生成作物Cd预测失败: {str(e)}")
  284. raise
  285. async def generate_effective_cd_prediction_for_county(
  286. self,
  287. county_name: str,
  288. data_file: Optional[str] = None
  289. ) -> Dict[str, Any]:
  290. """
  291. 为指定县市生成有效态Cd预测
  292. @param {str} county_name - 县市名称
  293. @param {Optional[str]} data_file - 可选的数据文件路径
  294. @returns {Dict[str, Any]} 预测结果信息
  295. """
  296. if not self.is_county_supported(county_name):
  297. raise ValueError(f"不支持的县市: {county_name}")
  298. try:
  299. # 获取县市配置
  300. county_config = self.supported_counties[county_name]
  301. # 如果提供了自定义数据文件,使用它替换默认数据
  302. if data_file:
  303. # 准备有效态Cd模型的自定义数据
  304. self._prepare_effective_cd_custom_data(data_file, county_name)
  305. # 在线程池中运行CPU密集型任务
  306. loop = asyncio.get_event_loop()
  307. result = await loop.run_in_executor(
  308. None,
  309. self._run_effective_cd_prediction_with_county,
  310. county_name, county_config
  311. )
  312. return result
  313. except Exception as e:
  314. self.logger.error(f"为{county_name}生成有效态Cd预测失败: {str(e)}")
  315. raise
  316. def _prepare_crop_cd_custom_data(self, data_file: str, county_name: str):
  317. """
  318. 准备作物Cd模型的自定义数据文件
  319. @param {str} data_file - 数据文件路径
  320. @param {str} county_name - 县市名称
  321. """
  322. try:
  323. import pandas as pd
  324. # 读取用户上传的CSV文件
  325. df = pd.read_csv(data_file, encoding='utf-8')
  326. # 获取Cd预测系统的数据目录
  327. cd_system_path = self.config.get_cd_system_path()
  328. # 1. 提取坐标信息并保存为独立的坐标文件
  329. coordinates_df = pd.DataFrame({
  330. 'longitude': df.iloc[:, 0], # 第一列为经度
  331. 'latitude': df.iloc[:, 1] # 第二列为纬度
  332. })
  333. # 保存坐标文件到系统数据目录
  334. coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
  335. os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
  336. coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
  337. self.logger.info(f"坐标文件已保存: {coord_file_path}")
  338. # 2. 准备作物Cd模型的训练数据
  339. crop_cd_data_dir = os.path.join(cd_system_path, "models", "crop_cd_model", "data")
  340. crop_target_file = os.path.join(crop_cd_data_dir, "areatest.csv")
  341. # 备份原始文件
  342. if os.path.exists(crop_target_file):
  343. backup_file = f"{crop_target_file}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
  344. shutil.copy2(crop_target_file, backup_file)
  345. self.logger.info(f"作物Cd模型原始数据已备份: {backup_file}")
  346. # 提取环境因子数据(去掉前两列的经纬度)
  347. environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
  348. # 保存环境因子数据到作物Cd模型目录(不包含坐标)
  349. environmental_data.to_csv(crop_target_file, index=False, encoding='utf-8-sig')
  350. self.logger.info(f"作物Cd模型数据文件已保存: {crop_target_file}, 数据形状: {environmental_data.shape}")
  351. self.logger.info(f"作物Cd模型自定义数据文件已准备完成,县市: {county_name}")
  352. except Exception as e:
  353. self.logger.error(f"准备作物Cd模型自定义数据文件失败: {str(e)}")
  354. raise
  355. def _prepare_effective_cd_custom_data(self, data_file: str, county_name: str):
  356. """
  357. 准备有效态Cd模型的自定义数据文件
  358. @param {str} data_file - 数据文件路径
  359. @param {str} county_name - 县市名称
  360. """
  361. try:
  362. import pandas as pd
  363. # 读取用户上传的CSV文件
  364. df = pd.read_csv(data_file, encoding='utf-8')
  365. # 获取Cd预测系统的数据目录
  366. cd_system_path = self.config.get_cd_system_path()
  367. # 1. 提取坐标信息并保存为独立的坐标文件
  368. coordinates_df = pd.DataFrame({
  369. 'longitude': df.iloc[:, 0], # 第一列为经度
  370. 'latitude': df.iloc[:, 1] # 第二列为纬度
  371. })
  372. # 保存坐标文件到系统数据目录
  373. coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
  374. os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
  375. coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
  376. self.logger.info(f"坐标文件已保存: {coord_file_path}")
  377. # 2. 准备有效态Cd模型的训练数据
  378. effective_cd_data_dir = os.path.join(cd_system_path, "models", "effective_cd_model", "data")
  379. effective_target_file = os.path.join(effective_cd_data_dir, "areatest.csv")
  380. # 备份原始文件
  381. if os.path.exists(effective_target_file):
  382. backup_file = f"{effective_target_file}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
  383. shutil.copy2(effective_target_file, backup_file)
  384. self.logger.info(f"有效态Cd模型原始数据已备份: {backup_file}")
  385. # 检查用户数据是否包含足够的环境因子列
  386. user_env_columns = df.shape[1] - 2 # 减去经纬度列
  387. if user_env_columns >= 21:
  388. # 用户数据包含足够的环境因子列,使用用户数据
  389. # 提取环境因子数据(去掉前两列的经纬度)
  390. user_environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
  391. # 如果用户数据列数超过21列,只取前21列
  392. if user_environmental_data.shape[1] > 21:
  393. user_environmental_data = user_environmental_data.iloc[:, :21]
  394. self.logger.info(f"用户数据有{df.shape[1] - 2}列环境因子,取前21列用于有效态Cd模型")
  395. # 保存用户的环境因子数据到有效态Cd模型目录
  396. user_environmental_data.to_csv(effective_target_file, index=False, encoding='utf-8-sig')
  397. self.logger.info(f"有效态Cd模型使用用户数据,形状: {user_environmental_data.shape}")
  398. else:
  399. # 用户数据环境因子列数不足,恢复使用原始数据
  400. if os.path.exists(backup_file):
  401. shutil.copy2(backup_file, effective_target_file)
  402. self.logger.info(f"用户数据环境因子列数不足({user_env_columns} < 21),有效态Cd模型恢复使用原始21列数据")
  403. else:
  404. self.logger.warning(f"用户数据环境因子列数不足且未找到备份文件,继续使用当前数据")
  405. self.logger.info(f"有效态Cd模型自定义数据文件已准备完成,县市: {county_name}")
  406. except Exception as e:
  407. self.logger.error(f"准备有效态Cd模型自定义数据文件失败: {str(e)}")
  408. raise
  409. def _run_crop_cd_prediction_with_county(self, county_name: str,
  410. county_config: Dict[str, Any]) -> Dict[str, Any]:
  411. """
  412. 执行指定县市的作物Cd预测
  413. @param {str} county_name - 县市名称
  414. @param {Dict[str, Any]} county_config - 县市配置
  415. @returns {Dict[str, Any]} 预测结果信息
  416. """
  417. try:
  418. # 运行作物Cd预测
  419. self.logger.info(f"为{county_name}执行作物Cd预测")
  420. prediction_result = self.wrapper.run_prediction_script("crop")
  421. # 获取输出文件
  422. latest_outputs = self.wrapper.get_latest_outputs("all")
  423. # 复制文件到API输出目录
  424. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  425. model_type = f"crop_cd_{county_name}"
  426. copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
  427. # 清理旧文件
  428. self._cleanup_old_files(model_type)
  429. return {
  430. 'map_path': copied_files.get('map_path'),
  431. 'histogram_path': copied_files.get('histogram_path'),
  432. 'raster_path': copied_files.get('raster_path'),
  433. 'model_type': model_type,
  434. 'county_name': county_name,
  435. 'timestamp': timestamp,
  436. 'stats': self._get_file_stats(copied_files.get('map_path'))
  437. }
  438. except Exception as e:
  439. self.logger.error(f"为{county_name}执行作物Cd预测失败: {str(e)}")
  440. raise
  441. def _run_effective_cd_prediction_with_county(self, county_name: str,
  442. county_config: Dict[str, Any]) -> Dict[str, Any]:
  443. """
  444. 执行指定县市的有效态Cd预测
  445. @param {str} county_name - 县市名称
  446. @param {Dict[str, Any]} county_config - 县市配置
  447. @returns {Dict[str, Any]} 预测结果信息
  448. """
  449. try:
  450. # 运行有效态Cd预测
  451. self.logger.info(f"为{county_name}执行有效态Cd预测")
  452. prediction_result = self.wrapper.run_prediction_script("effective")
  453. # 获取输出文件
  454. latest_outputs = self.wrapper.get_latest_outputs("all")
  455. # 复制文件到API输出目录
  456. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  457. model_type = f"effective_cd_{county_name}"
  458. copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
  459. # 清理旧文件
  460. self._cleanup_old_files(model_type)
  461. return {
  462. 'map_path': copied_files.get('map_path'),
  463. 'histogram_path': copied_files.get('histogram_path'),
  464. 'raster_path': copied_files.get('raster_path'),
  465. 'model_type': model_type,
  466. 'county_name': county_name,
  467. 'timestamp': timestamp,
  468. 'stats': self._get_file_stats(copied_files.get('map_path'))
  469. }
  470. except Exception as e:
  471. self.logger.error(f"为{county_name}执行有效态Cd预测失败: {str(e)}")
  472. raise
  473. async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
  474. """
  475. 生成作物Cd预测结果和可视化
  476. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  477. @throws {Exception} 当预测过程发生错误时抛出
  478. @example
  479. >>> service = CdPredictionService()
  480. >>> result = await service.generate_crop_cd_prediction()
  481. >>> print(result['map_path'])
  482. """
  483. try:
  484. self.logger.info("开始作物Cd模型预测流程")
  485. # 在线程池中运行CPU密集型任务
  486. loop = asyncio.get_event_loop()
  487. result = await loop.run_in_executor(
  488. None,
  489. self._run_crop_cd_prediction
  490. )
  491. return result
  492. except Exception as e:
  493. self.logger.error(f"作物Cd预测流程失败: {str(e)}")
  494. raise
  495. async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
  496. """
  497. 生成有效态Cd预测结果和可视化
  498. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  499. @throws {Exception} 当预测过程发生错误时抛出
  500. @example
  501. >>> service = CdPredictionService()
  502. >>> result = await service.generate_effective_cd_prediction()
  503. >>> print(result['map_path'])
  504. """
  505. try:
  506. self.logger.info("开始有效态Cd模型预测流程")
  507. # 在线程池中运行CPU密集型任务
  508. loop = asyncio.get_event_loop()
  509. result = await loop.run_in_executor(
  510. None,
  511. self._run_effective_cd_prediction
  512. )
  513. return result
  514. except Exception as e:
  515. self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
  516. raise
  517. def _run_crop_cd_prediction(self) -> Dict[str, Any]:
  518. """
  519. 执行作物Cd预测的同步逻辑
  520. @returns {Dict[str, Any]} 预测结果信息
  521. @throws {Exception} 当预测过程发生错误时抛出
  522. """
  523. try:
  524. # 运行作物Cd预测
  525. self.logger.info("执行作物Cd预测")
  526. prediction_result = self.wrapper.run_prediction_script("crop")
  527. # 获取输出文件
  528. latest_outputs = self.wrapper.get_latest_outputs("all")
  529. # 复制文件到API输出目录
  530. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  531. copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
  532. # 清理旧文件
  533. self._cleanup_old_files("crop_cd")
  534. return {
  535. 'map_path': copied_files.get('map_path'),
  536. 'histogram_path': copied_files.get('histogram_path'),
  537. 'raster_path': copied_files.get('raster_path'),
  538. 'model_type': 'crop_cd',
  539. 'timestamp': timestamp,
  540. 'stats': self._get_file_stats(copied_files.get('map_path'))
  541. }
  542. except Exception as e:
  543. self.logger.error(f"作物Cd预测执行失败: {str(e)}")
  544. raise
  545. def _run_effective_cd_prediction(self) -> Dict[str, Any]:
  546. """
  547. 执行有效态Cd预测的同步逻辑
  548. @returns {Dict[str, Any]} 预测结果信息
  549. @throws {Exception} 当预测过程发生错误时抛出
  550. """
  551. try:
  552. # 运行有效态Cd预测
  553. self.logger.info("执行有效态Cd预测")
  554. prediction_result = self.wrapper.run_prediction_script("effective")
  555. # 获取输出文件
  556. latest_outputs = self.wrapper.get_latest_outputs("all")
  557. # 复制文件到API输出目录
  558. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  559. copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
  560. # 清理旧文件
  561. self._cleanup_old_files("effective_cd")
  562. return {
  563. 'map_path': copied_files.get('map_path'),
  564. 'histogram_path': copied_files.get('histogram_path'),
  565. 'raster_path': copied_files.get('raster_path'),
  566. 'model_type': 'effective_cd',
  567. 'timestamp': timestamp,
  568. 'stats': self._get_file_stats(copied_files.get('map_path'))
  569. }
  570. except Exception as e:
  571. self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
  572. raise
  573. def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
  574. model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
  575. """
  576. 复制输出文件到API目录
  577. @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
  578. @param {str} model_type - 模型类型
  579. @param {str} timestamp - 时间戳
  580. @returns {Dict[str, Optional[str]]} 复制后的文件路径
  581. """
  582. copied_files = {}
  583. try:
  584. # 复制地图文件
  585. if latest_outputs.get('latest_map'):
  586. src_map = latest_outputs['latest_map']
  587. dst_map = os.path.join(
  588. self.output_figures_dir,
  589. f"{model_type}_prediction_map_{timestamp}.jpg"
  590. )
  591. shutil.copy2(src_map, dst_map)
  592. copied_files['map_path'] = dst_map
  593. self.logger.info(f"地图文件已复制到: {dst_map}")
  594. # 复制直方图文件
  595. if latest_outputs.get('latest_histogram'):
  596. src_histogram = latest_outputs['latest_histogram']
  597. dst_histogram = os.path.join(
  598. self.output_figures_dir,
  599. f"{model_type}_prediction_histogram_{timestamp}.jpg"
  600. )
  601. shutil.copy2(src_histogram, dst_histogram)
  602. copied_files['histogram_path'] = dst_histogram
  603. self.logger.info(f"直方图文件已复制到: {dst_histogram}")
  604. # 复制栅格文件
  605. if latest_outputs.get('latest_raster'):
  606. src_raster = latest_outputs['latest_raster']
  607. dst_raster = os.path.join(
  608. self.output_raster_dir,
  609. f"{model_type}_prediction_raster_{timestamp}.tif"
  610. )
  611. shutil.copy2(src_raster, dst_raster)
  612. copied_files['raster_path'] = dst_raster
  613. self.logger.info(f"栅格文件已复制到: {dst_raster}")
  614. except Exception as e:
  615. self.logger.error(f"复制输出文件失败: {str(e)}")
  616. return copied_files
  617. def _cleanup_old_files(self, model_type: str):
  618. """
  619. 清理旧的预测文件
  620. @param {str} model_type - 模型类型
  621. """
  622. try:
  623. max_files = self.config.get_api_config().get("max_prediction_files", 10)
  624. # 清理地图文件
  625. map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
  626. self._cleanup_files_by_pattern(map_pattern, max_files)
  627. # 清理直方图文件
  628. histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
  629. self._cleanup_files_by_pattern(histogram_pattern, max_files)
  630. # 清理栅格文件
  631. raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
  632. self._cleanup_files_by_pattern(raster_pattern, max_files)
  633. except Exception as e:
  634. self.logger.warning(f"清理旧文件失败: {str(e)}")
  635. def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
  636. """
  637. 按模式清理文件
  638. @param {str} pattern - 文件模式
  639. @param {int} max_files - 最大保留文件数
  640. """
  641. try:
  642. files = glob.glob(pattern)
  643. if len(files) > max_files:
  644. # 按修改时间排序,删除最旧的文件
  645. files.sort(key=os.path.getmtime)
  646. for file_to_delete in files[:-max_files]:
  647. os.remove(file_to_delete)
  648. self.logger.info(f"已删除旧文件: {file_to_delete}")
  649. except Exception as e:
  650. self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
  651. def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
  652. """
  653. 获取文件统计信息
  654. @param {Optional[str]} file_path - 文件路径
  655. @returns {Dict[str, Any]} 文件统计信息
  656. """
  657. if not file_path or not os.path.exists(file_path):
  658. return {}
  659. try:
  660. stat = os.stat(file_path)
  661. return {
  662. 'file_size': stat.st_size,
  663. 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
  664. 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
  665. }
  666. except Exception:
  667. return {}
  668. def get_latest_crop_cd_map(self) -> Optional[str]:
  669. """
  670. 获取最新的作物Cd预测地图文件路径
  671. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  672. """
  673. try:
  674. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
  675. files = glob.glob(pattern)
  676. if files:
  677. return max(files, key=os.path.getctime)
  678. return None
  679. except Exception:
  680. return None
  681. def get_latest_effective_cd_map(self) -> Optional[str]:
  682. """
  683. 获取最新的有效态Cd预测地图文件路径
  684. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  685. """
  686. try:
  687. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
  688. files = glob.glob(pattern)
  689. if files:
  690. return max(files, key=os.path.getctime)
  691. return None
  692. except Exception:
  693. return None
  694. def get_latest_crop_cd_histogram(self) -> Optional[str]:
  695. """
  696. 获取最新的作物Cd预测直方图文件路径
  697. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  698. """
  699. try:
  700. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
  701. files = glob.glob(pattern)
  702. if files:
  703. return max(files, key=os.path.getctime)
  704. return None
  705. except Exception:
  706. return None
  707. def get_latest_effective_cd_histogram(self) -> Optional[str]:
  708. """
  709. 获取最新的有效态Cd预测直方图文件路径
  710. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  711. """
  712. try:
  713. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
  714. files = glob.glob(pattern)
  715. if files:
  716. return max(files, key=os.path.getctime)
  717. return None
  718. except Exception:
  719. return None