cd_prediction_service.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. """
  2. Cd预测服务类
  3. @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import os
  8. import logging
  9. import asyncio
  10. from datetime import datetime
  11. from typing import Dict, Any, Optional
  12. import glob
  13. import shutil
  14. from ..config.cd_prediction_config import cd_config
  15. from ..utils.cd_prediction_wrapper import CdPredictionWrapper
  16. class CdPredictionService:
  17. """
  18. Cd预测服务类
  19. @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
  20. @example
  21. >>> service = CdPredictionService()
  22. >>> result = await service.generate_crop_cd_prediction()
  23. """
  24. def __init__(self):
  25. """
  26. 初始化Cd预测服务
  27. @description: 设置Cd预测系统的路径和配置
  28. """
  29. # 设置日志
  30. self.logger = logging.getLogger(__name__)
  31. # 获取配置
  32. self.config = cd_config
  33. # 初始化包装器
  34. cd_system_path = self.config.get_cd_system_path()
  35. self.wrapper = CdPredictionWrapper(cd_system_path)
  36. # 输出目录
  37. self.output_figures_dir = self.config.get_output_dir("figures")
  38. self.output_raster_dir = self.config.get_output_dir("raster")
  39. self.logger.info("Cd预测服务初始化完成")
  40. async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
  41. """
  42. 生成作物Cd预测结果和可视化
  43. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  44. @throws {Exception} 当预测过程发生错误时抛出
  45. @example
  46. >>> service = CdPredictionService()
  47. >>> result = await service.generate_crop_cd_prediction()
  48. >>> print(result['map_path'])
  49. """
  50. try:
  51. self.logger.info("开始作物Cd模型预测流程")
  52. # 在线程池中运行CPU密集型任务
  53. loop = asyncio.get_event_loop()
  54. result = await loop.run_in_executor(
  55. None,
  56. self._run_crop_cd_prediction
  57. )
  58. return result
  59. except Exception as e:
  60. self.logger.error(f"作物Cd预测流程失败: {str(e)}")
  61. raise
  62. async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
  63. """
  64. 生成有效态Cd预测结果和可视化
  65. @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
  66. @throws {Exception} 当预测过程发生错误时抛出
  67. @example
  68. >>> service = CdPredictionService()
  69. >>> result = await service.generate_effective_cd_prediction()
  70. >>> print(result['map_path'])
  71. """
  72. try:
  73. self.logger.info("开始有效态Cd模型预测流程")
  74. # 在线程池中运行CPU密集型任务
  75. loop = asyncio.get_event_loop()
  76. result = await loop.run_in_executor(
  77. None,
  78. self._run_effective_cd_prediction
  79. )
  80. return result
  81. except Exception as e:
  82. self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
  83. raise
  84. def _run_crop_cd_prediction(self) -> Dict[str, Any]:
  85. """
  86. 执行作物Cd预测的同步逻辑
  87. @returns {Dict[str, Any]} 预测结果信息
  88. @throws {Exception} 当预测过程发生错误时抛出
  89. """
  90. try:
  91. # 运行作物Cd预测
  92. self.logger.info("执行作物Cd预测")
  93. prediction_result = self.wrapper.run_prediction_script("crop")
  94. # 获取输出文件
  95. latest_outputs = self.wrapper.get_latest_outputs("all")
  96. # 复制文件到API输出目录
  97. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  98. copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
  99. # 清理旧文件
  100. self._cleanup_old_files("crop_cd")
  101. return {
  102. 'map_path': copied_files.get('map_path'),
  103. 'histogram_path': copied_files.get('histogram_path'),
  104. 'raster_path': copied_files.get('raster_path'),
  105. 'model_type': 'crop_cd',
  106. 'timestamp': timestamp,
  107. 'stats': self._get_file_stats(copied_files.get('map_path'))
  108. }
  109. except Exception as e:
  110. self.logger.error(f"作物Cd预测执行失败: {str(e)}")
  111. raise
  112. def _run_effective_cd_prediction(self) -> Dict[str, Any]:
  113. """
  114. 执行有效态Cd预测的同步逻辑
  115. @returns {Dict[str, Any]} 预测结果信息
  116. @throws {Exception} 当预测过程发生错误时抛出
  117. """
  118. try:
  119. # 运行有效态Cd预测
  120. self.logger.info("执行有效态Cd预测")
  121. prediction_result = self.wrapper.run_prediction_script("effective")
  122. # 获取输出文件
  123. latest_outputs = self.wrapper.get_latest_outputs("all")
  124. # 复制文件到API输出目录
  125. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  126. copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
  127. # 清理旧文件
  128. self._cleanup_old_files("effective_cd")
  129. return {
  130. 'map_path': copied_files.get('map_path'),
  131. 'histogram_path': copied_files.get('histogram_path'),
  132. 'raster_path': copied_files.get('raster_path'),
  133. 'model_type': 'effective_cd',
  134. 'timestamp': timestamp,
  135. 'stats': self._get_file_stats(copied_files.get('map_path'))
  136. }
  137. except Exception as e:
  138. self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
  139. raise
  140. def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
  141. model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
  142. """
  143. 复制输出文件到API目录
  144. @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
  145. @param {str} model_type - 模型类型
  146. @param {str} timestamp - 时间戳
  147. @returns {Dict[str, Optional[str]]} 复制后的文件路径
  148. """
  149. copied_files = {}
  150. try:
  151. # 复制地图文件
  152. if latest_outputs.get('latest_map'):
  153. src_map = latest_outputs['latest_map']
  154. dst_map = os.path.join(
  155. self.output_figures_dir,
  156. f"{model_type}_prediction_map_{timestamp}.jpg"
  157. )
  158. shutil.copy2(src_map, dst_map)
  159. copied_files['map_path'] = dst_map
  160. self.logger.info(f"地图文件已复制到: {dst_map}")
  161. # 复制直方图文件
  162. if latest_outputs.get('latest_histogram'):
  163. src_histogram = latest_outputs['latest_histogram']
  164. dst_histogram = os.path.join(
  165. self.output_figures_dir,
  166. f"{model_type}_prediction_histogram_{timestamp}.jpg"
  167. )
  168. shutil.copy2(src_histogram, dst_histogram)
  169. copied_files['histogram_path'] = dst_histogram
  170. self.logger.info(f"直方图文件已复制到: {dst_histogram}")
  171. # 复制栅格文件
  172. if latest_outputs.get('latest_raster'):
  173. src_raster = latest_outputs['latest_raster']
  174. dst_raster = os.path.join(
  175. self.output_raster_dir,
  176. f"{model_type}_prediction_raster_{timestamp}.tif"
  177. )
  178. shutil.copy2(src_raster, dst_raster)
  179. copied_files['raster_path'] = dst_raster
  180. self.logger.info(f"栅格文件已复制到: {dst_raster}")
  181. except Exception as e:
  182. self.logger.error(f"复制输出文件失败: {str(e)}")
  183. return copied_files
  184. def _cleanup_old_files(self, model_type: str):
  185. """
  186. 清理旧的预测文件
  187. @param {str} model_type - 模型类型
  188. """
  189. try:
  190. max_files = self.config.get_api_config().get("max_prediction_files", 10)
  191. # 清理地图文件
  192. map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
  193. self._cleanup_files_by_pattern(map_pattern, max_files)
  194. # 清理直方图文件
  195. histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
  196. self._cleanup_files_by_pattern(histogram_pattern, max_files)
  197. # 清理栅格文件
  198. raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
  199. self._cleanup_files_by_pattern(raster_pattern, max_files)
  200. except Exception as e:
  201. self.logger.warning(f"清理旧文件失败: {str(e)}")
  202. def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
  203. """
  204. 按模式清理文件
  205. @param {str} pattern - 文件模式
  206. @param {int} max_files - 最大保留文件数
  207. """
  208. try:
  209. files = glob.glob(pattern)
  210. if len(files) > max_files:
  211. # 按修改时间排序,删除最旧的文件
  212. files.sort(key=os.path.getmtime)
  213. for file_to_delete in files[:-max_files]:
  214. os.remove(file_to_delete)
  215. self.logger.info(f"已删除旧文件: {file_to_delete}")
  216. except Exception as e:
  217. self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
  218. def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
  219. """
  220. 获取文件统计信息
  221. @param {Optional[str]} file_path - 文件路径
  222. @returns {Dict[str, Any]} 文件统计信息
  223. """
  224. if not file_path or not os.path.exists(file_path):
  225. return {}
  226. try:
  227. stat = os.stat(file_path)
  228. return {
  229. 'file_size': stat.st_size,
  230. 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
  231. 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
  232. }
  233. except Exception:
  234. return {}
  235. def get_latest_crop_cd_map(self) -> Optional[str]:
  236. """
  237. 获取最新的作物Cd预测地图文件路径
  238. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  239. """
  240. try:
  241. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
  242. files = glob.glob(pattern)
  243. if files:
  244. return max(files, key=os.path.getctime)
  245. return None
  246. except Exception:
  247. return None
  248. def get_latest_effective_cd_map(self) -> Optional[str]:
  249. """
  250. 获取最新的有效态Cd预测地图文件路径
  251. @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
  252. """
  253. try:
  254. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
  255. files = glob.glob(pattern)
  256. if files:
  257. return max(files, key=os.path.getctime)
  258. return None
  259. except Exception:
  260. return None
  261. def get_latest_crop_cd_histogram(self) -> Optional[str]:
  262. """
  263. 获取最新的作物Cd预测直方图文件路径
  264. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  265. """
  266. try:
  267. pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
  268. files = glob.glob(pattern)
  269. if files:
  270. return max(files, key=os.path.getctime)
  271. return None
  272. except Exception:
  273. return None
  274. def get_latest_effective_cd_histogram(self) -> Optional[str]:
  275. """
  276. 获取最新的有效态Cd预测直方图文件路径
  277. @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
  278. """
  279. try:
  280. pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
  281. files = glob.glob(pattern)
  282. if files:
  283. return max(files, key=os.path.getctime)
  284. return None
  285. except Exception:
  286. return None