12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872 |
- """
- Cd预测服务类
- @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
- @author: AcidMap Team
- @version: 1.0.0
- """
- import os
- import logging
- import asyncio
- from datetime import datetime
- from typing import Dict, Any, Optional, List
- import glob
- import shutil
- import zipfile
- import tempfile
- import pandas as pd
- import io
- from ..config.cd_prediction_config import cd_config
- from ..utils.cd_prediction_wrapper import CdPredictionWrapper
class CdPredictionService:
    """
    Cd prediction service.

    @description: Drives the crop-Cd and effective-Cd prediction models through
    CdPredictionWrapper and copies the produced map / histogram / raster files
    into the API output directories.
    @example
    >>> service = CdPredictionService()
    >>> result = await service.generate_crop_cd_prediction()
    """

    # Number of environmental-factor columns written to the effective-Cd
    # model input file.
    _EFFECTIVE_CD_FACTOR_COUNT = 21

    def __init__(self):
        """
        Initialise the Cd prediction service.

        @description: Reads paths from cd_config, builds the prediction wrapper
        and loads the table of supported counties.
        """
        self.logger = logging.getLogger(__name__)
        self.config = cd_config

        cd_system_path = self.config.get_cd_system_path()
        self.wrapper = CdPredictionWrapper(cd_system_path)

        # Output directories used by the API layer
        self.output_figures_dir = self.config.get_output_dir("figures")
        self.output_raster_dir = self.config.get_output_dir("raster")
        self.output_data_dir = self.config.get_output_dir("data")

        self.supported_counties = self._load_supported_counties()

        self.logger.info("Cd预测服务初始化完成")

    def _load_supported_counties(self) -> Dict[str, Dict]:
        """
        Build the table of supported counties.

        @returns {Dict[str, Dict]} county name -> configuration (file paths,
        region code, display name, province)
        """
        cd_system_base = self.config.get_cd_system_path()

        # To support another county, add an entry with the same keys
        # (boundary_file, template_file, coordinate_file, region_code,
        # display_name, province).
        return {
            "乐昌市": {
                "boundary_file": os.path.join(cd_system_base, "output/raster/lechang.shp"),
                "template_file": os.path.join(cd_system_base, "output/raster/meanTemp.tif"),
                "coordinate_file": os.path.join(cd_system_base, "data/coordinates/坐标.csv"),
                "region_code": "440282",
                "display_name": "乐昌市",
                "province": "广东省",
            },
        }

    def is_county_supported(self, county_name: str) -> bool:
        """
        Check whether a county is supported.

        @param {str} county_name - county name
        @returns {bool} True when the county has a configuration entry
        """
        return county_name in self.supported_counties

    def get_supported_counties(self) -> List[str]:
        """
        List the names of the supported counties.

        @returns {List[str]} supported county names
        """
        return list(self.supported_counties)

    def get_supported_counties_info(self) -> List[Dict[str, Any]]:
        """
        Describe every supported county.

        @returns {List[Dict[str, Any]]} one dict per county with its display
        data and flags telling whether each configured file exists on disk
        """
        return [
            {
                "name": county_name,
                "display_name": config.get("display_name", county_name),
                "province": config.get("province", ""),
                "region_code": config.get("region_code", ""),
                "has_boundary": os.path.exists(config.get("boundary_file", "")),
                "has_template": os.path.exists(config.get("template_file", "")),
                "has_coordinates": os.path.exists(config.get("coordinate_file", "")),
            }
            for county_name, config in self.supported_counties.items()
        ]

    def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
        """
        Validate the layout of an uploaded data frame.

        @description: The first two columns are treated as longitude and
        latitude, every further column as an environmental factor. Only
        non-numeric coordinate columns (or fewer than three columns) make the
        data invalid; everything else is reported as a warning.
        @param {pd.DataFrame} df - input data
        @param {str} county_name - county name
        @returns {Dict[str, Any]} keys: valid, errors, warnings, data_shape,
        county_supported, null_summary
        """
        county_supported = self.is_county_supported(county_name)

        if df.shape[1] < 3:
            return {
                "valid": False,
                "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
                "warnings": [],
                "data_shape": df.shape,
                "county_supported": county_supported,
                "null_summary": {},
            }

        # (column index, dtype message, lower bound, upper bound, range message)
        coordinate_checks = (
            (0, "第一列(经度)不是数值型数据", 70, 140, "经度值超出合理范围(70-140度)"),
            (1, "第二列(纬度)不是数值型数据", 15, 55, "纬度值超出合理范围(15-55度)"),
        )
        coordinate_issues = []
        try:
            for index, dtype_message, lower, upper, range_message in coordinate_checks:
                column = df.iloc[:, index]
                if not pd.api.types.is_numeric_dtype(column):
                    coordinate_issues.append(dtype_message)
                elif not column.between(lower, upper).all():
                    coordinate_issues.append(range_message)
        except Exception as e:
            coordinate_issues.append(f"坐标数据验证失败: {str(e)}")

        # Columns where more than 10% of the rows are null
        null_counts = df.isnull().sum()
        null_threshold = len(df) * 0.1
        high_null_columns = [
            col for col, count in null_counts.items() if count > null_threshold
        ]

        # Environmental-factor columns (third column onwards) that are not numeric
        non_numeric_columns = [
            df.columns[i]
            for i in range(2, df.shape[1])
            if not pd.api.types.is_numeric_dtype(df.iloc[:, i])
        ]

        # Column labels are not necessarily strings (e.g. header-less frames),
        # so stringify them before joining.
        warnings = []
        if high_null_columns:
            warnings.append(f"以下列空值较多: {', '.join(map(str, high_null_columns))}")
        warnings.extend(coordinate_issues)
        if non_numeric_columns:
            warnings.append(f"以下列不是数值型: {', '.join(map(str, non_numeric_columns))}")

        # Non-numeric coordinate columns are the only critical problem
        critical_errors = [
            issue for issue in coordinate_issues if "不是数值型数据" in issue
        ]

        return {
            "valid": not critical_errors,
            "errors": critical_errors,
            "warnings": warnings,
            "data_shape": df.shape,
            "county_supported": county_supported,
            "null_summary": null_counts.to_dict(),
        }

    def _save_dataframe(self, df: pd.DataFrame, county_name: str,
                        subdir: str, tag: str) -> str:
        """
        Write a data frame to <output_data_dir>/<subdir>/<county>_<tag>_<timestamp>.csv.

        @param {pd.DataFrame} df - data to write
        @param {str} county_name - county name used in the file name
        @param {str} subdir - sub-directory of the data output directory
        @param {str} tag - middle part of the file name
        @returns {str} path of the written file
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        file_path = os.path.join(
            self.output_data_dir, subdir, f"{county_name}_{tag}_{timestamp}.csv"
        )
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        df.to_csv(file_path, index=False, encoding='utf-8-sig')
        return file_path

    def save_uploaded_data(self, df: pd.DataFrame, county_name: str,
                           description: Optional[str] = None) -> str:
        """
        Persist an uploaded data set together with a JSON metadata file.

        @param {pd.DataFrame} df - data
        @param {str} county_name - county name
        @param {Optional[str]} description - free-text description of the data
        @returns {str} path of the saved CSV file
        """
        import json

        file_path = self._save_dataframe(df, county_name, "uploaded", "uploaded_data")

        meta_info = {
            "county_name": county_name,
            "description": description,
            "upload_time": datetime.now().isoformat(),
            "data_shape": df.shape,
            "columns": df.columns.tolist(),
        }

        # Swap only the file extension; str.replace would also rewrite any
        # ".csv" that happens to occur earlier in the path.
        meta_path = os.path.splitext(file_path)[0] + '_meta.json'
        with open(meta_path, 'w', encoding='utf-8') as f:
            json.dump(meta_info, f, ensure_ascii=False, indent=2)

        self.logger.info(f"数据文件已保存: {file_path}")
        return file_path

    def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
        """
        Persist a data set as a temporary CSV file.

        @param {pd.DataFrame} df - data
        @param {str} county_name - county name
        @returns {str} path of the temporary file
        """
        file_path = self._save_dataframe(df, county_name, "temp", "temp_data")
        self.logger.info(f"临时数据文件已保存: {file_path}")
        return file_path

    async def add_county_support(self, county_name: str, boundary_file,
                                 coordinate_file) -> Dict[str, Any]:
        """
        Register a new county from uploaded files.

        @param {str} county_name - county name
        @param boundary_file - uploaded boundary file (zipped Shapefile); must
        expose `filename` and an awaitable `read()`
        @param coordinate_file - uploaded coordinate CSV; must expose an
        awaitable `read()`
        @returns {Dict[str, Any]} county_name, boundary_path, coordinate_path, status
        @throws {Exception} re-raised after logging when any step fails
        """
        try:
            # NOTE(review): county_name becomes a path component as-is; callers
            # should make sure it cannot contain path separators or "..".
            county_dir = os.path.join(self.output_data_dir, "counties", county_name)
            os.makedirs(county_dir, exist_ok=True)
            boundary_dir = os.path.join(county_dir, "boundary")

            # filename may be missing on an upload object; treat that as "not a zip"
            boundary_name = boundary_file.filename or ""
            if boundary_name.lower().endswith('.zip'):
                boundary_content = await boundary_file.read()
                boundary_zip_path = os.path.join(county_dir, "boundary.zip")
                with open(boundary_zip_path, 'wb') as f:
                    f.write(boundary_content)

                # NOTE(review): the archive is user supplied. zipfile strips
                # absolute paths and ".." components on extraction, but size
                # and content are not inspected here.
                with zipfile.ZipFile(boundary_zip_path, 'r') as zip_ref:
                    zip_ref.extractall(boundary_dir)

            # Re-encode the coordinate CSV as utf-8-sig next to the boundary data
            coordinate_content = await coordinate_file.read()
            coordinate_path = os.path.join(county_dir, "coordinates.csv")
            df_coords = pd.read_csv(io.StringIO(coordinate_content.decode('utf-8')))
            df_coords.to_csv(coordinate_path, index=False, encoding='utf-8-sig')

            self.supported_counties[county_name] = {
                "boundary_file": boundary_dir,
                "coordinate_file": coordinate_path,
                "template_file": "",  # has to be generated later
                "region_code": "",
                "display_name": county_name,
                "province": "",
            }

            return {
                "county_name": county_name,
                "boundary_path": boundary_dir,
                "coordinate_path": coordinate_path,
                "status": "success",
            }

        except Exception as e:
            self.logger.error(f"添加县市支持失败: {str(e)}")
            raise

    async def _generate_for_county(self, county_name: str, data_file: Optional[str],
                                   prepare, runner, label: str) -> Dict[str, Any]:
        """
        Shared flow of the per-county prediction entry points.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - optional custom data file path
        @param prepare - callable(data_file, county_name) installing custom data
        @param runner - callable(county_name, county_config) running the model
        @param {str} label - model label used in log messages
        @returns {Dict[str, Any]} prediction result
        @throws {ValueError} when the county is not supported
        """
        if not self.is_county_supported(county_name):
            raise ValueError(f"不支持的县市: {county_name}")

        try:
            county_config = self.supported_counties[county_name]

            # Custom data, when given, replaces the model's default input
            if data_file:
                prepare(data_file, county_name)

            # The prediction itself blocks, so run it on the default executor
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, runner, county_name, county_config)

        except Exception as e:
            self.logger.error(f"为{county_name}生成{label}预测失败: {str(e)}")
            raise

    async def generate_crop_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate the crop-Cd prediction for one county.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - optional custom data file path
        @returns {Dict[str, Any]} prediction result
        @throws {ValueError} when the county is not supported
        """
        return await self._generate_for_county(
            county_name,
            data_file,
            self._prepare_crop_cd_custom_data,
            self._run_crop_cd_prediction_with_county,
            "作物Cd",
        )

    async def generate_effective_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate the effective-Cd prediction for one county.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - optional custom data file path
        @returns {Dict[str, Any]} prediction result
        @throws {ValueError} when the county is not supported
        """
        return await self._generate_for_county(
            county_name,
            data_file,
            self._prepare_effective_cd_custom_data,
            self._run_effective_cd_prediction_with_county,
            "有效态Cd",
        )

    def _write_coordinate_file(self, df: pd.DataFrame, cd_system_path: str) -> str:
        """
        Save the first two columns of df as the system coordinate file.

        @param {pd.DataFrame} df - uploaded data (longitude, latitude, factors...)
        @param {str} cd_system_path - root directory of the Cd prediction system
        @returns {str} path of the written coordinate file
        """
        coordinates_df = pd.DataFrame({
            'longitude': df.iloc[:, 0],
            'latitude': df.iloc[:, 1],
        })
        coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
        os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
        coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
        self.logger.info(f"坐标文件已保存: {coord_file_path}")
        return coord_file_path

    def _backup_model_input(self, target_file: str, label: str) -> Optional[str]:
        """
        Copy an existing model input file to a timestamped backup.

        @param {str} target_file - model input file about to be overwritten
        @param {str} label - model label used in the log message
        @returns {Optional[str]} backup path, or None when there was nothing to back up
        """
        if not os.path.exists(target_file):
            return None
        backup_file = f"{target_file}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        shutil.copy2(target_file, backup_file)
        self.logger.info(f"{label}模型原始数据已备份: {backup_file}")
        return backup_file

    def _prepare_crop_cd_custom_data(self, data_file: str, county_name: str):
        """
        Install a custom data file as input of the crop-Cd model.

        @description: Writes the coordinate file and replaces the model's
        areatest.csv with the environmental-factor columns of the upload.
        @param {str} data_file - path of the uploaded CSV
        @param {str} county_name - county name (used for logging)
        @throws {Exception} re-raised after logging when any step fails
        """
        try:
            df = pd.read_csv(data_file, encoding='utf-8')
            cd_system_path = self.config.get_cd_system_path()

            # 1. Coordinates go into their own file
            self._write_coordinate_file(df, cd_system_path)

            # 2. Replace the model input, keeping a backup of the previous one
            crop_target_file = os.path.join(
                cd_system_path, "models", "crop_cd_model", "data", "areatest.csv"
            )
            self._backup_model_input(crop_target_file, "作物Cd")

            # Everything after the two coordinate columns is a factor column
            environmental_data = df.iloc[:, 2:].copy()
            os.makedirs(os.path.dirname(crop_target_file), exist_ok=True)
            environmental_data.to_csv(crop_target_file, index=False, encoding='utf-8-sig')
            self.logger.info(f"作物Cd模型数据文件已保存: {crop_target_file}, 数据形状: {environmental_data.shape}")

            self.logger.info(f"作物Cd模型自定义数据文件已准备完成,县市: {county_name}")

        except Exception as e:
            self.logger.error(f"准备作物Cd模型自定义数据文件失败: {str(e)}")
            raise

    def _prepare_effective_cd_custom_data(self, data_file: str, county_name: str):
        """
        Install a custom data file as input of the effective-Cd model.

        @description: Writes the coordinate file. When the upload has at least
        21 environmental-factor columns, the first 21 replace the model's
        areatest.csv; otherwise the existing model input is kept.
        @param {str} data_file - path of the uploaded CSV
        @param {str} county_name - county name (used for logging)
        @throws {Exception} re-raised after logging when any step fails
        """
        try:
            df = pd.read_csv(data_file, encoding='utf-8')
            cd_system_path = self.config.get_cd_system_path()

            # 1. Coordinates go into their own file
            self._write_coordinate_file(df, cd_system_path)

            # 2. Back up the current model input (None when there is none yet)
            effective_target_file = os.path.join(
                cd_system_path, "models", "effective_cd_model", "data", "areatest.csv"
            )
            backup_file = self._backup_model_input(effective_target_file, "有效态Cd")

            required = self._EFFECTIVE_CD_FACTOR_COUNT
            user_env_columns = df.shape[1] - 2  # minus longitude / latitude

            if user_env_columns >= required:
                user_environmental_data = df.iloc[:, 2:].copy()

                # Surplus factor columns are dropped
                if user_environmental_data.shape[1] > required:
                    user_environmental_data = user_environmental_data.iloc[:, :required]
                    self.logger.info(f"用户数据有{df.shape[1] - 2}列环境因子,取前{required}列用于有效态Cd模型")

                os.makedirs(os.path.dirname(effective_target_file), exist_ok=True)
                user_environmental_data.to_csv(effective_target_file, index=False, encoding='utf-8-sig')
                self.logger.info(f"有效态Cd模型使用用户数据,形状: {user_environmental_data.shape}")
            elif backup_file is not None and os.path.exists(backup_file):
                # Too few factor columns: keep the original model input
                shutil.copy2(backup_file, effective_target_file)
                self.logger.info(f"用户数据环境因子列数不足({user_env_columns} < {required}),有效态Cd模型恢复使用原始{required}列数据")
            else:
                self.logger.warning(f"用户数据环境因子列数不足且未找到备份文件,继续使用当前数据")

            self.logger.info(f"有效态Cd模型自定义数据文件已准备完成,县市: {county_name}")

        except Exception as e:
            self.logger.error(f"准备有效态Cd模型自定义数据文件失败: {str(e)}")
            raise

    def _run_prediction(self, script_kind: str, label: str, model_type: str,
                        county_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Run one prediction script and publish its outputs.

        @param {str} script_kind - argument for wrapper.run_prediction_script
        @param {str} label - model label used in log messages
        @param {str} model_type - prefix of the published file names
        @param {Optional[str]} county_name - county name; None for the default run
        @returns {Dict[str, Any]} map_path, histogram_path, raster_path,
        model_type, (county_name,) timestamp, stats
        @throws {Exception} re-raised after logging when the run fails
        """
        if county_name is None:
            start_message = f"执行{label}预测"
            failure_message = f"{label}预测执行失败"
        else:
            start_message = f"为{county_name}执行{label}预测"
            failure_message = f"为{county_name}执行{label}预测失败"

        try:
            self.logger.info(start_message)
            self.wrapper.run_prediction_script(script_kind)

            latest_outputs = self.wrapper.get_latest_outputs("all")

            # Publish the outputs under timestamped names, then prune old ones
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
            self._cleanup_old_files(model_type)

            result = {
                'map_path': copied_files.get('map_path'),
                'histogram_path': copied_files.get('histogram_path'),
                'raster_path': copied_files.get('raster_path'),
                'model_type': model_type,
            }
            if county_name is not None:
                result['county_name'] = county_name
            result['timestamp'] = timestamp
            result['stats'] = self._get_file_stats(copied_files.get('map_path'))
            return result

        except Exception as e:
            self.logger.error(f"{failure_message}: {str(e)}")
            raise

    def _run_crop_cd_prediction_with_county(self, county_name: str,
                                            county_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the crop-Cd prediction for one county.

        @param {str} county_name - county name
        @param {Dict[str, Any]} county_config - county configuration (currently unused)
        @returns {Dict[str, Any]} prediction result
        """
        return self._run_prediction(
            "crop", "作物Cd", f"crop_cd_{county_name}", county_name
        )

    def _run_effective_cd_prediction_with_county(self, county_name: str,
                                                 county_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the effective-Cd prediction for one county.

        @param {str} county_name - county name
        @param {Dict[str, Any]} county_config - county configuration (currently unused)
        @returns {Dict[str, Any]} prediction result
        """
        return self._run_prediction(
            "effective", "有效态Cd", f"effective_cd_{county_name}", county_name
        )

    async def _generate(self, runner, label: str) -> Dict[str, Any]:
        """
        Shared flow of the default (county-less) prediction entry points.

        @param runner - zero-argument callable running the model synchronously
        @param {str} label - model label used in log messages
        @returns {Dict[str, Any]} prediction result
        @throws {Exception} re-raised after logging when the run fails
        """
        try:
            self.logger.info(f"开始{label}模型预测流程")

            # The prediction itself blocks, so run it on the default executor
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, runner)

        except Exception as e:
            self.logger.error(f"{label}预测流程失败: {str(e)}")
            raise

    async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
        """
        Generate the crop-Cd prediction and its visualisations.

        @returns {Dict[str, Any]} map path, histogram path and related info
        @throws {Exception} when the prediction fails
        @example
        >>> service = CdPredictionService()
        >>> result = await service.generate_crop_cd_prediction()
        >>> print(result['map_path'])
        """
        return await self._generate(self._run_crop_cd_prediction, "作物Cd")

    async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
        """
        Generate the effective-Cd prediction and its visualisations.

        @returns {Dict[str, Any]} map path, histogram path and related info
        @throws {Exception} when the prediction fails
        @example
        >>> service = CdPredictionService()
        >>> result = await service.generate_effective_cd_prediction()
        >>> print(result['map_path'])
        """
        return await self._generate(self._run_effective_cd_prediction, "有效态Cd")

    def _run_crop_cd_prediction(self) -> Dict[str, Any]:
        """
        Synchronous part of the crop-Cd prediction.

        @returns {Dict[str, Any]} prediction result
        @throws {Exception} when the prediction fails
        """
        return self._run_prediction("crop", "作物Cd", "crop_cd")

    def _run_effective_cd_prediction(self) -> Dict[str, Any]:
        """
        Synchronous part of the effective-Cd prediction.

        @returns {Dict[str, Any]} prediction result
        @throws {Exception} when the prediction fails
        """
        return self._run_prediction("effective", "有效态Cd", "effective_cd")

    def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
                           model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
        """
        Copy the newest model outputs into the API output directories.

        @description: Best effort - a failing copy is logged and the files
        copied so far are still returned.
        @param {Dict[str, Optional[str]]} latest_outputs - paths under the keys
        latest_map, latest_histogram, latest_raster
        @param {str} model_type - prefix of the destination file names
        @param {str} timestamp - timestamp embedded in the file names
        @returns {Dict[str, Optional[str]]} destination paths under the keys
        map_path, histogram_path, raster_path (only for copied files)
        """
        # (source key, result key, destination dir, file kind, extension, log label)
        copy_plan = (
            ('latest_map', 'map_path', self.output_figures_dir, 'map', 'jpg', '地图'),
            ('latest_histogram', 'histogram_path', self.output_figures_dir, 'histogram', 'jpg', '直方图'),
            ('latest_raster', 'raster_path', self.output_raster_dir, 'raster', 'tif', '栅格'),
        )
        copied_files = {}

        try:
            for source_key, result_key, dest_dir, kind, extension, log_label in copy_plan:
                source = latest_outputs.get(source_key)
                if not source:
                    continue
                destination = os.path.join(
                    dest_dir, f"{model_type}_prediction_{kind}_{timestamp}.{extension}"
                )
                # The output directory may not exist yet on a fresh deployment
                os.makedirs(dest_dir, exist_ok=True)
                shutil.copy2(source, destination)
                copied_files[result_key] = destination
                self.logger.info(f"{log_label}文件已复制到: {destination}")

        except Exception as e:
            self.logger.error(f"复制输出文件失败: {str(e)}")

        return copied_files

    def _cleanup_old_files(self, model_type: str):
        """
        Prune old published prediction files of one model type.

        @description: Keeps at most `max_prediction_files` (default 10) files
        per kind (map, histogram, raster). Failures are logged, not raised.
        @param {str} model_type - prefix of the published file names
        """
        try:
            max_files = self.config.get_api_config().get("max_prediction_files", 10)

            patterns = (
                os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg"),
                os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg"),
                os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif"),
            )
            for pattern in patterns:
                self._cleanup_files_by_pattern(pattern, max_files)

        except Exception as e:
            self.logger.warning(f"清理旧文件失败: {str(e)}")

    def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
        """
        Delete the oldest files matching a glob pattern.

        @param {str} pattern - glob pattern
        @param {int} max_files - number of newest files to keep; values below
        zero are treated as zero
        """
        try:
            files = glob.glob(pattern)
            # Explicit count instead of files[:-max_files], which deletes
            # nothing when max_files is 0.
            excess = len(files) - max(max_files, 0)
            if excess > 0:
                files.sort(key=os.path.getmtime)  # oldest first
                for file_to_delete in files[:excess]:
                    os.remove(file_to_delete)
                    self.logger.info(f"已删除旧文件: {file_to_delete}")
        except Exception as e:
            self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")

    def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
        """
        Collect size and timestamps of a file.

        @param {Optional[str]} file_path - file path
        @returns {Dict[str, Any]} file_size, created_time, modified_time; empty
        when the path is missing or cannot be inspected
        """
        if not file_path or not os.path.exists(file_path):
            return {}

        try:
            stat = os.stat(file_path)
            return {
                'file_size': stat.st_size,
                'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            }
        except (OSError, OverflowError, ValueError):
            return {}

    def _latest_figure(self, filename_pattern: str) -> Optional[str]:
        """
        Find the newest file in the figures directory matching a pattern.

        @param {str} filename_pattern - glob pattern relative to the figures directory
        @returns {Optional[str]} path with the greatest ctime, or None when
        nothing matches or the lookup fails
        """
        try:
            files = glob.glob(os.path.join(self.output_figures_dir, filename_pattern))
            if not files:
                return None
            return max(files, key=os.path.getctime)
        except Exception:
            return None

    def get_latest_crop_cd_map(self) -> Optional[str]:
        """
        Path of the newest crop-Cd prediction map.

        @returns {Optional[str]} newest map file, or None when there is none
        """
        return self._latest_figure("crop_cd_prediction_map_*.jpg")

    def get_latest_effective_cd_map(self) -> Optional[str]:
        """
        Path of the newest effective-Cd prediction map.

        @returns {Optional[str]} newest map file, or None when there is none
        """
        return self._latest_figure("effective_cd_prediction_map_*.jpg")

    def get_latest_crop_cd_histogram(self) -> Optional[str]:
        """
        Path of the newest crop-Cd prediction histogram.

        @returns {Optional[str]} newest histogram file, or None when there is none
        """
        return self._latest_figure("crop_cd_prediction_histogram_*.jpg")

    def get_latest_effective_cd_histogram(self) -> Optional[str]:
        """
        Path of the newest effective-Cd prediction histogram.

        @returns {Optional[str]} newest histogram file, or None when there is none
        """
        return self._latest_figure("effective_cd_prediction_histogram_*.jpg")
|