123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- """
- Cd预测服务类
- @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
- @author: AcidMap Team
- @version: 1.0.0
- """
- import os
- import logging
- import asyncio
- from datetime import datetime
- from typing import Dict, Any, Optional
- import glob
- import shutil
- from ..config.cd_prediction_config import cd_config
- from ..utils.cd_prediction_wrapper import CdPredictionWrapper
- class CdPredictionService:
- """
- Cd预测服务类
-
- @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- """
-
- def __init__(self):
- """
- 初始化Cd预测服务
-
- @description: 设置Cd预测系统的路径和配置
- """
- # 设置日志
- self.logger = logging.getLogger(__name__)
-
- # 获取配置
- self.config = cd_config
-
- # 初始化包装器
- cd_system_path = self.config.get_cd_system_path()
- self.wrapper = CdPredictionWrapper(cd_system_path)
-
- # 输出目录
- self.output_figures_dir = self.config.get_output_dir("figures")
- self.output_raster_dir = self.config.get_output_dir("raster")
-
- self.logger.info("Cd预测服务初始化完成")
-
- async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成作物Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始作物Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_crop_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"作物Cd预测流程失败: {str(e)}")
- raise
-
- async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成有效态Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_effective_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始有效态Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_effective_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
- raise
-
- def _run_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行作物Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行作物Cd预测
- self.logger.info("执行作物Cd预测")
- prediction_result = self.wrapper.run_prediction_script("crop")
-
- # 获取输出文件
- latest_outputs = self.wrapper.get_latest_outputs("all")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("crop_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'crop_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"作物Cd预测执行失败: {str(e)}")
- raise
-
- def _run_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行有效态Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行有效态Cd预测
- self.logger.info("执行有效态Cd预测")
- prediction_result = self.wrapper.run_prediction_script("effective")
-
- # 获取输出文件
- latest_outputs = self.wrapper.get_latest_outputs("all")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("effective_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'effective_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
- raise
-
- def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
- model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
- """
- 复制输出文件到API目录
-
- @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
- @param {str} model_type - 模型类型
- @param {str} timestamp - 时间戳
- @returns {Dict[str, Optional[str]]} 复制后的文件路径
- """
- copied_files = {}
-
- try:
- # 复制地图文件
- if latest_outputs.get('latest_map'):
- src_map = latest_outputs['latest_map']
- dst_map = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_map_{timestamp}.jpg"
- )
- shutil.copy2(src_map, dst_map)
- copied_files['map_path'] = dst_map
- self.logger.info(f"地图文件已复制到: {dst_map}")
-
- # 复制直方图文件
- if latest_outputs.get('latest_histogram'):
- src_histogram = latest_outputs['latest_histogram']
- dst_histogram = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_histogram_{timestamp}.jpg"
- )
- shutil.copy2(src_histogram, dst_histogram)
- copied_files['histogram_path'] = dst_histogram
- self.logger.info(f"直方图文件已复制到: {dst_histogram}")
-
- # 复制栅格文件
- if latest_outputs.get('latest_raster'):
- src_raster = latest_outputs['latest_raster']
- dst_raster = os.path.join(
- self.output_raster_dir,
- f"{model_type}_prediction_raster_{timestamp}.tif"
- )
- shutil.copy2(src_raster, dst_raster)
- copied_files['raster_path'] = dst_raster
- self.logger.info(f"栅格文件已复制到: {dst_raster}")
-
- except Exception as e:
- self.logger.error(f"复制输出文件失败: {str(e)}")
-
- return copied_files
-
- def _cleanup_old_files(self, model_type: str):
- """
- 清理旧的预测文件
-
- @param {str} model_type - 模型类型
- """
- try:
- max_files = self.config.get_api_config().get("max_prediction_files", 10)
-
- # 清理地图文件
- map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
- self._cleanup_files_by_pattern(map_pattern, max_files)
-
- # 清理直方图文件
- histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
- self._cleanup_files_by_pattern(histogram_pattern, max_files)
-
- # 清理栅格文件
- raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
- self._cleanup_files_by_pattern(raster_pattern, max_files)
-
- except Exception as e:
- self.logger.warning(f"清理旧文件失败: {str(e)}")
-
- def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
- """
- 按模式清理文件
-
- @param {str} pattern - 文件模式
- @param {int} max_files - 最大保留文件数
- """
- try:
- files = glob.glob(pattern)
- if len(files) > max_files:
- # 按修改时间排序,删除最旧的文件
- files.sort(key=os.path.getmtime)
- for file_to_delete in files[:-max_files]:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧文件: {file_to_delete}")
- except Exception as e:
- self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
-
- def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
- """
- 获取文件统计信息
-
- @param {Optional[str]} file_path - 文件路径
- @returns {Dict[str, Any]} 文件统计信息
- """
- if not file_path or not os.path.exists(file_path):
- return {}
-
- try:
- stat = os.stat(file_path)
- return {
- 'file_size': stat.st_size,
- 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
- 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
- }
- except Exception:
- return {}
-
- def get_latest_crop_cd_map(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_map(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_crop_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
|