"""
Cd prediction service v3.0

@description: Fully self-contained service; does not depend on
    Cd_Prediction_Integrated_System.
@author: AcidMap Team
@version: 3.0.0
"""

import os
import logging
import asyncio
import tempfile
import json
import shutil
from datetime import datetime
from typing import Dict, Any, Optional, List, Callable
import pandas as pd
import glob

from ..config.cd_prediction_config import cd_config
from ..models.cd_prediction import CdPredictionEngine
from ..database import SessionLocal
from .admin_boundary_service import get_boundary_geojson_by_name


class CdPredictionServiceV3:
    """
    Cd prediction service v3.0 (self-contained version).

    @description: Runs crop-Cd and effective-Cd predictions through the
        bundled ``CdPredictionEngine`` without any external integrated
        system, and provides input validation, temporary-file management
        and result-statistics helpers.
    """

    def __init__(self):
        """
        Initialise logger, configuration, prediction engine, output
        sub-directory paths and the supported-county table.
        """
        self.logger = logging.getLogger(__name__)

        # Shared Cd prediction configuration object.
        self.config = cd_config

        # All outputs live under the configured "base" output directory.
        self.output_base_dir = self.config.get_output_dir("base")

        # The engine is constructed with the same base output directory.
        self.engine = CdPredictionEngine(self.output_base_dir)

        # Output sub-directory paths (this constructor does not create them).
        self.output_figures_dir = os.path.join(self.output_base_dir, "figures")
        self.output_raster_dir = os.path.join(self.output_base_dir, "raster")
        self.output_data_dir = os.path.join(self.output_base_dir, "data")

        # County name -> metadata for every county this service supports.
        self.supported_counties = self._load_supported_counties()

        self.logger.info("Cd预测服务v3.0初始化完成(完全自包含)")

    # -------------------------------------------------------------------------
    # Supported counties
    # -------------------------------------------------------------------------

    def _load_supported_counties(self) -> Dict[str, Dict]:
        """
        Build the supported-county configuration.

        @returns {Dict[str, Dict]} Mapping of county name to its metadata
            (region_code, display_name, province).
        """
        return {
            "乐昌市": {
                "region_code": "440282",
                "display_name": "乐昌市",
                "province": "广东省"
            }
            # More counties can be added here.
        }

    def is_county_supported(self, county_name: str) -> bool:
        """
        Check whether a county is supported.

        @param {str} county_name - County name.
        @returns {bool} True if the county is in the supported table.
        """
        return county_name in self.supported_counties

    def get_supported_counties(self) -> List[str]:
        """
        List the names of all supported counties.

        @returns {List[str]} Supported county names.
        """
        return list(self.supported_counties.keys())

    def get_supported_counties_info(self) -> List[Dict[str, Any]]:
        """
        Describe every supported county.

        @returns {List[Dict[str, Any]]} One dict per county with name,
            display_name, province, region_code and supported=True.
        """
        return [
            {
                "name": county_name,
                "display_name": config.get("display_name", county_name),
                "province": config.get("province", ""),
                "region_code": config.get("region_code", ""),
                "supported": True  # every configured county is supported in v3.0
            }
            for county_name, config in self.supported_counties.items()
        ]

    # -------------------------------------------------------------------------
    # Input validation / temp data
    # -------------------------------------------------------------------------

    def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
        """
        Validate the layout of an input DataFrame.

        The first two columns are expected to be longitude and latitude,
        followed by at least one environmental-factor column.

        @param {pd.DataFrame} df - Input data.
        @param {str} county_name - County name (only used to report support).
        @returns {Dict[str, Any]} Result with keys valid, errors, warnings,
            data_shape and county_supported. Only non-numeric coordinate
            columns make the data invalid; everything else is a warning.
        """
        # Minimum requirement: two coordinate columns plus one factor column.
        if df.shape[1] < 3:
            return {
                "valid": False,
                "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
                "warnings": [],
                "data_shape": df.shape,
                "county_supported": self.is_county_supported(county_name)
            }

        # Coordinate checks. NaN coordinates fail `between`, so they are
        # reported as out of range.
        coordinate_issues: List[str] = []
        try:
            if not pd.api.types.is_numeric_dtype(df.iloc[:, 0]):
                coordinate_issues.append("第一列(经度)不是数值型数据")
            elif not df.iloc[:, 0].between(70, 140).all():
                coordinate_issues.append("经度值超出合理范围(70-140度)")

            if not pd.api.types.is_numeric_dtype(df.iloc[:, 1]):
                coordinate_issues.append("第二列(纬度)不是数值型数据")
            elif not df.iloc[:, 1].between(15, 55).all():
                coordinate_issues.append("纬度值超出合理范围(15-55度)")
        except Exception as e:
            coordinate_issues.append(f"坐标数据验证失败: {str(e)}")

        # Columns with more than 10% missing values. Labels are stringified
        # so that non-string column names do not break the join below.
        null_counts = df.isnull().sum()
        high_null_columns = [
            str(col) for col, count in null_counts.items() if count > len(df) * 0.1
        ]

        # Environmental-factor columns (index >= 2) that are not numeric.
        non_numeric_columns = [
            str(df.columns[i])
            for i in range(2, df.shape[1])
            if not pd.api.types.is_numeric_dtype(df.iloc[:, i])
        ]

        warnings = []
        if high_null_columns:
            warnings.append(f"以下列空值较多: {', '.join(high_null_columns)}")
        if coordinate_issues:
            warnings.extend(coordinate_issues)
        if non_numeric_columns:
            warnings.append(f"以下列不是数值型: {', '.join(non_numeric_columns)}")

        # Non-numeric coordinates are the only fatal problem.
        critical_errors = [issue for issue in coordinate_issues if "不是数值型数据" in issue]

        return {
            "valid": len(critical_errors) == 0,
            "errors": critical_errors,
            "warnings": warnings,
            "data_shape": df.shape,
            "county_supported": self.is_county_supported(county_name)
        }

    def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
        """
        Save a DataFrame as a timestamped temporary CSV.

        The file goes to ``<output_data_dir>/temp``. Older temp files for the
        same county are pruned afterwards.

        @param {pd.DataFrame} df - Data to save.
        @param {str} county_name - County name, used verbatim in the file name.
        @returns {str} Path of the saved CSV.
        """
        # NOTE(review): the timestamp has one-second resolution, so two saves
        # for the same county within one second overwrite each other.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{county_name}_temp_data_{timestamp}.csv"
        file_path = os.path.join(self.output_data_dir, "temp", filename)

        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # utf-8-sig writes a BOM so that Excel detects the encoding.
        df.to_csv(file_path, index=False, encoding='utf-8-sig')

        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        self.logger.info(f"临时数据文件已保存: {file_path} ({file_size:,} bytes)")

        self._cleanup_temp_files(county_name)

        return file_path

    # -------------------------------------------------------------------------
    # Boundary handling
    # -------------------------------------------------------------------------

    def _get_boundary_geojson(self, county_name: str) -> Optional[str]:
        """
        Export the county boundary from the database to a temporary GeoJSON.

        The file is written into a fresh ``tempfile.mkdtemp()`` directory.
        The caller should remove it via ``_cleanup_after_prediction``.

        @param {str} county_name - County name.
        @returns {Optional[str]} Path of the GeoJSON file, or None on failure.
        """
        db = None
        tmp_dir = None
        try:
            db = SessionLocal()
            feature = get_boundary_geojson_by_name(db, county_name, level="auto")
            fc = {"type": "FeatureCollection", "features": [feature]}

            tmp_dir = tempfile.mkdtemp()
            tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
            with open(tmp_geojson, 'w', encoding='utf-8') as f:
                json.dump(fc, f, ensure_ascii=False)

            return tmp_geojson
        except Exception as e:
            # Don't leave an empty or half-written temp directory behind.
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)
            self.logger.warning(f"从数据库获取边界失败: {str(e)}")
            return None
        finally:
            # db is None if SessionLocal() itself failed.
            if db is not None:
                try:
                    db.close()
                except Exception as close_err:
                    self.logger.debug(f"关闭数据库会话失败: {close_err}")

    def _load_boundary_gdf(self, boundary_geojson: Optional[str]):
        """
        Read a boundary GeoJSON into a GeoDataFrame (best effort).

        @param {Optional[str]} boundary_geojson - Path to the boundary file.
        @returns GeoDataFrame, or None when no path is given or reading fails
            (the prediction then runs without a boundary mask).
        """
        if not boundary_geojson:
            return None
        try:
            import geopandas as gpd
            boundary_gdf = gpd.read_file(boundary_geojson)
            self.logger.info(f"已加载边界文件: {boundary_geojson}")
            return boundary_gdf
        except Exception as e:
            self.logger.warning(f"加载边界文件失败,将不使用边界限制: {str(e)}")
            return None

    # -------------------------------------------------------------------------
    # Prediction entry points
    # -------------------------------------------------------------------------

    async def _generate_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str],
        raster_config_override: Optional[Dict[str, Any]],
        model_type: str,
        model_label: str,
        runner: Callable[..., Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Shared driver for the public ``generate_*_prediction_for_county``
        coroutines.

        @param {str} county_name - County name; must be supported.
        @param {Optional[str]} data_file - CSV path; first two columns are
            treated as longitude / latitude.
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides forwarded to the engine.
        @param {str} model_type - Engine model key ("crop_cd" / "effective_cd").
        @param {str} model_label - Human-readable label used in log messages.
        @param {Callable} runner - Synchronous prediction function executed in
            the default thread-pool executor.
        @returns {Dict[str, Any]} Formatted API result.
        @raises {ValueError} Unsupported county or missing/invalid data file.
        """
        if not self.is_county_supported(county_name):
            raise ValueError(f"不支持的县市: {county_name}")

        try:
            self.logger.info(f"开始为{county_name}生成{model_label}预测(v3.0自包含版本)")

            if not data_file or not os.path.exists(data_file):
                raise ValueError("需要提供有效的数据文件")

            df = pd.read_csv(data_file, encoding='utf-8')

            # Normalise the coordinate column names; other columns are kept as-is.
            df.columns = ['longitude', 'latitude'] + list(df.columns[2:])

            boundary_geojson = self._get_boundary_geojson(county_name)
            try:
                # The engine is synchronous; run it off the event loop.
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    runner,
                    df,
                    county_name,
                    boundary_geojson,
                    raster_config_override
                )

                # Files are generated in their final location by the engine,
                # so no copying is needed here.
                return self._format_api_result(result, county_name, model_type)
            finally:
                # Always remove the temporary boundary file, even on failure.
                self._cleanup_after_prediction(boundary_geojson)

        except Exception as e:
            self.logger.error(f"为{county_name}生成{model_label}预测失败: {str(e)}")
            raise

    async def generate_crop_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None,
        raster_config_override: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate a crop-Cd prediction for a county.

        @param {str} county_name - County name.
        @param {Optional[str]} data_file - Path of the input CSV (required).
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides.
        @returns {Dict[str, Any]} Prediction result information.
        @raises {ValueError} Unsupported county or missing/invalid data file.
        """
        return await self._generate_prediction_for_county(
            county_name,
            data_file,
            raster_config_override,
            model_type="crop_cd",
            model_label="作物Cd",
            runner=self._run_crop_cd_prediction
        )

    async def generate_effective_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None,
        raster_config_override: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate an effective-Cd prediction for a county.

        @param {str} county_name - County name.
        @param {Optional[str]} data_file - Path of the input CSV (required).
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides.
        @returns {Dict[str, Any]} Prediction result information.
        @raises {ValueError} Unsupported county or missing/invalid data file.
        """
        return await self._generate_prediction_for_county(
            county_name,
            data_file,
            raster_config_override,
            model_type="effective_cd",
            model_label="有效态Cd",
            runner=self._run_effective_cd_prediction
        )

    def _run_prediction(self, df: pd.DataFrame, county_name: str,
                        boundary_geojson: Optional[str],
                        raster_config_override: Optional[Dict[str, Any]],
                        model_type: str, model_label: str) -> Dict[str, Any]:
        """
        Run prediction and visualisation synchronously for one model type.

        @param {pd.DataFrame} df - Input data.
        @param {str} county_name - County name.
        @param {Optional[str]} boundary_geojson - Boundary file path.
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides.
        @param {str} model_type - Engine model key.
        @param {str} model_label - Human-readable label for log messages.
        @returns {Dict[str, Any]} Engine result.
        """
        try:
            boundary_gdf = self._load_boundary_gdf(boundary_geojson)

            # Unified plotting interface; raster files are not saved, to save storage.
            return self.engine.predict_and_visualize(
                input_data=df,
                model_type=model_type,
                county_name=county_name,
                boundary_gdf=boundary_gdf,
                raster_config_override=raster_config_override,
                save_raster=False
            )
        except Exception as e:
            self.logger.error(f"{model_label}预测执行失败: {str(e)}")
            raise

    def _run_crop_cd_prediction(self, df: pd.DataFrame, county_name: str,
                                boundary_geojson: Optional[str],
                                raster_config_override: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run the crop-Cd prediction synchronously.

        @param {pd.DataFrame} df - Input data.
        @param {str} county_name - County name.
        @param {Optional[str]} boundary_geojson - Boundary file path.
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides.
        @returns {Dict[str, Any]} Engine result.
        """
        return self._run_prediction(df, county_name, boundary_geojson,
                                    raster_config_override, "crop_cd", "作物Cd")

    def _run_effective_cd_prediction(self, df: pd.DataFrame, county_name: str,
                                     boundary_geojson: Optional[str],
                                     raster_config_override: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run the effective-Cd prediction synchronously.

        @param {pd.DataFrame} df - Input data.
        @param {str} county_name - County name.
        @param {Optional[str]} boundary_geojson - Boundary file path.
        @param {Optional[Dict[str, Any]]} raster_config_override - Raster
            configuration overrides.
        @returns {Dict[str, Any]} Engine result.
        """
        return self._run_prediction(df, county_name, boundary_geojson,
                                    raster_config_override, "effective_cd", "有效态Cd")

    # -------------------------------------------------------------------------
    # Result formatting / cleanup
    # -------------------------------------------------------------------------

    def _format_api_result(self, result: Dict[str, Any], county_name: str, model_type: str) -> Dict[str, Any]:
        """
        Convert an engine result into the API response shape.

        Also prunes old figure files for this model/county.

        @param {Dict[str, Any]} result - Engine result.
        @param {str} county_name - County name.
        @param {str} model_type - Model key.
        @returns {Dict[str, Any]} API result (paths, metadata and map file stats).
        """
        try:
            model_prefix = f"{model_type}_{county_name}"

            # Use the engine-generated paths directly; nothing is copied.
            api_result = {
                'map_path': result.get('map_path'),
                'histogram_path': result.get('histogram_path'),
                'raster_path': result.get('raster_path'),
                'model_type': model_type,
                'county_name': county_name,
                'timestamp': result.get('timestamp'),
                'validation': result.get('validation', {}),
                'stats': self._get_file_stats(result.get('map_path'))
            }

            self._cleanup_old_files(model_prefix)

            self.logger.info(f"API结果格式化完成(统一接口): {model_type}_{county_name}")

            return api_result

        except Exception as e:
            self.logger.error(f"格式化API结果失败: {str(e)}")
            raise

    def _cleanup_after_prediction(self, boundary_geojson: Optional[str]):
        """
        Post-prediction cleanup (best effort; never raises).

        Clears the engine's temp files and removes the directory holding the
        temporary boundary GeoJSON.

        @param {Optional[str]} boundary_geojson - Temporary boundary file path.
        """
        try:
            self.engine.cleanup_temp_files()

            if boundary_geojson and os.path.exists(boundary_geojson):
                # The GeoJSON lives alone in its own mkdtemp() directory.
                shutil.rmtree(os.path.dirname(boundary_geojson), ignore_errors=True)
                self.logger.info("临时边界文件已清理")

        except Exception as e:
            self.logger.warning(f"预测后清理失败: {str(e)}")

    def _cleanup_temp_files(self, county_name: str, max_files: int = 5):
        """
        Keep only the newest ``max_files`` temp CSVs for a county.

        @param {str} county_name - County name.
        @param {int} max_files - Maximum number of files to keep.
        """
        try:
            temp_dir = os.path.join(self.output_data_dir, "temp")
            # Escape the county name so glob metacharacters in it match literally.
            temp_pattern = os.path.join(temp_dir, f"{glob.escape(county_name)}_temp_data_*.csv")
            temp_files = glob.glob(temp_pattern)

            if len(temp_files) > max_files:
                temp_files.sort(key=os.path.getmtime)
                for file_to_delete in temp_files[:-max_files]:
                    os.remove(file_to_delete)
                    self.logger.info(f"已删除旧临时文件: {os.path.basename(file_to_delete)}")

        except Exception as e:
            self.logger.warning(f"清理临时文件失败: {str(e)}")

    def _cleanup_old_files(self, model_prefix: str):
        """
        Keep only the newest 10 map and histogram figures for a model prefix.

        @param {str} model_prefix - "<model_type>_<county_name>".
        """
        try:
            max_files = 10
            # The prefix embeds the county name; escape it for glob.
            safe_prefix = glob.escape(model_prefix)

            map_pattern = os.path.join(self.output_figures_dir, f"{safe_prefix}_prediction_map_*.jpg")
            self._cleanup_files_by_pattern(map_pattern, max_files)

            histogram_pattern = os.path.join(self.output_figures_dir, f"{safe_prefix}_prediction_histogram_*.jpg")
            self._cleanup_files_by_pattern(histogram_pattern, max_files)

        except Exception as e:
            self.logger.warning(f"清理旧文件失败: {str(e)}")

    def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
        """
        Delete all but the ``max_files`` most recently modified matches.

        @param {str} pattern - Glob pattern.
        @param {int} max_files - Maximum number of files to keep.
        """
        try:
            files = glob.glob(pattern)
            if len(files) > max_files:
                files.sort(key=os.path.getmtime)
                for file_to_delete in files[:-max_files]:
                    os.remove(file_to_delete)
                    self.logger.info(f"已删除旧文件: {os.path.basename(file_to_delete)}")
        except Exception as e:
            self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")

    def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
        """
        Return size and timestamps of a file.

        @param {Optional[str]} file_path - File path.
        @returns {Dict[str, Any]} file_size, created_time and modified_time,
            or {} if the path is empty, missing or unreadable.
        """
        if not file_path or not os.path.exists(file_path):
            return {}

        try:
            stat = os.stat(file_path)
            return {
                'file_size': stat.st_size,
                # NOTE(review): st_ctime is creation time on Windows but
                # metadata-change time on Unix.
                'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
            }
        except Exception:
            return {}

    def get_engine_info(self) -> Dict[str, Any]:
        """
        Return engine/model information.

        @returns {Dict[str, Any]} ``engine.get_model_info()``, or
            {"error": message} on failure.
        """
        try:
            return self.engine.get_model_info()
        except Exception as e:
            self.logger.error(f"获取引擎信息失败: {str(e)}")
            return {"error": str(e)}

    # =============================================================================
    # Statistics (simplified: read from the latest final data file)
    # =============================================================================

    def _get_prediction_statistics(self, county_name: str, model_type: str,
                                   model_label: str) -> Optional[Dict[str, Any]]:
        """
        Compute descriptive statistics from the newest final-prediction CSV.

        @param {str} county_name - County name (echoed in the result).
        @param {str} model_type - Model key used in the file name pattern.
        @param {str} model_label - Human-readable label.
        @returns {Optional[Dict[str, Any]]} Statistics, or None when no file
            exists, it lacks a "Prediction" column, or reading fails.
        """
        try:
            # NOTE(review): the pattern does not include the county name, so
            # the newest file for this model is used regardless of county.
            final_pattern = os.path.join(self.output_data_dir, "final",
                                         f"Final_predictions_{model_type}_*.csv")
            final_files = glob.glob(final_pattern)
            if not final_files:
                return None

            latest_file = max(final_files, key=os.path.getmtime)
            df = pd.read_csv(latest_file)

            if 'Prediction' not in df.columns:
                return None

            predictions = df['Prediction']
            basic_stats = {
                "数据点总数": len(predictions),
                "均值": float(predictions.mean()),
                "中位数": float(predictions.median()),
                "标准差": float(predictions.std()),
                "最小值": float(predictions.min()),
                "最大值": float(predictions.max()),
                "25%分位数": float(predictions.quantile(0.25)),
                "75%分位数": float(predictions.quantile(0.75))
            }

            return {
                "模型类型": f"{model_label}模型v3.0",
                "县市名称": county_name,
                "数据更新时间": datetime.fromtimestamp(os.path.getmtime(latest_file)).isoformat(),
                "基础统计": basic_stats
            }

        except Exception as e:
            self.logger.error(f"获取{model_label}统计信息失败: {str(e)}")
            return None

    def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Statistics for the latest crop-Cd prediction.

        @param {str} county_name - County name.
        @returns {Optional[Dict[str, Any]]} Statistics, or None if unavailable.
        """
        return self._get_prediction_statistics(county_name, "crop_cd", "作物Cd")

    def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Statistics for the latest effective-Cd prediction.

        @param {str} county_name - County name.
        @returns {Optional[Dict[str, Any]]} Statistics, or None if unavailable.
        """
        return self._get_prediction_statistics(county_name, "effective_cd", "有效态Cd")

    def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Combined crop-Cd and effective-Cd statistics for a county.

        @param {str} county_name - County name.
        @returns {Optional[Dict[str, Any]]} Combined statistics, or None if
            neither model has data (or an error occurs).
        """
        try:
            crop_stats = self.get_crop_cd_statistics(county_name)
            effective_stats = self.get_effective_cd_statistics(county_name)

            if not crop_stats and not effective_stats:
                return None

            return {
                "县市名称": county_name,
                "作物Cd统计": crop_stats,
                "有效态Cd统计": effective_stats,
                "生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本"
            }

        except Exception as e:
            self.logger.error(f"获取综合统计信息失败: {str(e)}")
            return None

    def get_all_counties_statistics(self) -> Dict[str, Any]:
        """
        Overview of data availability across all supported counties.

        @returns {Dict[str, Any]} Per-county availability flags plus summary
            counts; on failure a dict with an "error" entry.
        """
        try:
            all_stats = {
                "支持县市总数": len(self.supported_counties),
                "统计生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本",
                "县市统计": {},
                "汇总信息": {
                    "有作物Cd数据的县市": 0,
                    "有有效态Cd数据的县市": 0,
                    "数据完整的县市": 0
                }
            }
            summary = all_stats["汇总信息"]

            for county_name in self.supported_counties:
                has_crop = bool(self.get_crop_cd_statistics(county_name))
                has_effective = bool(self.get_effective_cd_statistics(county_name))
                is_complete = has_crop and has_effective

                summary["有作物Cd数据的县市"] += int(has_crop)
                summary["有有效态Cd数据的县市"] += int(has_effective)
                summary["数据完整的县市"] += int(is_complete)

                all_stats["县市统计"][county_name] = {
                    "县市名称": county_name,
                    "有作物Cd数据": has_crop,
                    "有有效态Cd数据": has_effective,
                    "数据完整": is_complete
                }

            return all_stats

        except Exception as e:
            self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
            return {
                "error": f"获取统计概览失败: {str(e)}",
                "支持县市总数": len(self.supported_counties),
                "统计生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本"
            }