|
- """
- Cd预测服务类 v3.0
- @description: 完全自包含的服务,不依赖Cd_Prediction_Integrated_System
- @author: AcidMap Team
- @version: 3.0.0
- """
- import os
- import logging
- import asyncio
- import tempfile
- import json
- from datetime import datetime
- from typing import Dict, Any, Optional, List
- import pandas as pd
- import glob
- from ..config.cd_prediction_config import cd_config
- from ..models.cd_prediction import CdPredictionEngine
- from ..database import SessionLocal
- from .admin_boundary_service import get_boundary_geojson_by_name
class CdPredictionServiceV3:
    """
    Cd prediction service v3.0 - fully self-contained edition.

    @description: Drives the self-contained prediction engine
                  (CdPredictionEngine) for the supported counties: input
                  validation, temporary data handling, asynchronous
                  prediction runs, output housekeeping and summary
                  statistics over the generated prediction files.
    """

    def __init__(self):
        """
        Initialise the Cd prediction service.

        Reads the base output directory from the shared configuration,
        builds the prediction engine on top of it and derives the
        figures / raster / data sub-directories.
        """
        # Logging
        self.logger = logging.getLogger(__name__)

        # Configuration
        self.config = cd_config

        # Base output directory
        self.output_base_dir = self.config.get_output_dir("base")

        # Prediction engine
        self.engine = CdPredictionEngine(self.output_base_dir)

        # Output sub-directories
        self.output_figures_dir = os.path.join(self.output_base_dir, "figures")
        self.output_raster_dir = os.path.join(self.output_base_dir, "raster")
        self.output_data_dir = os.path.join(self.output_base_dir, "data")

        # Supported counties
        self.supported_counties = self._load_supported_counties()

        self.logger.info("Cd预测服务v3.0初始化完成(完全自包含)")

    def _load_supported_counties(self) -> Dict[str, Dict]:
        """
        Load the configuration of the supported counties.

        @returns {Dict[str, Dict]} mapping of county name to its settings
        """
        return {
            "乐昌市": {
                "region_code": "440282",
                "display_name": "乐昌市",
                "province": "广东省"
            }
            # Further counties can be added here
        }

    def is_county_supported(self, county_name: str) -> bool:
        """
        Check whether a county is supported.

        @param {str} county_name - county name
        @returns {bool} True when the county is in the supported set
        """
        return county_name in self.supported_counties

    def get_supported_counties(self) -> List[str]:
        """
        Get the names of the supported counties.

        @returns {List[str]} supported county names
        """
        return list(self.supported_counties)

    def get_supported_counties_info(self) -> List[Dict[str, Any]]:
        """
        Get detailed information about the supported counties.

        @returns {List[Dict[str, Any]]} one entry per supported county
        """
        return [
            {
                "name": county_name,
                "display_name": config.get("display_name", county_name),
                "province": config.get("province", ""),
                "region_code": config.get("region_code", ""),
                "supported": True  # every configured county is supported in v3.0
            }
            for county_name, config in self.supported_counties.items()
        ]

    def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
        """
        Validate the format of the input data.

        The first two columns are treated as longitude / latitude and the
        remaining columns as environmental factors. Only non-numeric
        coordinate columns (or fewer than three columns) make the data
        invalid; every other finding is reported as a warning.

        @param {pd.DataFrame} df - input data
        @param {str} county_name - county name
        @returns {Dict[str, Any]} validation result with the keys
                 valid / errors / warnings / data_shape / county_supported
        """
        county_supported = self.is_county_supported(county_name)

        # Minimum requirement: two coordinate columns plus at least one factor
        if df.shape[1] < 3:
            return {
                "valid": False,
                "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
                "warnings": [],
                "data_shape": df.shape,
                "county_supported": county_supported
            }

        # Coordinate columns
        coordinate_issues = []
        try:
            longitude = df.iloc[:, 0]
            latitude = df.iloc[:, 1]

            if not pd.api.types.is_numeric_dtype(longitude):
                coordinate_issues.append("第一列(经度)不是数值型数据")
            elif not longitude.between(70, 140).all():
                coordinate_issues.append("经度值超出合理范围(70-140度)")

            if not pd.api.types.is_numeric_dtype(latitude):
                coordinate_issues.append("第二列(纬度)不是数值型数据")
            elif not latitude.between(15, 55).all():
                coordinate_issues.append("纬度值超出合理范围(15-55度)")
        except Exception as e:
            coordinate_issues.append(f"坐标数据验证失败: {str(e)}")

        # Completeness: columns with more than 10% missing values
        null_threshold = len(df) * 0.1
        high_null_columns = [
            col for col, count in df.isnull().sum().items()
            if count > null_threshold
        ]

        # Environmental factor columns must be numeric
        non_numeric_columns = [
            df.columns[i]
            for i in range(2, df.shape[1])
            if not pd.api.types.is_numeric_dtype(df.iloc[:, i])
        ]

        # Column labels are not necessarily strings (e.g. integer labels of a
        # header-less frame), so convert them before joining.
        warnings = []
        if high_null_columns:
            joined = ', '.join(str(col) for col in high_null_columns)
            warnings.append(f"以下列空值较多: {joined}")
        if coordinate_issues:
            warnings.extend(coordinate_issues)
        if non_numeric_columns:
            joined = ', '.join(str(col) for col in non_numeric_columns)
            warnings.append(f"以下列不是数值型: {joined}")

        # Only non-numeric coordinate columns are treated as critical
        critical_errors = [
            issue for issue in coordinate_issues if "不是数值型数据" in issue
        ]

        return {
            "valid": len(critical_errors) == 0,
            "errors": critical_errors,
            "warnings": warnings,
            "data_shape": df.shape,
            "county_supported": county_supported
        }

    def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
        """
        Save the data to a temporary CSV file.

        The file is written below <data dir>/temp and older temporary
        files of the same county are pruned afterwards.

        @param {pd.DataFrame} df - data to store
        @param {str} county_name - county name (used in the file name)
        @returns {str} path of the temporary file
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{county_name}_temp_data_{timestamp}.csv"
        file_path = os.path.join(self.output_data_dir, "temp", filename)

        # Make sure the target directory exists
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        df.to_csv(file_path, index=False, encoding='utf-8-sig')

        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        self.logger.info(f"临时数据文件已保存: {file_path} ({file_size:,} bytes)")

        # Prune older temporary files
        self._cleanup_temp_files(county_name)

        return file_path

    def _get_boundary_geojson(self, county_name: str) -> Optional[str]:
        """
        Fetch the county boundary and write it to a temporary GeoJSON file.

        @param {str} county_name - county name
        @returns {Optional[str]} path of the GeoJSON file, or None on failure
        """
        import shutil

        # Bound before the try block so the cleanup paths below never touch
        # an undefined name when session creation itself fails.
        db = None
        tmp_dir = None
        try:
            db = SessionLocal()
            feature = get_boundary_geojson_by_name(db, county_name, level="auto")
            fc = {"type": "FeatureCollection", "features": [feature]}

            # Write the feature collection into a fresh temporary directory
            tmp_dir = tempfile.mkdtemp()
            tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
            with open(tmp_geojson, 'w', encoding='utf-8') as f:
                json.dump(fc, f, ensure_ascii=False)

            return tmp_geojson

        except Exception as e:
            self.logger.warning(f"从数据库获取边界失败: {str(e)}")
            # Do not leave a half-written temporary directory behind
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)
            return None
        finally:
            if db is not None:
                try:
                    db.close()
                except Exception as close_error:
                    # Best effort: a failing close must not hide the result
                    self.logger.debug(f"关闭数据库会话失败: {str(close_error)}")

    async def generate_crop_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None,
        raster_config_override: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate the crop Cd prediction for a county.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - path of the input CSV file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @returns {Dict[str, Any]} prediction result information
        @raises {ValueError} unsupported county or missing data file
        """
        return await self._generate_prediction_for_county(
            county_name, data_file, raster_config_override,
            "crop_cd", "作物Cd", self._run_crop_cd_prediction
        )

    async def generate_effective_cd_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str] = None,
        raster_config_override: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate the effective (available) Cd prediction for a county.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - path of the input CSV file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @returns {Dict[str, Any]} prediction result information
        @raises {ValueError} unsupported county or missing data file
        """
        return await self._generate_prediction_for_county(
            county_name, data_file, raster_config_override,
            "effective_cd", "有效态Cd", self._run_effective_cd_prediction
        )

    async def _generate_prediction_for_county(
        self,
        county_name: str,
        data_file: Optional[str],
        raster_config_override: Optional[Dict[str, Any]],
        model_type: str,
        model_label: str,
        runner
    ) -> Dict[str, Any]:
        """
        Shared workflow behind both public prediction entry points.

        @param {str} county_name - county name
        @param {Optional[str]} data_file - path of the input CSV file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @param {str} model_type - engine model type ("crop_cd" / "effective_cd")
        @param {str} model_label - model label used in log messages
        @param {callable} runner - synchronous prediction method run in the executor
        @returns {Dict[str, Any]} formatted API result
        @raises {ValueError} unsupported county or missing data file
        """
        if not self.is_county_supported(county_name):
            raise ValueError(f"不支持的县市: {county_name}")

        try:
            self.logger.info(f"开始为{county_name}生成{model_label}预测(v3.0自包含版本)")

            # Read the data file
            if not data_file or not os.path.exists(data_file):
                raise ValueError("需要提供有效的数据文件")

            # 'utf-8-sig' reads files with or without a BOM; save_temp_data
            # writes its CSV files with a BOM.
            df = pd.read_csv(data_file, encoding='utf-8-sig')
            # The first two columns are always treated as longitude / latitude
            df.columns = ['longitude', 'latitude'] + list(df.columns[2:])

            # Temporary boundary file (None when the boundary is unavailable)
            boundary_geojson = self._get_boundary_geojson(county_name)
            try:
                # Run the blocking prediction in the default thread pool
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    runner,
                    df, county_name, boundary_geojson, raster_config_override
                )

                # The engine writes its files in place; nothing is copied
                return self._format_api_result(result, county_name, model_type)
            finally:
                # Always clean up, so a failed run does not leak the
                # temporary boundary directory.
                self._cleanup_after_prediction(boundary_geojson)

        except Exception as e:
            self.logger.error(f"为{county_name}生成{model_label}预测失败: {str(e)}")
            raise

    def _run_crop_cd_prediction(self, df: pd.DataFrame, county_name: str,
                                boundary_geojson: Optional[str],
                                raster_config_override: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run the crop Cd prediction (synchronous).

        @param {pd.DataFrame} df - input data
        @param {str} county_name - county name
        @param {Optional[str]} boundary_geojson - path of the boundary file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @returns {Dict[str, Any]} prediction result
        """
        return self._run_prediction(
            df, county_name, boundary_geojson, raster_config_override,
            "crop_cd", "作物Cd"
        )

    def _run_effective_cd_prediction(self, df: pd.DataFrame, county_name: str,
                                     boundary_geojson: Optional[str],
                                     raster_config_override: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run the effective Cd prediction (synchronous).

        @param {pd.DataFrame} df - input data
        @param {str} county_name - county name
        @param {Optional[str]} boundary_geojson - path of the boundary file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @returns {Dict[str, Any]} prediction result
        """
        return self._run_prediction(
            df, county_name, boundary_geojson, raster_config_override,
            "effective_cd", "有效态Cd"
        )

    def _run_prediction(self, df: pd.DataFrame, county_name: str,
                        boundary_geojson: Optional[str],
                        raster_config_override: Optional[Dict[str, Any]],
                        model_type: str, model_label: str) -> Dict[str, Any]:
        """
        Load the optional boundary and run the engine for one model type.

        @param {pd.DataFrame} df - input data
        @param {str} county_name - county name
        @param {Optional[str]} boundary_geojson - path of the boundary file
        @param {Optional[Dict[str, Any]]} raster_config_override - raster configuration overrides
        @param {str} model_type - engine model type
        @param {str} model_label - model label used in log messages
        @returns {Dict[str, Any]} prediction result as returned by the engine
        """
        try:
            # A boundary that cannot be loaded is not fatal: the prediction
            # then runs without a boundary restriction.
            boundary_gdf = None
            if boundary_geojson:
                try:
                    import geopandas as gpd
                    boundary_gdf = gpd.read_file(boundary_geojson)
                    self.logger.info(f"已加载边界文件: {boundary_geojson}")
                except Exception as e:
                    self.logger.warning(f"加载边界文件失败,将不使用边界限制: {str(e)}")
                    boundary_gdf = None

            # Predict and visualise; raster files are not stored
            return self.engine.predict_and_visualize(
                input_data=df,
                model_type=model_type,
                county_name=county_name,
                boundary_gdf=boundary_gdf,
                raster_config_override=raster_config_override,
                save_raster=False
            )

        except Exception as e:
            self.logger.error(f"{model_label}预测执行失败: {str(e)}")
            raise

    def _format_api_result(self, result: Dict[str, Any], county_name: str,
                           model_type: str) -> Dict[str, Any]:
        """
        Format the engine result for the API.

        The file paths produced by the engine are passed through as they
        are; older output files with the same prefix are pruned.

        @param {Dict[str, Any]} result - prediction result
        @param {str} county_name - county name
        @param {str} model_type - model type
        @returns {Dict[str, Any]} formatted API result
        """
        try:
            model_prefix = f"{model_type}_{county_name}"

            api_result = {
                'map_path': result.get('map_path'),
                'histogram_path': result.get('histogram_path'),
                'raster_path': result.get('raster_path'),
                'model_type': model_type,
                'county_name': county_name,
                'timestamp': result.get('timestamp'),
                'validation': result.get('validation', {}),
                'stats': self._get_file_stats(result.get('map_path'))
            }

            # Prune older output files
            self._cleanup_old_files(model_prefix)

            self.logger.info(f"API结果格式化完成(统一接口): {model_type}_{county_name}")
            return api_result

        except Exception as e:
            self.logger.error(f"格式化API结果失败: {str(e)}")
            raise

    def _cleanup_after_prediction(self, boundary_geojson: Optional[str]):
        """
        Clean up after a prediction run (best effort, never raises).

        @param {Optional[str]} boundary_geojson - path of the temporary boundary file
        """
        # The two steps are guarded separately so a failing engine cleanup
        # does not prevent removal of the temporary boundary directory.
        try:
            self.engine.cleanup_temp_files()
        except Exception as e:
            self.logger.warning(f"预测后清理失败: {str(e)}")

        try:
            if boundary_geojson and os.path.exists(boundary_geojson):
                import shutil
                # The boundary file lives in its own temporary directory
                shutil.rmtree(os.path.dirname(boundary_geojson), ignore_errors=True)
                self.logger.info("临时边界文件已清理")
        except Exception as e:
            self.logger.warning(f"预测后清理失败: {str(e)}")

    def _cleanup_temp_files(self, county_name: str, max_files: int = 5):
        """
        Prune the temporary data files of a county.

        @param {str} county_name - county name
        @param {int} max_files - number of most recent files to keep
        """
        try:
            temp_dir = os.path.join(self.output_data_dir, "temp")
            temp_pattern = os.path.join(temp_dir, f"{county_name}_temp_data_*.csv")

            temp_files = glob.glob(temp_pattern)

            # Count the surplus explicitly: a negative slice bound would be
            # an empty slice for max_files == 0 and delete nothing.
            excess = len(temp_files) - max(max_files, 0)
            if excess > 0:
                temp_files.sort(key=os.path.getmtime)

                for file_to_delete in temp_files[:excess]:
                    os.remove(file_to_delete)
                    self.logger.info(f"已删除旧临时文件: {os.path.basename(file_to_delete)}")

        except Exception as e:
            self.logger.warning(f"清理临时文件失败: {str(e)}")

    def _cleanup_old_files(self, model_prefix: str):
        """
        Prune older prediction output files (maps and histograms).

        @param {str} model_prefix - "<model type>_<county name>" file prefix
        """
        try:
            max_files = 10

            for suffix in ("prediction_map", "prediction_histogram"):
                pattern = os.path.join(self.output_figures_dir, f"{model_prefix}_{suffix}_*.jpg")
                self._cleanup_files_by_pattern(pattern, max_files)

        except Exception as e:
            self.logger.warning(f"清理旧文件失败: {str(e)}")

    def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
        """
        Delete the oldest files matching a glob pattern.

        @param {str} pattern - glob pattern
        @param {int} max_files - number of most recent files to keep
        """
        try:
            files = glob.glob(pattern)
            # Count the surplus explicitly so max_files == 0 removes every match
            excess = len(files) - max(max_files, 0)
            if excess > 0:
                files.sort(key=os.path.getmtime)
                for file_to_delete in files[:excess]:
                    os.remove(file_to_delete)
                    self.logger.info(f"已删除旧文件: {os.path.basename(file_to_delete)}")
        except Exception as e:
            self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")

    def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
        """
        Get basic file statistics.

        @param {Optional[str]} file_path - file path
        @returns {Dict[str, Any]} size and ISO formatted timestamps; empty
                 when the file is missing or cannot be inspected
        """
        if not file_path or not os.path.exists(file_path):
            return {}

        try:
            stat = os.stat(file_path)
            return {
                'file_size': stat.st_size,
                'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
            }
        except Exception:
            return {}

    def get_engine_info(self) -> Dict[str, Any]:
        """
        Get information about the prediction engine.

        @returns {Dict[str, Any]} engine information, or {"error": ...} on failure
        """
        try:
            return self.engine.get_model_info()
        except Exception as e:
            self.logger.error(f"获取引擎信息失败: {str(e)}")
            return {"error": str(e)}

    # =============================================================================
    # Statistics (simplified: read from the latest final prediction file)
    # =============================================================================

    def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Get summary statistics of the crop Cd prediction results.

        @param {str} county_name - county name
        @returns {Optional[Dict[str, Any]]} statistics, or None when unavailable
        """
        return self._get_prediction_statistics(county_name, "crop_cd", "作物Cd")

    def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Get summary statistics of the effective Cd prediction results.

        @param {str} county_name - county name
        @returns {Optional[Dict[str, Any]]} statistics, or None when unavailable
        """
        return self._get_prediction_statistics(county_name, "effective_cd", "有效态Cd")

    def _get_prediction_statistics(self, county_name: str, model_type: str,
                                   model_label: str) -> Optional[Dict[str, Any]]:
        """
        Compute summary statistics from the newest final prediction file.

        @param {str} county_name - county name (reported in the result)
        @param {str} model_type - model type used in the file name pattern
        @param {str} model_label - model label used in the result and logs
        @returns {Optional[Dict[str, Any]]} statistics, or None when no file
                 exists, the 'Prediction' column is missing or reading fails
        """
        try:
            # NOTE(review): the pattern does not contain the county name, so
            # the newest file of this model type is used for any county -
            # confirm against the engine's file naming.
            final_pattern = os.path.join(
                self.output_data_dir, "final", f"Final_predictions_{model_type}_*.csv"
            )
            final_files = glob.glob(final_pattern)

            if not final_files:
                return None

            latest_file = max(final_files, key=os.path.getmtime)
            df = pd.read_csv(latest_file)

            if 'Prediction' not in df.columns:
                return None

            predictions = df['Prediction']

            basic_stats = {
                "数据点总数": len(predictions),
                "均值": float(predictions.mean()),
                "中位数": float(predictions.median()),
                "标准差": float(predictions.std()),
                "最小值": float(predictions.min()),
                "最大值": float(predictions.max()),
                "25%分位数": float(predictions.quantile(0.25)),
                "75%分位数": float(predictions.quantile(0.75))
            }

            return {
                "模型类型": f"{model_label}模型v3.0",
                "县市名称": county_name,
                "数据更新时间": datetime.fromtimestamp(os.path.getmtime(latest_file)).isoformat(),
                "基础统计": basic_stats
            }

        except Exception as e:
            self.logger.error(f"获取{model_label}统计信息失败: {str(e)}")
            return None

    def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
        """
        Get the combined prediction statistics of a county.

        @param {str} county_name - county name
        @returns {Optional[Dict[str, Any]]} combined statistics, or None when
                 neither model has statistics
        """
        try:
            crop_stats = self.get_crop_cd_statistics(county_name)
            effective_stats = self.get_effective_cd_statistics(county_name)

            if not crop_stats and not effective_stats:
                return None

            return {
                "县市名称": county_name,
                "作物Cd统计": crop_stats,
                "有效态Cd统计": effective_stats,
                "生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本"
            }

        except Exception as e:
            self.logger.error(f"获取综合统计信息失败: {str(e)}")
            return None

    def get_all_counties_statistics(self) -> Dict[str, Any]:
        """
        Get an availability overview for all supported counties.

        @returns {Dict[str, Any]} per-county availability flags and totals;
                 on failure a dict with an "error" entry
        """
        try:
            summary = {
                "有作物Cd数据的县市": 0,
                "有有效态Cd数据的县市": 0,
                "数据完整的县市": 0
            }
            per_county = {}

            for county_name in self.supported_counties:
                has_crop = bool(self.get_crop_cd_statistics(county_name))
                has_effective = bool(self.get_effective_cd_statistics(county_name))
                # Complete means both models have data
                is_complete = has_crop and has_effective

                if has_crop:
                    summary["有作物Cd数据的县市"] += 1
                if has_effective:
                    summary["有有效态Cd数据的县市"] += 1
                if is_complete:
                    summary["数据完整的县市"] += 1

                per_county[county_name] = {
                    "县市名称": county_name,
                    "有作物Cd数据": has_crop,
                    "有有效态Cd数据": has_effective,
                    "数据完整": is_complete
                }

            return {
                "支持县市总数": len(self.supported_counties),
                "统计生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本",
                "县市统计": per_county,
                "汇总信息": summary
            }

        except Exception as e:
            self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
            return {
                "error": f"获取统计概览失败: {str(e)}",
                "支持县市总数": len(self.supported_counties),
                "统计生成时间": datetime.now().isoformat(),
                "版本": "v3.0自包含版本"
            }
|