""" Cd预测服务类 @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能 @author: AcidMap Team @version: 1.0.0 """ import os import logging import asyncio from datetime import datetime from typing import Dict, Any, Optional import glob import shutil from ..config.cd_prediction_config import cd_config from ..utils.cd_prediction_wrapper import CdPredictionWrapper class CdPredictionService: """ Cd预测服务类 @description: 提供作物Cd和有效态Cd模型的预测与可视化功能 @example >>> service = CdPredictionService() >>> result = await service.generate_crop_cd_prediction() """ def __init__(self): """ 初始化Cd预测服务 @description: 设置Cd预测系统的路径和配置 """ # 设置日志 self.logger = logging.getLogger(__name__) # 获取配置 self.config = cd_config # 初始化包装器 cd_system_path = self.config.get_cd_system_path() self.wrapper = CdPredictionWrapper(cd_system_path) # 输出目录 self.output_figures_dir = self.config.get_output_dir("figures") self.output_raster_dir = self.config.get_output_dir("raster") self.logger.info("Cd预测服务初始化完成") async def generate_crop_cd_prediction(self) -> Dict[str, Any]: """ 生成作物Cd预测结果和可视化 @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息 @throws {Exception} 当预测过程发生错误时抛出 @example >>> service = CdPredictionService() >>> result = await service.generate_crop_cd_prediction() >>> print(result['map_path']) """ try: self.logger.info("开始作物Cd模型预测流程") # 在线程池中运行CPU密集型任务 loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, self._run_crop_cd_prediction ) return result except Exception as e: self.logger.error(f"作物Cd预测流程失败: {str(e)}") raise async def generate_effective_cd_prediction(self) -> Dict[str, Any]: """ 生成有效态Cd预测结果和可视化 @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息 @throws {Exception} 当预测过程发生错误时抛出 @example >>> service = CdPredictionService() >>> result = await service.generate_effective_cd_prediction() >>> print(result['map_path']) """ try: self.logger.info("开始有效态Cd模型预测流程") # 在线程池中运行CPU密集型任务 loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, self._run_effective_cd_prediction ) return result except Exception as e: self.logger.error(f"有效态Cd预测流程失败: {str(e)}") raise def _run_crop_cd_prediction(self) -> Dict[str, Any]: """ 执行作物Cd预测的同步逻辑 @returns {Dict[str, Any]} 预测结果信息 @throws {Exception} 当预测过程发生错误时抛出 """ try: # 运行作物Cd预测 self.logger.info("执行作物Cd预测") prediction_result = self.wrapper.run_prediction_script("crop") # 获取输出文件 latest_outputs = self.wrapper.get_latest_outputs("all") # 复制文件到API输出目录 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp) # 清理旧文件 self._cleanup_old_files("crop_cd") return { 'map_path': copied_files.get('map_path'), 'histogram_path': copied_files.get('histogram_path'), 'raster_path': copied_files.get('raster_path'), 'model_type': 'crop_cd', 'timestamp': timestamp, 'stats': self._get_file_stats(copied_files.get('map_path')) } except Exception as e: self.logger.error(f"作物Cd预测执行失败: {str(e)}") raise def _run_effective_cd_prediction(self) -> Dict[str, Any]: """ 执行有效态Cd预测的同步逻辑 @returns {Dict[str, Any]} 预测结果信息 @throws {Exception} 当预测过程发生错误时抛出 """ try: # 运行有效态Cd预测 self.logger.info("执行有效态Cd预测") prediction_result = self.wrapper.run_prediction_script("effective") # 获取输出文件 latest_outputs = self.wrapper.get_latest_outputs("all") # 复制文件到API输出目录 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp) # 清理旧文件 self._cleanup_old_files("effective_cd") return { 'map_path': copied_files.get('map_path'), 'histogram_path': copied_files.get('histogram_path'), 'raster_path': copied_files.get('raster_path'), 'model_type': 'effective_cd', 'timestamp': timestamp, 'stats': self._get_file_stats(copied_files.get('map_path')) } except Exception as e: self.logger.error(f"有效态Cd预测执行失败: {str(e)}") raise def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]], model_type: str, timestamp: str) -> Dict[str, Optional[str]]: """ 复制输出文件到API目录 @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径 @param {str} model_type - 模型类型 @param {str} timestamp - 时间戳 @returns {Dict[str, Optional[str]]} 复制后的文件路径 """ copied_files = {} try: # 复制地图文件 if latest_outputs.get('latest_map'): src_map = latest_outputs['latest_map'] dst_map = os.path.join( self.output_figures_dir, f"{model_type}_prediction_map_{timestamp}.jpg" ) shutil.copy2(src_map, dst_map) copied_files['map_path'] = dst_map self.logger.info(f"地图文件已复制到: {dst_map}") # 复制直方图文件 if latest_outputs.get('latest_histogram'): src_histogram = latest_outputs['latest_histogram'] dst_histogram = os.path.join( self.output_figures_dir, f"{model_type}_prediction_histogram_{timestamp}.jpg" ) shutil.copy2(src_histogram, dst_histogram) copied_files['histogram_path'] = dst_histogram self.logger.info(f"直方图文件已复制到: {dst_histogram}") # 复制栅格文件 if latest_outputs.get('latest_raster'): src_raster = latest_outputs['latest_raster'] dst_raster = os.path.join( self.output_raster_dir, f"{model_type}_prediction_raster_{timestamp}.tif" ) shutil.copy2(src_raster, dst_raster) copied_files['raster_path'] = dst_raster self.logger.info(f"栅格文件已复制到: {dst_raster}") except Exception as e: self.logger.error(f"复制输出文件失败: {str(e)}") return copied_files def _cleanup_old_files(self, model_type: str): """ 清理旧的预测文件 @param {str} model_type - 模型类型 """ try: max_files = self.config.get_api_config().get("max_prediction_files", 10) # 清理地图文件 map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg") self._cleanup_files_by_pattern(map_pattern, max_files) # 清理直方图文件 histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg") self._cleanup_files_by_pattern(histogram_pattern, max_files) # 清理栅格文件 raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif") self._cleanup_files_by_pattern(raster_pattern, max_files) except Exception as e: self.logger.warning(f"清理旧文件失败: {str(e)}") def _cleanup_files_by_pattern(self, pattern: str, max_files: int): """ 按模式清理文件 @param {str} pattern - 文件模式 @param {int} max_files - 最大保留文件数 """ try: files = glob.glob(pattern) if len(files) > max_files: # 按修改时间排序,删除最旧的文件 files.sort(key=os.path.getmtime) for file_to_delete in files[:-max_files]: os.remove(file_to_delete) self.logger.info(f"已删除旧文件: {file_to_delete}") except Exception as e: self.logger.warning(f"清理文件失败 {pattern}: {str(e)}") def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]: """ 获取文件统计信息 @param {Optional[str]} file_path - 文件路径 @returns {Dict[str, Any]} 文件统计信息 """ if not file_path or not os.path.exists(file_path): return {} try: stat = os.stat(file_path) return { 'file_size': stat.st_size, 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat() } except Exception: return {} def get_latest_crop_cd_map(self) -> Optional[str]: """ 获取最新的作物Cd预测地图文件路径 @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None """ try: pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg") files = glob.glob(pattern) if files: return max(files, key=os.path.getctime) return None except Exception: return None def get_latest_effective_cd_map(self) -> Optional[str]: """ 获取最新的有效态Cd预测地图文件路径 @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None """ try: pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg") files = glob.glob(pattern) if files: return max(files, key=os.path.getctime) return None except Exception: return None def get_latest_crop_cd_histogram(self) -> Optional[str]: """ 获取最新的作物Cd预测直方图文件路径 @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None """ try: pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg") files = glob.glob(pattern) if files: return max(files, key=os.path.getctime) return None except Exception: return None def get_latest_effective_cd_histogram(self) -> Optional[str]: """ 获取最新的有效态Cd预测直方图文件路径 @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None """ try: pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg") files = glob.glob(pattern) if files: return max(files, key=os.path.getctime) return None except Exception: return None