- """
- Cd预测服务类
- @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
- @author: AcidMap Team
- @version: 1.0.0
- """
- import os
- import logging
- import asyncio
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple
- import glob
- import shutil
- import zipfile
- import tempfile
- import pandas as pd
- import io
- from ..config.cd_prediction_config import cd_config
- from ..utils.cd_prediction_wrapper import CdPredictionWrapper
- from ..database import SessionLocal
- from .admin_boundary_service import get_boundary_geojson_by_name
- import json
- class CdPredictionService:
- """
- Cd预测服务类
-
- @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- """
-
- def __init__(self):
- """
- 初始化Cd预测服务
-
- @description: 设置Cd预测系统的路径和配置
- """
- # 设置日志
- self.logger = logging.getLogger(__name__)
-
- # 获取配置
- self.config = cd_config
-
- # 初始化包装器
- cd_system_path = self.config.get_cd_system_path()
- self.wrapper = CdPredictionWrapper(cd_system_path)
-
- # 输出目录
- self.output_figures_dir = self.config.get_output_dir("figures")
- self.output_raster_dir = self.config.get_output_dir("raster")
- self.output_data_dir = self.config.get_output_dir("data")
-
- # 支持的县市配置
- self.supported_counties = self._load_supported_counties()
-
- self.logger.info("Cd预测服务初始化完成")
-
- def _load_supported_counties(self) -> Dict[str, Dict]:
- """
- 加载支持的县市配置
-
- @returns {Dict[str, Dict]} 支持的县市配置信息
- """
- # 获取Cd预测系统的基础路径
- cd_system_base = self.config.get_cd_system_path()
-
- return {
- "乐昌市": {
- "boundary_file": os.path.join(cd_system_base, "output/raster/lechang.shp"),
- "template_file": os.path.join(cd_system_base, "output/raster/meanTemp.tif"),
- "coordinate_file": os.path.join(cd_system_base, "data/coordinates/坐标.csv"),
- "region_code": "440282",
- "display_name": "乐昌市",
- "province": "广东省"
- },
- # 可扩展添加更多县市
- # "韶关市": {
- # "boundary_file": os.path.join(cd_system_base, "output/raster/shaoguan.shp"),
- # "template_file": os.path.join(cd_system_base, "output/raster/shaoguan_template.tif"),
- # "coordinate_file": os.path.join(cd_system_base, "data/coordinates/韶关_坐标.csv"),
- # "region_code": "440200",
- # "display_name": "韶关市",
- # "province": "广东省"
- # }
- }
-
- def is_county_supported(self, county_name: str) -> bool:
- """
- 检查县市是否被支持
-
- @param {str} county_name - 县市名称
- @returns {bool} 是否支持该县市
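- @example 示例(仅为示意,假设默认配置可用)
- >>> service = CdPredictionService()
- >>> service.is_county_supported("乐昌市")
- True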
- """
- return county_name in self.supported_counties
-
- def get_supported_counties(self) -> List[str]:
- """
- 获取支持的县市名称列表
-
- @returns {List[str]} 支持的县市名称列表
- """
- return list(self.supported_counties.keys())
-
- def get_supported_counties_info(self) -> List[Dict[str, Any]]:
- """
- 获取支持的县市详细信息
-
- @returns {List[Dict[str, Any]]} 支持的县市详细信息列表
- """
- counties_info = []
- for county_name, config in self.supported_counties.items():
- counties_info.append({
- "name": county_name,
- "display_name": config.get("display_name", county_name),
- "province": config.get("province", ""),
- "region_code": config.get("region_code", ""),
- "has_boundary": os.path.exists(config.get("boundary_file", "")),
- "has_template": os.path.exists(config.get("template_file", "")),
- "has_coordinates": os.path.exists(config.get("coordinate_file", ""))
- })
- return counties_info
-
- def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
- """
- 验证输入数据格式
-
- @param {pd.DataFrame} df - 输入数据
- @param {str} county_name - 县市名称
- @returns {Dict[str, Any]} 验证结果
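- @example 示例(仅为示意,构造最小的三列数据进行校验)
- >>> import pandas as pd
- >>> df = pd.DataFrame({"lon": [113.3], "lat": [25.1], "pH": [6.5]})
- >>> service = CdPredictionService()
- >>> result = service.validate_input_data(df, "乐昌市")
- >>> result["valid"]
- True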
- """
- # 基本要求:前两列为坐标,至少需要3列数据
- if df.shape[1] < 3:
- return {
- "valid": False,
- "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
- "warnings": [],
- "data_shape": df.shape,
- "county_supported": self.is_county_supported(county_name),
- "null_summary": {}
- }
-
- # 坐标列验证(假设前两列是经纬度)
- coordinate_issues = []
- try:
- # 检查前两列是否为数值型
- if not pd.api.types.is_numeric_dtype(df.iloc[:, 0]):
- coordinate_issues.append("第一列(经度)不是数值型数据")
- elif not df.iloc[:, 0].dropna().between(70, 140).all():
- coordinate_issues.append("经度值超出合理范围(70-140度)")
-
- if not pd.api.types.is_numeric_dtype(df.iloc[:, 1]):
- coordinate_issues.append("第二列(纬度)不是数值型数据")
- elif not df.iloc[:, 1].dropna().between(15, 55).all():
- coordinate_issues.append("纬度值超出合理范围(15-55度)")
- except Exception as e:
- coordinate_issues.append(f"坐标数据验证失败: {str(e)}")
-
- # 检查数据完整性
- null_counts = df.isnull().sum()
- high_null_columns = [col for col, count in null_counts.items()
- if count > len(df) * 0.1] # 空值超过10%的列
-
- # 检查环境因子列是否为数值型
- non_numeric_columns = []
- for i in range(2, df.shape[1]): # 从第三列开始检查
- col_name = df.columns[i]
- if not pd.api.types.is_numeric_dtype(df.iloc[:, i]):
- non_numeric_columns.append(col_name)
-
- warnings = []
- if high_null_columns:
- warnings.append(f"以下列空值较多: {', '.join(high_null_columns)}")
- if coordinate_issues:
- warnings.extend(coordinate_issues)
- if non_numeric_columns:
- warnings.append(f"以下列不是数值型: {', '.join(non_numeric_columns)}")
-
- # 如果有严重的坐标问题,标记为无效
- critical_errors = []
- if any("不是数值型数据" in issue for issue in coordinate_issues):
- critical_errors.extend([issue for issue in coordinate_issues if "不是数值型数据" in issue])
-
- return {
- "valid": len(critical_errors) == 0,
- "errors": critical_errors,
- "warnings": warnings,
- "data_shape": df.shape,
- "county_supported": self.is_county_supported(county_name),
- "null_summary": null_counts.to_dict()
- }
-
- def save_uploaded_data(self, df: pd.DataFrame, county_name: str,
- description: Optional[str] = None) -> str:
- """
- 保存上传的数据文件
-
- @param {pd.DataFrame} df - 数据
- @param {str} county_name - 县市名称
- @param {Optional[str]} description - 数据描述
- @returns {str} 保存的文件路径
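- @example 示例(仅为示意,会在输出数据目录写入CSV及元信息JSON)
- >>> service = CdPredictionService()
- >>> path = service.save_uploaded_data(df, "乐昌市", description="示例数据")
- >>> path.endswith('.csv')
- True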
- """
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- filename = f"{county_name}_uploaded_data_{timestamp}.csv"
- file_path = os.path.join(self.output_data_dir, "uploaded", filename)
-
- # 确保目录存在
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
-
- # 保存数据
- df.to_csv(file_path, index=False, encoding='utf-8-sig')
-
- # 保存元信息
- meta_info = {
- "county_name": county_name,
- "description": description,
- "upload_time": datetime.now().isoformat(),
- "data_shape": df.shape,
- "columns": df.columns.tolist()
- }
-
- meta_path = file_path.replace('.csv', '_meta.json')
- with open(meta_path, 'w', encoding='utf-8') as f:
- json.dump(meta_info, f, ensure_ascii=False, indent=2)
-
- self.logger.info(f"数据文件已保存: {file_path}")
- return file_path
-
- def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
- """
- 保存临时数据文件
-
- @param {pd.DataFrame} df - 数据
- @param {str} county_name - 县市名称
- @returns {str} 临时文件路径
- """
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- filename = f"{county_name}_temp_data_{timestamp}.csv"
- file_path = os.path.join(self.output_data_dir, "temp", filename)
-
- # 确保目录存在
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
-
- # 保存数据
- df.to_csv(file_path, index=False, encoding='utf-8-sig')
-
- # 记录详细的文件信息
- file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
- self.logger.info(f"📁 临时数据文件已保存:")
- self.logger.info(f" 路径: {file_path}")
- self.logger.info(f" 大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
- self.logger.info(f" 数据形状: {df.shape[0]} 行 × {df.shape[1]} 列")
-
- # 清理旧的临时文件,只保留最新的5个
- self._cleanup_temp_files(county_name)
-
- return file_path
-
- async def add_county_support(self, county_name: str, boundary_file,
- coordinate_file) -> Dict[str, Any]:
- """
- 添加新县市支持
-
- @param {str} county_name - 县市名称
- @param boundary_file - 边界文件(Shapefile压缩包)
- @param coordinate_file - 坐标文件
- @returns {Dict[str, Any]} 添加结果
- """
- try:
- # 创建县市专用目录
- county_dir = os.path.join(self.output_data_dir, "counties", county_name)
- os.makedirs(county_dir, exist_ok=True)
-
- # 处理边界文件
- if boundary_file.filename.endswith('.zip'):
- boundary_content = await boundary_file.read()
- boundary_zip_path = os.path.join(county_dir, "boundary.zip")
- with open(boundary_zip_path, 'wb') as f:
- f.write(boundary_content)
-
- # 解压Shapefile
- with zipfile.ZipFile(boundary_zip_path, 'r') as zip_ref:
- zip_ref.extractall(os.path.join(county_dir, "boundary"))
- else:
- # 非ZIP边界文件暂不支持,直接报错,避免写入指向不存在目录的配置
- raise ValueError(f"不支持的边界文件格式: {boundary_file.filename},请上传包含Shapefile的ZIP压缩包")
-
- # 处理坐标文件
- coordinate_content = await coordinate_file.read()
- coordinate_path = os.path.join(county_dir, "coordinates.csv")
- df_coords = pd.read_csv(io.StringIO(coordinate_content.decode('utf-8')))
- df_coords.to_csv(coordinate_path, index=False, encoding='utf-8-sig')
-
- # 更新支持的县市配置
- self.supported_counties[county_name] = {
- "boundary_file": os.path.join(county_dir, "boundary"),
- "coordinate_file": coordinate_path,
- "template_file": "", # 需要后续生成
- "region_code": "",
- "display_name": county_name,
- "province": ""
- }
-
- return {
- "county_name": county_name,
- "boundary_path": os.path.join(county_dir, "boundary"),
- "coordinate_path": coordinate_path,
- "status": "success"
- }
-
- except Exception as e:
- self.logger.error(f"添加县市支持失败: {str(e)}")
- raise
-
- async def generate_crop_cd_prediction_for_county(
- self,
- county_name: str,
- data_file: Optional[str] = None,
- raster_config_override: Optional[Dict[str, Any]] = None
- ) -> Dict[str, Any]:
- """
- 为指定县市生成作物Cd预测
-
- @param {str} county_name - 县市名称
- @param {Optional[str]} data_file - 可选的数据文件路径
- @param {Optional[Dict[str, Any]]} raster_config_override - 可选的栅格配置覆盖项
- @returns {Dict[str, Any]} 预测结果信息
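- @example 示例(仅为示意,需在异步环境中调用)
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction_for_county("乐昌市")
- >>> print(result['map_path'])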
- """
- if not self.is_county_supported(county_name):
- raise ValueError(f"不支持的县市: {county_name}")
-
- try:
- # 获取县市配置
- county_config = self.supported_counties[county_name]
-
- # 如果提供了自定义数据文件,使用它替换默认数据
- if data_file:
- # 准备作物Cd模型的自定义数据
- self._prepare_crop_cd_custom_data(data_file, county_name)
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_crop_cd_prediction_with_county,
- county_name, county_config, raster_config_override
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"为{county_name}生成作物Cd预测失败: {str(e)}")
- raise
-
- async def generate_effective_cd_prediction_for_county(
- self,
- county_name: str,
- data_file: Optional[str] = None,
- raster_config_override: Optional[Dict[str, Any]] = None
- ) -> Dict[str, Any]:
- """
- 为指定县市生成有效态Cd预测
-
- @param {str} county_name - 县市名称
- @param {Optional[str]} data_file - 可选的数据文件路径
- @param {Optional[Dict[str, Any]]} raster_config_override - 可选的栅格配置覆盖项
- @returns {Dict[str, Any]} 预测结果信息
- """
- if not self.is_county_supported(county_name):
- raise ValueError(f"不支持的县市: {county_name}")
-
- try:
- # 获取县市配置
- county_config = self.supported_counties[county_name]
-
- # 如果提供了自定义数据文件,使用它替换默认数据
- if data_file:
- # 准备有效态Cd模型的自定义数据
- self._prepare_effective_cd_custom_data(data_file, county_name)
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_effective_cd_prediction_with_county,
- county_name, county_config, raster_config_override
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"为{county_name}生成有效态Cd预测失败: {str(e)}")
- raise
-
- def _prepare_crop_cd_custom_data(self, data_file: str, county_name: str):
- """
- 准备作物Cd模型的自定义数据文件
-
- @param {str} data_file - 数据文件路径
- @param {str} county_name - 县市名称
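- @example 输入CSV布局示意(列名仅作说明,实际按列位置解析:前两列为经纬度,其余为环境因子)
- 经度, 纬度, 环境因子1, 环境因子2, ...
- 113.35, 25.13, 6.5, 0.32, ...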
- """
- try:
-
- # 读取用户上传的CSV文件
- df = pd.read_csv(data_file, encoding='utf-8')
-
- # 获取Cd预测系统的数据目录
- cd_system_path = self.config.get_cd_system_path()
-
- # 1. 提取坐标信息并保存为独立的坐标文件
- coordinates_df = pd.DataFrame({
- 'longitude': df.iloc[:, 0], # 第一列为经度
- 'latitude': df.iloc[:, 1] # 第二列为纬度
- })
-
- # 保存坐标文件到系统数据目录
- coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
- os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
- coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
-
- # 记录坐标文件详细信息
- coord_file_size = os.path.getsize(coord_file_path) if os.path.exists(coord_file_path) else 0
- self.logger.info(f"🗺️ 坐标文件已保存:")
- self.logger.info(f" 路径: {coord_file_path}")
- self.logger.info(f" 大小: {coord_file_size:,} bytes ({coord_file_size/1024:.1f} KB)")
- self.logger.info(f" 坐标点数: {coordinates_df.shape[0]} 个")
-
- # 2. 准备作物Cd模型的训练数据
- crop_cd_data_dir = os.path.join(cd_system_path, "models", "crop_cd_model", "data")
- crop_target_file = os.path.join(crop_cd_data_dir, "areatest.csv")
-
- # 不再创建备份文件,因为此文件每次都会被用户数据完全覆盖
- if os.path.exists(crop_target_file):
- original_size = os.path.getsize(crop_target_file)
- self.logger.info(f"🔄 准备覆盖作物Cd模型数据文件:")
- self.logger.info(f" 文件路径: {crop_target_file}")
- self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size/1024:.1f} KB)")
-
- # 清理现有的备份文件(如果存在)
- self._cleanup_backup_files(crop_target_file, max_backups=0)
-
- # 提取环境因子数据(去掉前两列的经纬度)
- environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
-
- # 保存环境因子数据到作物Cd模型目录(不包含坐标)
- environmental_data.to_csv(crop_target_file, index=False, encoding='utf-8-sig')
-
- # 记录作物Cd模型数据文件详细信息
- model_file_size = os.path.getsize(crop_target_file) if os.path.exists(crop_target_file) else 0
- self.logger.info(f"🌾 作物Cd模型数据文件已保存:")
- self.logger.info(f" 路径: {crop_target_file}")
- self.logger.info(f" 大小: {model_file_size:,} bytes ({model_file_size/1024:.1f} KB)")
- self.logger.info(f" 数据形状: {environmental_data.shape[0]} 行 × {environmental_data.shape[1]} 列")
- self.logger.info(f" 环境因子列数: {environmental_data.shape[1]}")
-
- self.logger.info(f"✅ 作物Cd模型自定义数据文件已准备完成,县市: {county_name}")
-
- except Exception as e:
- self.logger.error(f"准备作物Cd模型自定义数据文件失败: {str(e)}")
- raise
- def _prepare_effective_cd_custom_data(self, data_file: str, county_name: str):
- """
- 准备有效态Cd模型的自定义数据文件
- @param {str} data_file - 数据文件路径
- @param {str} county_name - 县市名称
- """
- try:
- # 读取用户上传的CSV文件
- df = pd.read_csv(data_file, encoding='utf-8')
- # 获取Cd预测系统的数据目录
- cd_system_path = self.config.get_cd_system_path()
- # 1. 提取坐标信息并保存为独立的坐标文件
- coordinates_df = pd.DataFrame({
- 'longitude': df.iloc[:, 0], # 第一列为经度
- 'latitude': df.iloc[:, 1] # 第二列为纬度
- })
- # 保存坐标文件到系统数据目录
- coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
- os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
- coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
- self.logger.info(f"坐标文件已保存: {coord_file_path}")
- # 2. 准备有效态Cd模型的训练数据
- effective_cd_data_dir = os.path.join(cd_system_path, "models", "effective_cd_model", "data")
- effective_target_file = os.path.join(effective_cd_data_dir, "areatest.csv")
- # 创建备份文件路径
- backup_dir = os.path.join(cd_system_path, "backups")
- os.makedirs(backup_dir, exist_ok=True)
- backup_file = os.path.join(backup_dir, f"areatest_backup_{datetime.now().strftime('%Y%m%d')}.csv")
- # 如果目标文件存在,创建备份
- if os.path.exists(effective_target_file):
- # 创建备份
- shutil.copy2(effective_target_file, backup_file)
- self.logger.info(f"已创建备份文件: {backup_file}")
- original_size = os.path.getsize(effective_target_file)
- self.logger.info(f"🔄 准备覆盖有效态Cd模型数据文件:")
- self.logger.info(f" 文件路径: {effective_target_file}")
- self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size / 1024:.1f} KB)")
- # 清理现有的备份文件(如果存在)
- self._cleanup_backup_files(effective_target_file, max_backups=0)
- # 检查用户数据是否包含足够的环境因子列
- user_env_columns = df.shape[1] - 2 # 减去经纬度列
- if user_env_columns >= 21:
- # 用户数据包含足够的环境因子列,使用用户数据
- # 提取环境因子数据(去掉前两列的经纬度)
- user_environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
- # 如果用户数据列数超过21列,只取前21列
- if user_environmental_data.shape[1] > 21:
- user_environmental_data = user_environmental_data.iloc[:, :21]
- self.logger.info(f"用户数据有{df.shape[1] - 2}列环境因子,取前21列用于有效态Cd模型")
- # 保存用户的环境因子数据到有效态Cd模型目录
- user_environmental_data.to_csv(effective_target_file, index=False, encoding='utf-8-sig')
- self.logger.info(f"有效态Cd模型使用用户数据,形状: {user_environmental_data.shape}")
- else:
- # 用户数据环境因子列数不足,恢复使用备份数据
- if os.path.exists(backup_file):
- shutil.copy2(backup_file, effective_target_file)
- self.logger.info(f"用户数据环境因子列数不足({user_env_columns} < 21),有效态Cd模型恢复使用备份数据")
- else:
- # 如果没有备份文件,使用原始数据但记录警告
- self.logger.warning(
- f"用户数据环境因子列数不足({user_env_columns} < 21)且无备份文件,继续使用当前数据")
- self.logger.info(f"有效态Cd模型自定义数据文件已准备完成,县市: {county_name}")
- except Exception as e:
- self.logger.error(f"准备有效态Cd模型自定义数据文件失败: {str(e)}")
- raise
-
- def _run_crop_cd_prediction_with_county(self, county_name: str,
- county_config: Dict[str, Any],
- raster_config_override: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
- """
- 执行指定县市的作物Cd预测
-
- @param {str} county_name - 县市名称
- @param {Dict[str, Any]} county_config - 县市配置
- @param {Optional[Dict[str, Any]]} raster_config_override - 可选的栅格配置覆盖项
- @returns {Dict[str, Any]} 预测结果信息
- """
- try:
- # 用数据库边界覆盖环境变量给集成系统
- tmp_geojson = None
- db = None
- try:
- db = SessionLocal()
- feature = get_boundary_geojson_by_name(db, county_name, level="auto")
- fc = {"type": "FeatureCollection", "features": [feature]}
- tmp_dir = tempfile.mkdtemp()
- tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
- with open(tmp_geojson, 'w', encoding='utf-8') as f:
- json.dump(fc, f, ensure_ascii=False)
- os.environ['CD_BOUNDARY_FILE'] = tmp_geojson
- except Exception as _e:
- self.logger.warning(f"从数据库获取边界失败,回退到默认配置: {str(_e)}")
- finally:
- try:
- if db is not None:
- db.close()
- except Exception:
- pass
- # 运行作物Cd预测
- self.logger.info(f"为{county_name}执行作物Cd预测")
- prediction_result = self.wrapper.run_prediction_script("crop", raster_config_override)
-
- # 获取输出文件(指定作物Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- model_type = f"crop_cd_{county_name}"
- copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
-
- # 清理旧文件
- self._cleanup_old_files(model_type)
-
- result_obj = {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': model_type,
- 'county_name': county_name,
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
- # 清理临时边界
- try:
- if tmp_geojson and os.path.exists(tmp_geojson):
- shutil.rmtree(os.path.dirname(tmp_geojson), ignore_errors=True)
- if 'CD_BOUNDARY_FILE' in os.environ:
- del os.environ['CD_BOUNDARY_FILE']
- except Exception:
- pass
- return result_obj
-
- except Exception as e:
- self.logger.error(f"为{county_name}执行作物Cd预测失败: {str(e)}")
- raise
-
- def _run_effective_cd_prediction_with_county(self, county_name: str,
- county_config: Dict[str, Any],
- raster_config_override: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
- """
- 执行指定县市的有效态Cd预测
-
- @param {str} county_name - 县市名称
- @param {Dict[str, Any]} county_config - 县市配置
- @param {Optional[Dict[str, Any]]} raster_config_override - 可选的栅格配置覆盖项
- @returns {Dict[str, Any]} 预测结果信息
- """
- try:
- # 用数据库边界覆盖环境变量给集成系统
- tmp_geojson = None
- db = None
- try:
- db = SessionLocal()
- feature = get_boundary_geojson_by_name(db, county_name, level="auto")
- fc = {"type": "FeatureCollection", "features": [feature]}
- tmp_dir = tempfile.mkdtemp()
- tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
- with open(tmp_geojson, 'w', encoding='utf-8') as f:
- json.dump(fc, f, ensure_ascii=False)
- os.environ['CD_BOUNDARY_FILE'] = tmp_geojson
- except Exception as _e:
- self.logger.warning(f"从数据库获取边界失败,回退到默认配置: {str(_e)}")
- finally:
- try:
- if db is not None:
- db.close()
- except Exception:
- pass
- # 运行有效态Cd预测
- self.logger.info(f"为{county_name}执行有效态Cd预测")
- prediction_result = self.wrapper.run_prediction_script("effective", raster_config_override)
-
- # 获取输出文件(指定有效态Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- model_type = f"effective_cd_{county_name}"
- copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
-
- # 清理旧文件
- self._cleanup_old_files(model_type)
-
- result_obj = {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': model_type,
- 'county_name': county_name,
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
- # 清理临时边界
- try:
- if tmp_geojson and os.path.exists(tmp_geojson):
- shutil.rmtree(os.path.dirname(tmp_geojson), ignore_errors=True)
- if 'CD_BOUNDARY_FILE' in os.environ:
- del os.environ['CD_BOUNDARY_FILE']
- except Exception:
- pass
- return result_obj
-
- except Exception as e:
- self.logger.error(f"为{county_name}执行有效态Cd预测失败: {str(e)}")
- raise
-
- async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成作物Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始作物Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_crop_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"作物Cd预测流程失败: {str(e)}")
- raise
-
- async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成有效态Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_effective_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始有效态Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_effective_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
- raise
-
- def _run_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行作物Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行作物Cd预测
- self.logger.info("执行作物Cd预测")
- prediction_result = self.wrapper.run_prediction_script("crop")
-
- # 获取输出文件(指定作物Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("crop_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'crop_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"作物Cd预测执行失败: {str(e)}")
- raise
-
- def _run_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行有效态Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行有效态Cd预测
- self.logger.info("执行有效态Cd预测")
- prediction_result = self.wrapper.run_prediction_script("effective")
-
- # 获取输出文件(指定有效态Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("effective_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'effective_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
- raise
-
- def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
- model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
- """
- 复制输出文件到API目录
-
- @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
- @param {str} model_type - 模型类型
- @param {str} timestamp - 时间戳
- @returns {Dict[str, Optional[str]]} 复制后的文件路径
- """
- copied_files = {}
-
- try:
- self.logger.info(f"🔄 开始复制输出文件 (模型类型: {model_type})")
- total_size = 0
-
- # 复制地图文件
- if latest_outputs.get('latest_map'):
- src_map = latest_outputs['latest_map']
- dst_map = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_map_{timestamp}.jpg"
- )
- shutil.copy2(src_map, dst_map)
- copied_files['map_path'] = dst_map
-
- # 记录详细信息
- src_size = os.path.getsize(src_map) if os.path.exists(src_map) else 0
- dst_size = os.path.getsize(dst_map) if os.path.exists(dst_map) else 0
- total_size += dst_size
- self.logger.info(f"🗺️ 地图文件已复制:")
- self.logger.info(f" 源文件: {src_map}")
- self.logger.info(f" 目标文件: {dst_map}")
- self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
-
- # 激进清理:立即删除源文件
- try:
- os.remove(src_map)
- self.logger.info(f"🗑️ 已删除源地图文件: {os.path.basename(src_map)}")
- except Exception as e:
- self.logger.warning(f"删除源地图文件失败: {str(e)}")
-
- # 复制直方图文件
- if latest_outputs.get('latest_histogram'):
- src_histogram = latest_outputs['latest_histogram']
- dst_histogram = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_histogram_{timestamp}.jpg"
- )
- shutil.copy2(src_histogram, dst_histogram)
- copied_files['histogram_path'] = dst_histogram
-
- # 记录详细信息
- src_size = os.path.getsize(src_histogram) if os.path.exists(src_histogram) else 0
- dst_size = os.path.getsize(dst_histogram) if os.path.exists(dst_histogram) else 0
- total_size += dst_size
- self.logger.info(f"📊 直方图文件已复制:")
- self.logger.info(f" 源文件: {src_histogram}")
- self.logger.info(f" 目标文件: {dst_histogram}")
- self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
-
- # 激进清理:立即删除源文件
- try:
- os.remove(src_histogram)
- self.logger.info(f"🗑️ 已删除源直方图文件: {os.path.basename(src_histogram)}")
- except Exception as e:
- self.logger.warning(f"删除源直方图文件失败: {str(e)}")
-
- # 处理栅格文件(不复制,直接删除源文件以节省空间)
- if latest_outputs.get('latest_raster'):
- src_raster = latest_outputs['latest_raster']
-
- # 记录栅格文件信息但不复制
- src_size = os.path.getsize(src_raster) if os.path.exists(src_raster) else 0
- self.logger.info(f"🌐 栅格文件处理:")
- self.logger.info(f" 源文件: {src_raster}")
- self.logger.info(f" 文件大小: {src_size:,} bytes ({src_size/1024:.1f} KB)")
- self.logger.info(f" 处理方式: 跳过复制,直接删除(栅格文件为中间文件)")
-
- # 激进清理:直接删除源文件,不进行复制
- try:
- os.remove(src_raster)
- self.logger.info(f"🗑️ 已删除中间栅格文件: {os.path.basename(src_raster)}")
- self.logger.info(f" 节省空间: {src_size:,} bytes ({src_size/1024:.1f} KB)")
- except Exception as e:
- self.logger.warning(f"删除源栅格文件失败: {str(e)}")
-
- # 不设置raster_path,因为不需要保留栅格文件
- copied_files['raster_path'] = None
-
- # 记录总体信息
- self.logger.info(f"✅ 文件复制完成,总大小: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)")
-
- # 清理Cd预测系统原始输出文件
- self._cleanup_cd_system_outputs()
-
- # 清理中间数据文件
- self._cleanup_intermediate_data_files()
-
- except Exception as e:
- self.logger.error(f"复制输出文件失败: {str(e)}")
-
- return copied_files
-
- def _cleanup_old_files(self, model_type: str):
- """
- 清理旧的预测文件
-
- @param {str} model_type - 模型类型
- """
- try:
- max_files = self.config.get_api_config().get("max_prediction_files", 10)
-
- # 清理地图文件
- map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
- self._cleanup_files_by_pattern(map_pattern, max_files)
-
- # 清理直方图文件
- histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
- self._cleanup_files_by_pattern(histogram_pattern, max_files)
-
- # 不再需要清理API栅格文件,因为我们不再保留栅格文件
- # raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
- # self._cleanup_files_by_pattern(raster_pattern, max_files)
- self.logger.info(f"跳过栅格文件清理(栅格文件已不再保留)")
-
- except Exception as e:
- self.logger.warning(f"清理旧文件失败: {str(e)}")
-
- def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
- """
- 按模式清理文件
-
- @param {str} pattern - 文件模式
- @param {int} max_files - 最大保留文件数
- """
- try:
- files = glob.glob(pattern)
- if len(files) > max_files:
- # 按修改时间排序,删除最旧的文件
- files.sort(key=os.path.getmtime)
- for file_to_delete in files[:-max_files]:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧文件: {file_to_delete}")
- except Exception as e:
- self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
-
- def _cleanup_temp_files(self, county_name: str, max_files: int = 5):
- """
- 清理临时数据文件,只保留指定县市的最新文件
-
- @param {str} county_name - 县市名称
- @param {int} max_files - 最大保留文件数,默认5个
- """
- try:
- temp_dir = os.path.join(self.output_data_dir, "temp")
- temp_pattern = os.path.join(temp_dir, f"{county_name}_temp_data_*.csv")
-
- # 获取该县市的所有临时文件
- temp_files = glob.glob(temp_pattern)
-
- if len(temp_files) > max_files:
- # 按修改时间排序,删除最旧的文件
- temp_files.sort(key=os.path.getmtime)
- files_to_delete = temp_files[:-max_files]
-
- for file_to_delete in files_to_delete:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧临时文件: {os.path.basename(file_to_delete)}")
-
- self.logger.info(f"已清理{county_name}的临时文件,保留最新{max_files}个文件")
-
- except Exception as e:
- self.logger.warning(f"清理{county_name}临时文件失败: {str(e)}")
-
- def _cleanup_backup_files(self, target_file: str, max_backups: int = 3):
- """
- 清理备份文件,只保留最新的备份
-
- @param {str} target_file - 目标文件路径
- @param {int} max_backups - 最大保留备份数,默认3个
- """
- try:
- # 构建备份文件匹配模式
- backup_pattern = f"{target_file}.backup_*"
- backup_files = glob.glob(backup_pattern)
-
- if len(backup_files) > max_backups:
- # 按修改时间排序,删除最旧的备份文件
- backup_files.sort(key=os.path.getmtime)
- files_to_delete = backup_files[:-max_backups]
-
- for file_to_delete in files_to_delete:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧备份文件: {os.path.basename(file_to_delete)}")
-
- self.logger.info(f"已清理备份文件,保留最新{max_backups}个备份")
-
- except Exception as e:
- self.logger.warning(f"清理备份文件失败: {str(e)}")
-
- def _delete_all_files_by_pattern(self, pattern: str) -> Tuple[int, int]:
- """
- 删除所有匹配模式的文件(激进清理模式)
-
- @param {str} pattern - 文件匹配模式
- @returns {Tuple[int, int]} 返回删除的文件数量和释放的字节数
- """
- deleted_count = 0
- total_size_freed = 0
-
- try:
- files = glob.glob(pattern)
-
- for file_path in files:
- try:
- # 获取文件大小
- file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-
- # 删除文件
- os.remove(file_path)
-
- deleted_count += 1
- total_size_freed += file_size
-
- self.logger.info(f"🗑️ 已删除中间文件: {os.path.basename(file_path)} ({file_size:,} bytes)")
-
- except Exception as e:
- self.logger.warning(f"删除文件失败 {file_path}: {str(e)}")
-
- if deleted_count > 0:
- self.logger.info(f"🧹 模式 '{os.path.basename(pattern)}' 清理完成: {deleted_count} 个文件, {total_size_freed:,} bytes")
-
- except Exception as e:
- self.logger.warning(f"按模式删除文件失败 {pattern}: {str(e)}")
-
- return deleted_count, total_size_freed
-
- def _cleanup_cd_system_outputs(self, aggressive_mode: bool = True):
- """
- 清理Cd预测系统原始输出文件(激进模式:删除所有中间文件)
-
- @param {bool} aggressive_mode - 激进模式,默认True(删除所有中间文件)
- """
- try:
- cd_system_path = self.config.get_cd_system_path()
- output_figures_dir = os.path.join(cd_system_path, "output", "figures")
- output_raster_dir = os.path.join(cd_system_path, "output", "raster")
-
- total_deleted = 0
- total_size_freed = 0
-
- # 清理figures目录下的所有预测输出文件
- if os.path.exists(output_figures_dir):
- # 立即删除所有地图文件
- map_patterns = [
- "Prediction_results_*.jpg",
- "Prediction_results_*.png"
- ]
- for pattern in map_patterns:
- full_pattern = os.path.join(output_figures_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- # 立即删除所有直方图文件
- histogram_patterns = [
- "Prediction_frequency_*.jpg",
- "Prediction_frequency_*.png"
- ]
- for pattern in histogram_patterns:
- full_pattern = os.path.join(output_figures_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- # 清理raster目录下的所有文件
- if os.path.exists(output_raster_dir):
- # 立即删除所有栅格相关文件
- raster_patterns = [
- "output_*.tif",
- "points_*.shp",
- "points_*.dbf",
- "points_*.prj",
- "points_*.shx",
- "points_*.cpg"
- ]
- for pattern in raster_patterns:
- full_pattern = os.path.join(output_raster_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- self.logger.info(f"🧹 激进清理完成:")
- self.logger.info(f" 删除文件数: {total_deleted} 个")
- self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024/1024:.2f} MB)")
-
- except Exception as e:
- self.logger.warning(f"激进清理Cd预测系统输出文件失败: {str(e)}")
-
- def _cleanup_intermediate_data_files(self):
- """
- 清理中间数据文件
- 包括:坐标.csv 和 combined_pH.csv, pHcombined.csv
- """
- try:
- cd_system_path = self.config.get_cd_system_path()
- deleted_count = 0
- total_size_freed = 0
-
- # 要清理的中间数据文件
- intermediate_files = [
- os.path.join(cd_system_path, "data", "coordinates", "坐标.csv"),
- os.path.join(cd_system_path, "data", "predictions", "combined_pH.csv"),
- os.path.join(cd_system_path, "data", "predictions", "pHcombined.csv")
- ]
-
- self.logger.info("🧹 开始清理中间数据文件...")
-
- for file_path in intermediate_files:
- if os.path.exists(file_path):
- try:
- file_size = os.path.getsize(file_path)
- os.remove(file_path)
-
- deleted_count += 1
- total_size_freed += file_size
-
- self.logger.info(f"🗑️ 已删除中间数据文件: {os.path.basename(file_path)}")
- self.logger.info(f" 文件路径: {file_path}")
- self.logger.info(f" 文件大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
-
- except Exception as e:
- self.logger.warning(f"删除中间数据文件失败 {file_path}: {str(e)}")
- else:
- self.logger.debug(f"中间数据文件不存在,跳过: {os.path.basename(file_path)}")
-
- if deleted_count > 0:
- self.logger.info(f"✅ 中间数据文件清理完成:")
- self.logger.info(f" 删除文件数: {deleted_count} 个")
- self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024:.1f} KB)")
- else:
- self.logger.info("ℹ️ 没有找到需要清理的中间数据文件")
-
- except Exception as e:
- self.logger.warning(f"清理中间数据文件失败: {str(e)}")
-
- def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
- """
- 获取文件统计信息
-
- @param {Optional[str]} file_path - 文件路径
- @returns {Dict[str, Any]} 文件统计信息
- """
- if not file_path or not os.path.exists(file_path):
- return {}
-
- try:
- stat = os.stat(file_path)
- return {
- 'file_size': stat.st_size,
- 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
- 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
- }
- except Exception:
- return {}
-
- def get_latest_crop_cd_map(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
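- @example 示例(仅为示意)
- >>> service = CdPredictionService()
- >>> latest = service.get_latest_crop_cd_map()
- >>> latest is None or latest.endswith('.jpg')
- True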
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_map(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_crop_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- # =============================================================================
- # 统计信息方法
- # =============================================================================
-
- def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取作物Cd预测结果的统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
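- @example 示例(仅为示意,假设已生成Final_predictions_crop_cd.csv预测结果)
- >>> service = CdPredictionService()
- >>> stats = service.get_crop_cd_statistics("乐昌市")
- >>> stats["模型类型"] if stats else None
- '作物Cd模型'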
- """
- try:
- # 查找最新的预测结果文件
- cd_system_path = self.config.get_cd_system_path()
- final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_crop_cd.csv")
-
- if not os.path.exists(final_data_path):
- self.logger.warning(f"未找到作物Cd预测结果文件: {final_data_path}")
- return None
-
- # 读取预测数据
- df = pd.read_csv(final_data_path)
-
- if 'Prediction' not in df.columns:
- self.logger.warning("预测结果文件中缺少'Prediction'列")
- return None
-
- predictions = df['Prediction']
-
- # 计算基础统计信息
- basic_stats = {
- "数据点总数": len(predictions),
- "均值": float(predictions.mean()),
- "中位数": float(predictions.median()),
- "标准差": float(predictions.std()),
- "最小值": float(predictions.min()),
- "最大值": float(predictions.max()),
- "25%分位数": float(predictions.quantile(0.25)),
- "75%分位数": float(predictions.quantile(0.75)),
- "偏度": float(predictions.skew()),
- "峰度": float(predictions.kurtosis())
- }
-
- # 计算分布直方图数据
- histogram_data = self._calculate_histogram_data(predictions)
-
- # 计算空间统计信息(如果有坐标信息)
- spatial_stats = None
- if 'longitude' in df.columns and 'latitude' in df.columns:
- spatial_stats = self._calculate_spatial_statistics(df)
-
- return {
- "模型类型": "作物Cd模型",
- "县市名称": county_name,
- "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
- "基础统计": basic_stats,
- "分布直方图": histogram_data,
- "空间统计": spatial_stats
- }
-
- except Exception as e:
- self.logger.error(f"获取作物Cd统计信息失败: {str(e)}")
- return None
-
- def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取有效态Cd预测结果的统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
- """
- try:
- # 查找最新的预测结果文件
- cd_system_path = self.config.get_cd_system_path()
- final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_effective_cd.csv")
-
- if not os.path.exists(final_data_path):
- self.logger.warning(f"未找到有效态Cd预测结果文件: {final_data_path}")
- return None
-
- # 读取预测数据
- df = pd.read_csv(final_data_path)
-
- if 'Prediction' not in df.columns:
- self.logger.warning("预测结果文件中缺少'Prediction'列")
- return None
-
- predictions = df['Prediction']
-
- # 计算基础统计信息
- basic_stats = {
- "数据点总数": len(predictions),
- "均值": float(predictions.mean()),
- "中位数": float(predictions.median()),
- "标准差": float(predictions.std()),
- "最小值": float(predictions.min()),
- "最大值": float(predictions.max()),
- "25%分位数": float(predictions.quantile(0.25)),
- "75%分位数": float(predictions.quantile(0.75)),
- "偏度": float(predictions.skew()),
- "峰度": float(predictions.kurtosis())
- }
-
- # 计算分布直方图数据
- histogram_data = self._calculate_histogram_data(predictions)
-
- # 计算空间统计信息(如果有坐标信息)
- spatial_stats = None
- if 'longitude' in df.columns and 'latitude' in df.columns:
- spatial_stats = self._calculate_spatial_statistics(df)
-
- return {
- "模型类型": "有效态Cd模型",
- "县市名称": county_name,
- "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
- "基础统计": basic_stats,
- "分布直方图": histogram_data,
- "空间统计": spatial_stats
- }
-
- except Exception as e:
- self.logger.error(f"获取有效态Cd统计信息失败: {str(e)}")
- return None
-
- def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取综合预测统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 综合统计信息,如果没有数据则返回None
- """
- try:
- crop_stats = self.get_crop_cd_statistics(county_name)
- effective_stats = self.get_effective_cd_statistics(county_name)
-
- if not crop_stats and not effective_stats:
- return None
-
- return {
- "县市名称": county_name,
- "作物Cd统计": crop_stats,
- "有效态Cd统计": effective_stats,
- "生成时间": datetime.now().isoformat()
- }
-
- except Exception as e:
- self.logger.error(f"获取综合统计信息失败: {str(e)}")
- return None
-
-
- def get_all_counties_statistics(self) -> Dict[str, Any]:
- """
- 获取所有支持县市的统计概览
-
- @returns {Dict[str, Any]} 所有县市的统计概览
- """
- try:
- all_stats = {
- "支持县市总数": len(self.supported_counties),
- "统计生成时间": datetime.now().isoformat(),
- "县市统计": {},
- "汇总信息": {
- "有作物Cd数据的县市": 0,
- "有有效态Cd数据的县市": 0,
- "数据完整的县市": 0
- }
- }
-
- for county_name in self.supported_counties.keys():
- county_stats = {
- "县市名称": county_name,
- "有作物Cd数据": False,
- "有有效态Cd数据": False,
- "数据完整": False,
- "最新更新时间": None
- }
-
- # 检查作物Cd数据
- crop_stats = self.get_crop_cd_statistics(county_name)
- if crop_stats:
- county_stats["有作物Cd数据"] = True
- county_stats["作物Cd概要"] = {
- "数据点数": crop_stats["基础统计"]["数据点总数"],
- "均值": crop_stats["基础统计"]["均值"],
- "最大值": crop_stats["基础统计"]["最大值"]
- }
- all_stats["汇总信息"]["有作物Cd数据的县市"] += 1
-
- # 检查有效态Cd数据
- effective_stats = self.get_effective_cd_statistics(county_name)
- if effective_stats:
- county_stats["有有效态Cd数据"] = True
- county_stats["有效态Cd概要"] = {
- "数据点数": effective_stats["基础统计"]["数据点总数"],
- "均值": effective_stats["基础统计"]["均值"],
- "最大值": effective_stats["基础统计"]["最大值"]
- }
- all_stats["汇总信息"]["有有效态Cd数据的县市"] += 1
-
- # 检查数据完整性
- if county_stats["有作物Cd数据"] and county_stats["有有效态Cd数据"]:
- county_stats["数据完整"] = True
- all_stats["汇总信息"]["数据完整的县市"] += 1
-
- all_stats["县市统计"][county_name] = county_stats
-
- return all_stats
-
- except Exception as e:
- self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
- return {
- "error": f"获取统计概览失败: {str(e)}",
- "支持县市总数": len(self.supported_counties),
- "统计生成时间": datetime.now().isoformat()
- }
-
- # =============================================================================
- # 辅助统计方法
- # =============================================================================
-
-
- def _calculate_histogram_data(self, predictions: pd.Series, bins: int = 20) -> Dict[str, Any]:
- """
- 计算分布直方图数据
-
- @param {pd.Series} predictions - 预测值
- @param {int} bins - 直方图区间数
- @returns {Dict[str, Any]} 直方图数据
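- @example 示例(仅为示意)
- >>> import pandas as pd
- >>> service = CdPredictionService()
- >>> data = service._calculate_histogram_data(pd.Series([0.1, 0.2, 0.3, 0.4]), bins=2)
- >>> data["区间数"], data["总频次"]
- (2, 4)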
- """
- try:
- import numpy as np
-
- hist, bin_edges = np.histogram(predictions, bins=bins)
-
- # 计算区间中心点
- bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
-
- return {
- "区间数": int(bins),
- "频次": [int(count) for count in hist],
- "区间中心": [float(center) for center in bin_centers],
- "区间边界": [float(edge) for edge in bin_edges],
- "总频次": int(hist.sum())
- }
-
- except Exception as e:
- self.logger.error(f"计算直方图数据失败: {str(e)}")
- return {}
-
- def _calculate_spatial_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
- """
- 计算空间统计信息
-
- @param {pd.DataFrame} df - 包含坐标和预测值的数据框
- @returns {Dict[str, Any]} 空间统计信息
- """
- try:
- spatial_stats = {
- "经度范围": {
- "最小值": float(df['longitude'].min()),
- "最大值": float(df['longitude'].max()),
- "跨度": float(df['longitude'].max() - df['longitude'].min())
- },
- "纬度范围": {
- "最小值": float(df['latitude'].min()),
- "最大值": float(df['latitude'].max()),
- "跨度": float(df['latitude'].max() - df['latitude'].min())
- }
- }
-
- return spatial_stats
-
- except Exception as e:
- self.logger.error(f"计算空间统计信息失败: {str(e)}")
- return {}
-
-
-
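- if __name__ == "__main__":
- # 使用示例(仅为示意,非正式入口脚本):演示县级作物Cd预测的完整调用流程,
- # 假设Cd预测系统已按cd_config部署、数据库可用且乐昌市默认数据齐备。
- async def _demo():
- service = CdPredictionService()
- print("支持的县市:", service.get_supported_counties())
- result = await service.generate_crop_cd_prediction_for_county("乐昌市")
- print("预测地图:", result.get("map_path"))
- print("预测直方图:", result.get("histogram_path"))
-
- asyncio.run(_demo())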
|