- """
- Cd预测服务类
- @description: 封装Cd预测模型的业务逻辑,提供作物Cd和有效态Cd的预测功能
- @author: AcidMap Team
- @version: 1.0.0
- """
- import os
- import logging
- import asyncio
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple
- import glob
- import shutil
- import zipfile
- import pandas as pd
- import io
- from ..config.cd_prediction_config import cd_config
- from ..utils.cd_prediction_wrapper import CdPredictionWrapper
- class CdPredictionService:
- """
- Cd预测服务类
-
- @description: 提供作物Cd和有效态Cd模型的预测与可视化功能
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- """
-
- def __init__(self):
- """
- 初始化Cd预测服务
-
- @description: 设置Cd预测系统的路径和配置
- """
- # 设置日志
- self.logger = logging.getLogger(__name__)
-
- # 获取配置
- self.config = cd_config
-
- # 初始化包装器
- cd_system_path = self.config.get_cd_system_path()
- self.wrapper = CdPredictionWrapper(cd_system_path)
-
- # 输出目录
- self.output_figures_dir = self.config.get_output_dir("figures")
- self.output_raster_dir = self.config.get_output_dir("raster")
- self.output_data_dir = self.config.get_output_dir("data")
-
- # 支持的县市配置
- self.supported_counties = self._load_supported_counties()
-
- self.logger.info("Cd预测服务初始化完成")
-
- def _load_supported_counties(self) -> Dict[str, Dict]:
- """
- 加载支持的县市配置
-
- @returns {Dict[str, Dict]} 支持的县市配置信息
- """
- # 获取Cd预测系统的基础路径
- cd_system_base = self.config.get_cd_system_path()
-
- return {
- "乐昌市": {
- "boundary_file": os.path.join(cd_system_base, "output/raster/lechang.shp"),
- "template_file": os.path.join(cd_system_base, "output/raster/meanTemp.tif"),
- "coordinate_file": os.path.join(cd_system_base, "data/coordinates/坐标.csv"),
- "region_code": "440282",
- "display_name": "乐昌市",
- "province": "广东省"
- },
- # 可扩展添加更多县市
- # "韶关市": {
- # "boundary_file": os.path.join(cd_system_base, "output/raster/shaoguan.shp"),
- # "template_file": os.path.join(cd_system_base, "output/raster/shaoguan_template.tif"),
- # "coordinate_file": os.path.join(cd_system_base, "data/coordinates/韶关_坐标.csv"),
- # "region_code": "440200",
- # "display_name": "韶关市",
- # "province": "广东省"
- # }
- }
-
- def is_county_supported(self, county_name: str) -> bool:
- """
- 检查县市是否被支持
-
- @param {str} county_name - 县市名称
- @returns {bool} 是否支持该县市
- """
- return county_name in self.supported_counties
-
- def get_supported_counties(self) -> List[str]:
- """
- 获取支持的县市名称列表
-
- @returns {List[str]} 支持的县市名称列表
- """
- return list(self.supported_counties.keys())
-
- def get_supported_counties_info(self) -> List[Dict[str, Any]]:
- """
- 获取支持的县市详细信息
-
- @returns {List[Dict[str, Any]]} 支持的县市详细信息列表
- """
- counties_info = []
- for county_name, config in self.supported_counties.items():
- counties_info.append({
- "name": county_name,
- "display_name": config.get("display_name", county_name),
- "province": config.get("province", ""),
- "region_code": config.get("region_code", ""),
- "has_boundary": os.path.exists(config.get("boundary_file", "")),
- "has_template": os.path.exists(config.get("template_file", "")),
- "has_coordinates": os.path.exists(config.get("coordinate_file", ""))
- })
- return counties_info
-
- def validate_input_data(self, df: pd.DataFrame, county_name: str) -> Dict[str, Any]:
- """
- 验证输入数据格式
-
- @param {pd.DataFrame} df - 输入数据
- @param {str} county_name - 县市名称
- @returns {Dict[str, Any]} 验证结果
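-         @example
-         Illustrative call with made-up values; only the column order matters
-         (columns 1-2 are longitude/latitude, later columns are environmental factors):
-         >>> service = CdPredictionService()
-         >>> df = pd.DataFrame([[113.35, 25.13, 5.6], [113.40, 25.18, 6.1]],
-         ...                   columns=["lon", "lat", "pH"])
-         >>> report = service.validate_input_data(df, "乐昌市")
-         >>> report["valid"], report["warnings"]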
- """
- # 基本要求:前两列为坐标,至少需要3列数据
- if df.shape[1] < 3:
- return {
- "valid": False,
- "errors": ["数据至少需要3列:前两列为经纬度,后续列为环境因子"],
- "warnings": [],
- "data_shape": df.shape,
- "county_supported": self.is_county_supported(county_name),
- "null_summary": {}
- }
-
- # 坐标列验证(假设前两列是经纬度)
- coordinate_issues = []
- try:
- # 检查前两列是否为数值型
- if not pd.api.types.is_numeric_dtype(df.iloc[:, 0]):
- coordinate_issues.append("第一列(经度)不是数值型数据")
- elif not df.iloc[:, 0].between(70, 140).all():
- coordinate_issues.append("经度值超出合理范围(70-140度)")
-
- if not pd.api.types.is_numeric_dtype(df.iloc[:, 1]):
- coordinate_issues.append("第二列(纬度)不是数值型数据")
- elif not df.iloc[:, 1].between(15, 55).all():
- coordinate_issues.append("纬度值超出合理范围(15-55度)")
- except Exception as e:
- coordinate_issues.append(f"坐标数据验证失败: {str(e)}")
-
- # 检查数据完整性
- null_counts = df.isnull().sum()
- high_null_columns = [col for col, count in null_counts.items()
- if count > len(df) * 0.1] # 空值超过10%的列
-
- # 检查环境因子列是否为数值型
- non_numeric_columns = []
- for i in range(2, df.shape[1]): # 从第三列开始检查
- col_name = df.columns[i]
- if not pd.api.types.is_numeric_dtype(df.iloc[:, i]):
- non_numeric_columns.append(col_name)
-
- warnings = []
- if high_null_columns:
- warnings.append(f"以下列空值较多: {', '.join(high_null_columns)}")
- if coordinate_issues:
- warnings.extend(coordinate_issues)
- if non_numeric_columns:
- warnings.append(f"以下列不是数值型: {', '.join(non_numeric_columns)}")
-
- # 如果有严重的坐标问题,标记为无效
- critical_errors = []
- if any("不是数值型数据" in issue for issue in coordinate_issues):
- critical_errors.extend([issue for issue in coordinate_issues if "不是数值型数据" in issue])
-
- return {
- "valid": len(critical_errors) == 0,
- "errors": critical_errors,
- "warnings": warnings,
- "data_shape": df.shape,
- "county_supported": self.is_county_supported(county_name),
- "null_summary": null_counts.to_dict()
- }
-
- def save_uploaded_data(self, df: pd.DataFrame, county_name: str,
- description: Optional[str] = None) -> str:
- """
- 保存上传的数据文件
-
- @param {pd.DataFrame} df - 数据
- @param {str} county_name - 县市名称
- @param {Optional[str]} description - 数据描述
- @returns {str} 保存的文件路径
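-         @example
-         Illustrative call (the description string is a made-up placeholder):
-         >>> saved_path = service.save_uploaded_data(df, "乐昌市", description="示例采样数据")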
- """
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- filename = f"{county_name}_uploaded_data_{timestamp}.csv"
- file_path = os.path.join(self.output_data_dir, "uploaded", filename)
-
- # 确保目录存在
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
-
- # 保存数据
- df.to_csv(file_path, index=False, encoding='utf-8-sig')
-
- # 保存元信息
- meta_info = {
- "county_name": county_name,
- "description": description,
- "upload_time": datetime.now().isoformat(),
- "data_shape": df.shape,
- "columns": df.columns.tolist()
- }
-
- meta_path = file_path.replace('.csv', '_meta.json')
- import json
- with open(meta_path, 'w', encoding='utf-8') as f:
- json.dump(meta_info, f, ensure_ascii=False, indent=2)
-
- self.logger.info(f"数据文件已保存: {file_path}")
- return file_path
-
- def save_temp_data(self, df: pd.DataFrame, county_name: str) -> str:
- """
- 保存临时数据文件
-
- @param {pd.DataFrame} df - 数据
- @param {str} county_name - 县市名称
- @returns {str} 临时文件路径
- """
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- filename = f"{county_name}_temp_data_{timestamp}.csv"
- file_path = os.path.join(self.output_data_dir, "temp", filename)
-
- # 确保目录存在
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
-
- # 保存数据
- df.to_csv(file_path, index=False, encoding='utf-8-sig')
-
- # 记录详细的文件信息
- file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
- self.logger.info(f"📁 临时数据文件已保存:")
- self.logger.info(f" 路径: {file_path}")
- self.logger.info(f" 大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
- self.logger.info(f" 数据形状: {df.shape[0]} 行 × {df.shape[1]} 列")
-
- # 清理旧的临时文件,只保留最新的5个
- self._cleanup_temp_files(county_name)
-
- return file_path
-
- async def add_county_support(self, county_name: str, boundary_file,
- coordinate_file) -> Dict[str, Any]:
- """
- 添加新县市支持
-
- @param {str} county_name - 县市名称
- @param boundary_file - 边界文件(Shapefile压缩包)
- @param coordinate_file - 坐标文件
- @returns {Dict[str, Any]} 添加结果
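-         @example
-         Illustrative sketch; boundary_zip_upload / coord_csv_upload stand in for
-         upload objects exposing an async read() and a filename attribute
-         (e.g. FastAPI/Starlette UploadFile, an assumption based on the interface used here):
-         >>> result = await service.add_county_support(
-         ...     "韶关市", boundary_file=boundary_zip_upload, coordinate_file=coord_csv_upload)
-         >>> result["status"]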
- """
- try:
- # 创建县市专用目录
- county_dir = os.path.join(self.output_data_dir, "counties", county_name)
- os.makedirs(county_dir, exist_ok=True)
-
- # 处理边界文件
- if boundary_file.filename.endswith('.zip'):
- boundary_content = await boundary_file.read()
- boundary_zip_path = os.path.join(county_dir, "boundary.zip")
- with open(boundary_zip_path, 'wb') as f:
- f.write(boundary_content)
-
- # 解压Shapefile
-                 with zipfile.ZipFile(boundary_zip_path, 'r') as zip_ref:
-                     zip_ref.extractall(os.path.join(county_dir, "boundary"))
-             else:
-                 # 边界文件不是.zip压缩包时直接报错,避免静默跳过边界处理
-                 raise ValueError(f"不支持的边界文件格式: {boundary_file.filename},请上传包含Shapefile的.zip压缩包")
-
- # 处理坐标文件
- coordinate_content = await coordinate_file.read()
- coordinate_path = os.path.join(county_dir, "coordinates.csv")
-             df_coords = pd.read_csv(io.StringIO(coordinate_content.decode('utf-8-sig')))  # 使用utf-8-sig以兼容带BOM的CSV文件
- df_coords.to_csv(coordinate_path, index=False, encoding='utf-8-sig')
-
- # 更新支持的县市配置
- self.supported_counties[county_name] = {
- "boundary_file": os.path.join(county_dir, "boundary"),
- "coordinate_file": coordinate_path,
- "template_file": "", # 需要后续生成
- "region_code": "",
- "display_name": county_name,
- "province": ""
- }
-
- return {
- "county_name": county_name,
- "boundary_path": os.path.join(county_dir, "boundary"),
- "coordinate_path": coordinate_path,
- "status": "success"
- }
-
- except Exception as e:
- self.logger.error(f"添加县市支持失败: {str(e)}")
- raise
-
- async def generate_crop_cd_prediction_for_county(
- self,
- county_name: str,
- data_file: Optional[str] = None
- ) -> Dict[str, Any]:
- """
- 为指定县市生成作物Cd预测
-
- @param {str} county_name - 县市名称
- @param {Optional[str]} data_file - 可选的数据文件路径
- @returns {Dict[str, Any]} 预测结果信息
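-         @example
-         Illustrative async usage (the data_file path is a made-up example;
-         omit it to run with the system's default data):
-         >>> result = await service.generate_crop_cd_prediction_for_county(
-         ...     "乐昌市", data_file="/tmp/lechang_samples.csv")
-         >>> result["map_path"]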
- """
- if not self.is_county_supported(county_name):
- raise ValueError(f"不支持的县市: {county_name}")
-
- try:
- # 获取县市配置
- county_config = self.supported_counties[county_name]
-
- # 如果提供了自定义数据文件,使用它替换默认数据
- if data_file:
- # 准备作物Cd模型的自定义数据
- self._prepare_crop_cd_custom_data(data_file, county_name)
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_crop_cd_prediction_with_county,
- county_name, county_config
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"为{county_name}生成作物Cd预测失败: {str(e)}")
- raise
-
- async def generate_effective_cd_prediction_for_county(
- self,
- county_name: str,
- data_file: Optional[str] = None
- ) -> Dict[str, Any]:
- """
- 为指定县市生成有效态Cd预测
-
- @param {str} county_name - 县市名称
- @param {Optional[str]} data_file - 可选的数据文件路径
- @returns {Dict[str, Any]} 预测结果信息
- """
- if not self.is_county_supported(county_name):
- raise ValueError(f"不支持的县市: {county_name}")
-
- try:
- # 获取县市配置
- county_config = self.supported_counties[county_name]
-
- # 如果提供了自定义数据文件,使用它替换默认数据
- if data_file:
- # 准备有效态Cd模型的自定义数据
- self._prepare_effective_cd_custom_data(data_file, county_name)
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_effective_cd_prediction_with_county,
- county_name, county_config
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"为{county_name}生成有效态Cd预测失败: {str(e)}")
- raise
-
- def _prepare_crop_cd_custom_data(self, data_file: str, county_name: str):
- """
- 准备作物Cd模型的自定义数据文件
-
- @param {str} data_file - 数据文件路径
- @param {str} county_name - 县市名称
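-         @example
-         Expected CSV layout (header names are illustrative; only the column order
-         is relied on: coordinates first, then environmental factors):
-             longitude,latitude,pH,OM,...
-             113.35,25.13,5.6,21.4,...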
- """
- try:
- # 读取用户上传的CSV文件
- df = pd.read_csv(data_file, encoding='utf-8')
-
- # 获取Cd预测系统的数据目录
- cd_system_path = self.config.get_cd_system_path()
-
- # 1. 提取坐标信息并保存为独立的坐标文件
- coordinates_df = pd.DataFrame({
- 'longitude': df.iloc[:, 0], # 第一列为经度
- 'latitude': df.iloc[:, 1] # 第二列为纬度
- })
-
- # 保存坐标文件到系统数据目录
- coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
- os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
- coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
-
- # 记录坐标文件详细信息
- coord_file_size = os.path.getsize(coord_file_path) if os.path.exists(coord_file_path) else 0
- self.logger.info(f"🗺️ 坐标文件已保存:")
- self.logger.info(f" 路径: {coord_file_path}")
- self.logger.info(f" 大小: {coord_file_size:,} bytes ({coord_file_size/1024:.1f} KB)")
- self.logger.info(f" 坐标点数: {coordinates_df.shape[0]} 个")
-
- # 2. 准备作物Cd模型的训练数据
- crop_cd_data_dir = os.path.join(cd_system_path, "models", "crop_cd_model", "data")
- crop_target_file = os.path.join(crop_cd_data_dir, "areatest.csv")
-
- # 不再创建备份文件,因为此文件每次都会被用户数据完全覆盖
- if os.path.exists(crop_target_file):
- original_size = os.path.getsize(crop_target_file)
- self.logger.info(f"🔄 准备覆盖作物Cd模型数据文件:")
- self.logger.info(f" 文件路径: {crop_target_file}")
- self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size/1024:.1f} KB)")
-
- # 清理现有的备份文件(如果存在)
- self._cleanup_backup_files(crop_target_file, max_backups=0)
-
- # 提取环境因子数据(去掉前两列的经纬度)
- environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
-
- # 保存环境因子数据到作物Cd模型目录(不包含坐标)
- environmental_data.to_csv(crop_target_file, index=False, encoding='utf-8-sig')
-
- # 记录作物Cd模型数据文件详细信息
- model_file_size = os.path.getsize(crop_target_file) if os.path.exists(crop_target_file) else 0
- self.logger.info(f"🌾 作物Cd模型数据文件已保存:")
- self.logger.info(f" 路径: {crop_target_file}")
- self.logger.info(f" 大小: {model_file_size:,} bytes ({model_file_size/1024:.1f} KB)")
- self.logger.info(f" 数据形状: {environmental_data.shape[0]} 行 × {environmental_data.shape[1]} 列")
- self.logger.info(f" 环境因子列数: {environmental_data.shape[1]}")
-
- self.logger.info(f"✅ 作物Cd模型自定义数据文件已准备完成,县市: {county_name}")
-
- except Exception as e:
- self.logger.error(f"准备作物Cd模型自定义数据文件失败: {str(e)}")
- raise
-
- def _prepare_effective_cd_custom_data(self, data_file: str, county_name: str):
- """
- 准备有效态Cd模型的自定义数据文件
-
- @param {str} data_file - 数据文件路径
- @param {str} county_name - 县市名称
- """
- try:
- # 读取用户上传的CSV文件
- df = pd.read_csv(data_file, encoding='utf-8')
-
- # 获取Cd预测系统的数据目录
- cd_system_path = self.config.get_cd_system_path()
-
- # 1. 提取坐标信息并保存为独立的坐标文件
- coordinates_df = pd.DataFrame({
- 'longitude': df.iloc[:, 0], # 第一列为经度
- 'latitude': df.iloc[:, 1] # 第二列为纬度
- })
-
- # 保存坐标文件到系统数据目录
- coord_file_path = os.path.join(cd_system_path, "data", "coordinates", "坐标.csv")
- os.makedirs(os.path.dirname(coord_file_path), exist_ok=True)
- coordinates_df.to_csv(coord_file_path, index=False, encoding='utf-8-sig')
- self.logger.info(f"坐标文件已保存: {coord_file_path}")
-
- # 2. 准备有效态Cd模型的训练数据
- effective_cd_data_dir = os.path.join(cd_system_path, "models", "effective_cd_model", "data")
- effective_target_file = os.path.join(effective_cd_data_dir, "areatest.csv")
-
- # 不再创建备份文件,因为此文件每次都会被用户数据完全覆盖
- if os.path.exists(effective_target_file):
- original_size = os.path.getsize(effective_target_file)
- self.logger.info(f"🔄 准备覆盖有效态Cd模型数据文件:")
- self.logger.info(f" 文件路径: {effective_target_file}")
- self.logger.info(f" 原始文件大小: {original_size:,} bytes ({original_size/1024:.1f} KB)")
-
- # 清理现有的备份文件(如果存在)
- self._cleanup_backup_files(effective_target_file, max_backups=0)
-
- # 检查用户数据是否包含足够的环境因子列
- user_env_columns = df.shape[1] - 2 # 减去经纬度列
-
- if user_env_columns >= 21:
- # 用户数据包含足够的环境因子列,使用用户数据
- # 提取环境因子数据(去掉前两列的经纬度)
- user_environmental_data = df.iloc[:, 2:].copy() # 从第3列开始的所有列
-
- # 如果用户数据列数超过21列,只取前21列
- if user_environmental_data.shape[1] > 21:
- user_environmental_data = user_environmental_data.iloc[:, :21]
- self.logger.info(f"用户数据有{df.shape[1] - 2}列环境因子,取前21列用于有效态Cd模型")
-
- # 保存用户的环境因子数据到有效态Cd模型目录
- user_environmental_data.to_csv(effective_target_file, index=False, encoding='utf-8-sig')
- self.logger.info(f"有效态Cd模型使用用户数据,形状: {user_environmental_data.shape}")
-             else:
-                 # 用户数据环境因子列数不足;由于不再创建备份文件,无法恢复原始数据,保留现有的模型数据文件
-                 self.logger.warning(
-                     f"用户数据环境因子列数不足({user_env_columns} < 21),"
-                     f"有效态Cd模型将继续使用现有的areatest.csv数据"
-                 )
-
- self.logger.info(f"有效态Cd模型自定义数据文件已准备完成,县市: {county_name}")
-
- except Exception as e:
- self.logger.error(f"准备有效态Cd模型自定义数据文件失败: {str(e)}")
- raise
-
- def _run_crop_cd_prediction_with_county(self, county_name: str,
- county_config: Dict[str, Any]) -> Dict[str, Any]:
- """
- 执行指定县市的作物Cd预测
-
- @param {str} county_name - 县市名称
- @param {Dict[str, Any]} county_config - 县市配置
- @returns {Dict[str, Any]} 预测结果信息
- """
- try:
- # 运行作物Cd预测
- self.logger.info(f"为{county_name}执行作物Cd预测")
- prediction_result = self.wrapper.run_prediction_script("crop")
-
- # 获取输出文件(指定作物Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- model_type = f"crop_cd_{county_name}"
- copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
-
- # 清理旧文件
- self._cleanup_old_files(model_type)
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': model_type,
- 'county_name': county_name,
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"为{county_name}执行作物Cd预测失败: {str(e)}")
- raise
-
- def _run_effective_cd_prediction_with_county(self, county_name: str,
- county_config: Dict[str, Any]) -> Dict[str, Any]:
- """
- 执行指定县市的有效态Cd预测
-
- @param {str} county_name - 县市名称
- @param {Dict[str, Any]} county_config - 县市配置
- @returns {Dict[str, Any]} 预测结果信息
- """
- try:
- # 运行有效态Cd预测
- self.logger.info(f"为{county_name}执行有效态Cd预测")
- prediction_result = self.wrapper.run_prediction_script("effective")
-
- # 获取输出文件(指定有效态Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- model_type = f"effective_cd_{county_name}"
- copied_files = self._copy_output_files(latest_outputs, model_type, timestamp)
-
- # 清理旧文件
- self._cleanup_old_files(model_type)
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': model_type,
- 'county_name': county_name,
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"为{county_name}执行有效态Cd预测失败: {str(e)}")
- raise
-
- async def generate_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成作物Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_crop_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始作物Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_crop_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"作物Cd预测流程失败: {str(e)}")
- raise
-
- async def generate_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 生成有效态Cd预测结果和可视化
-
- @returns {Dict[str, Any]} 包含地图文件路径、直方图路径等信息
- @throws {Exception} 当预测过程发生错误时抛出
- @example
- >>> service = CdPredictionService()
- >>> result = await service.generate_effective_cd_prediction()
- >>> print(result['map_path'])
- """
- try:
- self.logger.info("开始有效态Cd模型预测流程")
-
- # 在线程池中运行CPU密集型任务
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(
- None,
- self._run_effective_cd_prediction
- )
-
- return result
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测流程失败: {str(e)}")
- raise
-
- def _run_crop_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行作物Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行作物Cd预测
- self.logger.info("执行作物Cd预测")
- prediction_result = self.wrapper.run_prediction_script("crop")
-
- # 获取输出文件(指定作物Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "crop_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("crop_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'crop_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"作物Cd预测执行失败: {str(e)}")
- raise
-
- def _run_effective_cd_prediction(self) -> Dict[str, Any]:
- """
- 执行有效态Cd预测的同步逻辑
-
- @returns {Dict[str, Any]} 预测结果信息
- @throws {Exception} 当预测过程发生错误时抛出
- """
- try:
- # 运行有效态Cd预测
- self.logger.info("执行有效态Cd预测")
- prediction_result = self.wrapper.run_prediction_script("effective")
-
- # 获取输出文件(指定有效态Cd模型类型)
- latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
-
- # 复制文件到API输出目录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- copied_files = self._copy_output_files(latest_outputs, "effective_cd", timestamp)
-
- # 清理旧文件
- self._cleanup_old_files("effective_cd")
-
- return {
- 'map_path': copied_files.get('map_path'),
- 'histogram_path': copied_files.get('histogram_path'),
- 'raster_path': copied_files.get('raster_path'),
- 'model_type': 'effective_cd',
- 'timestamp': timestamp,
- 'stats': self._get_file_stats(copied_files.get('map_path'))
- }
-
- except Exception as e:
- self.logger.error(f"有效态Cd预测执行失败: {str(e)}")
- raise
-
- def _copy_output_files(self, latest_outputs: Dict[str, Optional[str]],
- model_type: str, timestamp: str) -> Dict[str, Optional[str]]:
- """
- 复制输出文件到API目录
-
- @param {Dict[str, Optional[str]]} latest_outputs - 最新输出文件路径
- @param {str} model_type - 模型类型
- @param {str} timestamp - 时间戳
- @returns {Dict[str, Optional[str]]} 复制后的文件路径
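-         @example
-         Illustrative shapes (paths are made-up examples):
-         >>> outputs = {"latest_map": "/tmp/Prediction_results_1.jpg",
-         ...            "latest_histogram": "/tmp/Prediction_frequency_1.jpg",
-         ...            "latest_raster": "/tmp/output_1.tif"}
-         >>> self._copy_output_files(outputs, "crop_cd", "20240101_120000")
-         {'map_path': '...', 'histogram_path': '...', 'raster_path': None}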
- """
- copied_files = {}
-
- try:
- self.logger.info(f"🔄 开始复制输出文件 (模型类型: {model_type})")
- total_size = 0
-
- # 复制地图文件
- if latest_outputs.get('latest_map'):
- src_map = latest_outputs['latest_map']
- dst_map = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_map_{timestamp}.jpg"
- )
- shutil.copy2(src_map, dst_map)
- copied_files['map_path'] = dst_map
-
- # 记录详细信息
- src_size = os.path.getsize(src_map) if os.path.exists(src_map) else 0
- dst_size = os.path.getsize(dst_map) if os.path.exists(dst_map) else 0
- total_size += dst_size
- self.logger.info(f"🗺️ 地图文件已复制:")
- self.logger.info(f" 源文件: {src_map}")
- self.logger.info(f" 目标文件: {dst_map}")
- self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
-
- # 激进清理:立即删除源文件
- try:
- os.remove(src_map)
- self.logger.info(f"🗑️ 已删除源地图文件: {os.path.basename(src_map)}")
- except Exception as e:
- self.logger.warning(f"删除源地图文件失败: {str(e)}")
-
- # 复制直方图文件
- if latest_outputs.get('latest_histogram'):
- src_histogram = latest_outputs['latest_histogram']
- dst_histogram = os.path.join(
- self.output_figures_dir,
- f"{model_type}_prediction_histogram_{timestamp}.jpg"
- )
- shutil.copy2(src_histogram, dst_histogram)
- copied_files['histogram_path'] = dst_histogram
-
- # 记录详细信息
- src_size = os.path.getsize(src_histogram) if os.path.exists(src_histogram) else 0
- dst_size = os.path.getsize(dst_histogram) if os.path.exists(dst_histogram) else 0
- total_size += dst_size
- self.logger.info(f"📊 直方图文件已复制:")
- self.logger.info(f" 源文件: {src_histogram}")
- self.logger.info(f" 目标文件: {dst_histogram}")
- self.logger.info(f" 文件大小: {dst_size:,} bytes ({dst_size/1024:.1f} KB)")
-
- # 激进清理:立即删除源文件
- try:
- os.remove(src_histogram)
- self.logger.info(f"🗑️ 已删除源直方图文件: {os.path.basename(src_histogram)}")
- except Exception as e:
- self.logger.warning(f"删除源直方图文件失败: {str(e)}")
-
- # 处理栅格文件(不复制,直接删除源文件以节省空间)
- if latest_outputs.get('latest_raster'):
- src_raster = latest_outputs['latest_raster']
-
- # 记录栅格文件信息但不复制
- src_size = os.path.getsize(src_raster) if os.path.exists(src_raster) else 0
- self.logger.info(f"🌐 栅格文件处理:")
- self.logger.info(f" 源文件: {src_raster}")
- self.logger.info(f" 文件大小: {src_size:,} bytes ({src_size/1024:.1f} KB)")
- self.logger.info(f" 处理方式: 跳过复制,直接删除(栅格文件为中间文件)")
-
- # 激进清理:直接删除源文件,不进行复制
- try:
- os.remove(src_raster)
- self.logger.info(f"🗑️ 已删除中间栅格文件: {os.path.basename(src_raster)}")
- self.logger.info(f" 节省空间: {src_size:,} bytes ({src_size/1024:.1f} KB)")
- except Exception as e:
- self.logger.warning(f"删除源栅格文件失败: {str(e)}")
-
- # 不设置raster_path,因为不需要保留栅格文件
- copied_files['raster_path'] = None
-
- # 记录总体信息
- self.logger.info(f"✅ 文件复制完成,总大小: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)")
-
- # 清理Cd预测系统原始输出文件
- self._cleanup_cd_system_outputs()
-
- # 清理中间数据文件
- self._cleanup_intermediate_data_files()
-
- except Exception as e:
- self.logger.error(f"复制输出文件失败: {str(e)}")
-
- return copied_files
-
- def _cleanup_old_files(self, model_type: str):
- """
- 清理旧的预测文件
-
- @param {str} model_type - 模型类型
- """
- try:
- max_files = self.config.get_api_config().get("max_prediction_files", 10)
-
- # 清理地图文件
- map_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_map_*.jpg")
- self._cleanup_files_by_pattern(map_pattern, max_files)
-
- # 清理直方图文件
- histogram_pattern = os.path.join(self.output_figures_dir, f"{model_type}_prediction_histogram_*.jpg")
- self._cleanup_files_by_pattern(histogram_pattern, max_files)
-
- # 不再需要清理API栅格文件,因为我们不再保留栅格文件
- # raster_pattern = os.path.join(self.output_raster_dir, f"{model_type}_prediction_raster_*.tif")
- # self._cleanup_files_by_pattern(raster_pattern, max_files)
- self.logger.info(f"跳过栅格文件清理(栅格文件已不再保留)")
-
- except Exception as e:
- self.logger.warning(f"清理旧文件失败: {str(e)}")
-
- def _cleanup_files_by_pattern(self, pattern: str, max_files: int):
- """
- 按模式清理文件
-
- @param {str} pattern - 文件模式
- @param {int} max_files - 最大保留文件数
- """
- try:
- files = glob.glob(pattern)
- if len(files) > max_files:
- # 按修改时间排序,删除最旧的文件
- files.sort(key=os.path.getmtime)
- for file_to_delete in files[:-max_files]:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧文件: {file_to_delete}")
- except Exception as e:
- self.logger.warning(f"清理文件失败 {pattern}: {str(e)}")
-
- def _cleanup_temp_files(self, county_name: str, max_files: int = 5):
- """
- 清理临时数据文件,只保留指定县市的最新文件
-
- @param {str} county_name - 县市名称
- @param {int} max_files - 最大保留文件数,默认5个
- """
- try:
- temp_dir = os.path.join(self.output_data_dir, "temp")
- temp_pattern = os.path.join(temp_dir, f"{county_name}_temp_data_*.csv")
-
- # 获取该县市的所有临时文件
- temp_files = glob.glob(temp_pattern)
-
- if len(temp_files) > max_files:
- # 按修改时间排序,删除最旧的文件
- temp_files.sort(key=os.path.getmtime)
- files_to_delete = temp_files[:-max_files]
-
- for file_to_delete in files_to_delete:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧临时文件: {os.path.basename(file_to_delete)}")
-
- self.logger.info(f"已清理{county_name}的临时文件,保留最新{max_files}个文件")
-
- except Exception as e:
- self.logger.warning(f"清理{county_name}临时文件失败: {str(e)}")
-
- def _cleanup_backup_files(self, target_file: str, max_backups: int = 3):
- """
- 清理备份文件,只保留最新的备份
-
- @param {str} target_file - 目标文件路径
- @param {int} max_backups - 最大保留备份数,默认3个
- """
- try:
- # 构建备份文件匹配模式
- backup_pattern = f"{target_file}.backup_*"
- backup_files = glob.glob(backup_pattern)
-
- if len(backup_files) > max_backups:
- # 按修改时间排序,删除最旧的备份文件
- backup_files.sort(key=os.path.getmtime)
-                 # 注意: 当 max_backups 为 0 时, [:-0] 会得到空列表, 需单独处理以删除全部备份
-                 files_to_delete = backup_files[:-max_backups] if max_backups > 0 else backup_files
-
- for file_to_delete in files_to_delete:
- os.remove(file_to_delete)
- self.logger.info(f"已删除旧备份文件: {os.path.basename(file_to_delete)}")
-
- self.logger.info(f"已清理备份文件,保留最新{max_backups}个备份")
-
- except Exception as e:
- self.logger.warning(f"清理备份文件失败: {str(e)}")
-
- def _delete_all_files_by_pattern(self, pattern: str) -> Tuple[int, int]:
- """
- 删除所有匹配模式的文件(激进清理模式)
-
- @param {str} pattern - 文件匹配模式
- @returns {Tuple[int, int]} 返回删除的文件数量和释放的字节数
- """
- deleted_count = 0
- total_size_freed = 0
-
- try:
- files = glob.glob(pattern)
-
- for file_path in files:
- try:
- # 获取文件大小
- file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-
- # 删除文件
- os.remove(file_path)
-
- deleted_count += 1
- total_size_freed += file_size
-
- self.logger.info(f"🗑️ 已删除中间文件: {os.path.basename(file_path)} ({file_size:,} bytes)")
-
- except Exception as e:
- self.logger.warning(f"删除文件失败 {file_path}: {str(e)}")
-
- if deleted_count > 0:
- self.logger.info(f"🧹 模式 '{os.path.basename(pattern)}' 清理完成: {deleted_count} 个文件, {total_size_freed:,} bytes")
-
- except Exception as e:
- self.logger.warning(f"按模式删除文件失败 {pattern}: {str(e)}")
-
- return deleted_count, total_size_freed
-
- def _cleanup_cd_system_outputs(self, aggressive_mode: bool = True):
- """
- 清理Cd预测系统原始输出文件(激进模式:删除所有中间文件)
-
- @param {bool} aggressive_mode - 激进模式,默认True(删除所有中间文件)
- """
- try:
- cd_system_path = self.config.get_cd_system_path()
- output_figures_dir = os.path.join(cd_system_path, "output", "figures")
- output_raster_dir = os.path.join(cd_system_path, "output", "raster")
-
- total_deleted = 0
- total_size_freed = 0
-
- # 清理figures目录下的所有预测输出文件
- if os.path.exists(output_figures_dir):
- # 立即删除所有地图文件
- map_patterns = [
- "Prediction_results_*.jpg",
- "Prediction_results_*.png"
- ]
- for pattern in map_patterns:
- full_pattern = os.path.join(output_figures_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- # 立即删除所有直方图文件
- histogram_patterns = [
- "Prediction_frequency_*.jpg",
- "Prediction_frequency_*.png"
- ]
- for pattern in histogram_patterns:
- full_pattern = os.path.join(output_figures_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- # 清理raster目录下的所有文件
- if os.path.exists(output_raster_dir):
- # 立即删除所有栅格相关文件
- raster_patterns = [
- "output_*.tif",
- "points_*.shp",
- "points_*.dbf",
- "points_*.prj",
- "points_*.shx",
- "points_*.cpg"
- ]
- for pattern in raster_patterns:
- full_pattern = os.path.join(output_raster_dir, pattern)
- deleted, size_freed = self._delete_all_files_by_pattern(full_pattern)
- total_deleted += deleted
- total_size_freed += size_freed
-
- self.logger.info(f"🧹 激进清理完成:")
- self.logger.info(f" 删除文件数: {total_deleted} 个")
- self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024/1024:.2f} MB)")
-
- except Exception as e:
- self.logger.warning(f"激进清理Cd预测系统输出文件失败: {str(e)}")
-
- def _cleanup_intermediate_data_files(self):
- """
- 清理中间数据文件
- 包括:坐标.csv 和 combined_pH.csv, pHcombined.csv
- """
- try:
- cd_system_path = self.config.get_cd_system_path()
- deleted_count = 0
- total_size_freed = 0
-
- # 要清理的中间数据文件
- intermediate_files = [
- os.path.join(cd_system_path, "data", "coordinates", "坐标.csv"),
- os.path.join(cd_system_path, "data", "predictions", "combined_pH.csv"),
- os.path.join(cd_system_path, "data", "predictions", "pHcombined.csv")
- ]
-
- self.logger.info("🧹 开始清理中间数据文件...")
-
- for file_path in intermediate_files:
- if os.path.exists(file_path):
- try:
- file_size = os.path.getsize(file_path)
- os.remove(file_path)
-
- deleted_count += 1
- total_size_freed += file_size
-
- self.logger.info(f"🗑️ 已删除中间数据文件: {os.path.basename(file_path)}")
- self.logger.info(f" 文件路径: {file_path}")
- self.logger.info(f" 文件大小: {file_size:,} bytes ({file_size/1024:.1f} KB)")
-
- except Exception as e:
- self.logger.warning(f"删除中间数据文件失败 {file_path}: {str(e)}")
- else:
- self.logger.debug(f"中间数据文件不存在,跳过: {os.path.basename(file_path)}")
-
- if deleted_count > 0:
- self.logger.info(f"✅ 中间数据文件清理完成:")
- self.logger.info(f" 删除文件数: {deleted_count} 个")
- self.logger.info(f" 释放空间: {total_size_freed:,} bytes ({total_size_freed/1024:.1f} KB)")
- else:
- self.logger.info("ℹ️ 没有找到需要清理的中间数据文件")
-
- except Exception as e:
- self.logger.warning(f"清理中间数据文件失败: {str(e)}")
-
- def _get_file_stats(self, file_path: Optional[str]) -> Dict[str, Any]:
- """
- 获取文件统计信息
-
- @param {Optional[str]} file_path - 文件路径
- @returns {Dict[str, Any]} 文件统计信息
- """
- if not file_path or not os.path.exists(file_path):
- return {}
-
- try:
- stat = os.stat(file_path)
- return {
- 'file_size': stat.st_size,
- 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
- 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
- }
- except Exception:
- return {}
-
- def get_latest_crop_cd_map(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_map(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测地图文件路径
-
- @returns {Optional[str]} 最新地图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_map_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_crop_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的作物Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "crop_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- def get_latest_effective_cd_histogram(self) -> Optional[str]:
- """
- 获取最新的有效态Cd预测直方图文件路径
-
- @returns {Optional[str]} 最新直方图文件路径,如果不存在则返回None
- """
- try:
- pattern = os.path.join(self.output_figures_dir, "effective_cd_prediction_histogram_*.jpg")
- files = glob.glob(pattern)
- if files:
- return max(files, key=os.path.getctime)
- return None
- except Exception:
- return None
-
- # =============================================================================
- # 统计信息方法
- # =============================================================================
-
- def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取作物Cd预测结果的统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
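-         @example
-         Illustrative usage (returns None when Final_predictions_crop_cd.csv has not been generated):
-         >>> stats = service.get_crop_cd_statistics("乐昌市")
-         >>> stats["基础统计"]["均值"] if stats else None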
- """
- try:
- # 查找最新的预测结果文件
- cd_system_path = self.config.get_cd_system_path()
- final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_crop_cd.csv")
-
- if not os.path.exists(final_data_path):
- self.logger.warning(f"未找到作物Cd预测结果文件: {final_data_path}")
- return None
-
- # 读取预测数据
- df = pd.read_csv(final_data_path)
-
- if 'Prediction' not in df.columns:
- self.logger.warning("预测结果文件中缺少'Prediction'列")
- return None
-
- predictions = df['Prediction']
-
- # 计算基础统计信息
- basic_stats = {
- "数据点总数": len(predictions),
- "均值": float(predictions.mean()),
- "中位数": float(predictions.median()),
- "标准差": float(predictions.std()),
- "最小值": float(predictions.min()),
- "最大值": float(predictions.max()),
- "25%分位数": float(predictions.quantile(0.25)),
- "75%分位数": float(predictions.quantile(0.75)),
- "偏度": float(predictions.skew()),
- "峰度": float(predictions.kurtosis())
- }
-
- # 计算分布直方图数据
- histogram_data = self._calculate_histogram_data(predictions)
-
- # 计算空间统计信息(如果有坐标信息)
- spatial_stats = None
- if 'longitude' in df.columns and 'latitude' in df.columns:
- spatial_stats = self._calculate_spatial_statistics(df)
-
- return {
- "模型类型": "作物Cd模型",
- "县市名称": county_name,
- "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
- "基础统计": basic_stats,
- "分布直方图": histogram_data,
- "空间统计": spatial_stats
- }
-
- except Exception as e:
- self.logger.error(f"获取作物Cd统计信息失败: {str(e)}")
- return None
-
- def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取有效态Cd预测结果的统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
- """
- try:
- # 查找最新的预测结果文件
- cd_system_path = self.config.get_cd_system_path()
- final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_effective_cd.csv")
-
- if not os.path.exists(final_data_path):
- self.logger.warning(f"未找到有效态Cd预测结果文件: {final_data_path}")
- return None
-
- # 读取预测数据
- df = pd.read_csv(final_data_path)
-
- if 'Prediction' not in df.columns:
- self.logger.warning("预测结果文件中缺少'Prediction'列")
- return None
-
- predictions = df['Prediction']
-
- # 计算基础统计信息
- basic_stats = {
- "数据点总数": len(predictions),
- "均值": float(predictions.mean()),
- "中位数": float(predictions.median()),
- "标准差": float(predictions.std()),
- "最小值": float(predictions.min()),
- "最大值": float(predictions.max()),
- "25%分位数": float(predictions.quantile(0.25)),
- "75%分位数": float(predictions.quantile(0.75)),
- "偏度": float(predictions.skew()),
- "峰度": float(predictions.kurtosis())
- }
-
- # 计算分布直方图数据
- histogram_data = self._calculate_histogram_data(predictions)
-
- # 计算空间统计信息(如果有坐标信息)
- spatial_stats = None
- if 'longitude' in df.columns and 'latitude' in df.columns:
- spatial_stats = self._calculate_spatial_statistics(df)
-
- return {
- "模型类型": "有效态Cd模型",
- "县市名称": county_name,
- "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
- "基础统计": basic_stats,
- "分布直方图": histogram_data,
- "空间统计": spatial_stats
- }
-
- except Exception as e:
- self.logger.error(f"获取有效态Cd统计信息失败: {str(e)}")
- return None
-
- def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取综合预测统计信息
-
- @param {str} county_name - 县市名称
- @returns {Optional[Dict[str, Any]]} 综合统计信息,如果没有数据则返回None
- """
- try:
- crop_stats = self.get_crop_cd_statistics(county_name)
- effective_stats = self.get_effective_cd_statistics(county_name)
-
- if not crop_stats and not effective_stats:
- return None
-
- return {
- "县市名称": county_name,
- "作物Cd统计": crop_stats,
- "有效态Cd统计": effective_stats,
- "生成时间": datetime.now().isoformat()
- }
-
- except Exception as e:
- self.logger.error(f"获取综合统计信息失败: {str(e)}")
- return None
-
-
- def get_all_counties_statistics(self) -> Dict[str, Any]:
- """
- 获取所有支持县市的统计概览
-
- @returns {Dict[str, Any]} 所有县市的统计概览
- """
- try:
- all_stats = {
- "支持县市总数": len(self.supported_counties),
- "统计生成时间": datetime.now().isoformat(),
- "县市统计": {},
- "汇总信息": {
- "有作物Cd数据的县市": 0,
- "有有效态Cd数据的县市": 0,
- "数据完整的县市": 0
- }
- }
-
- for county_name in self.supported_counties.keys():
- county_stats = {
- "县市名称": county_name,
- "有作物Cd数据": False,
- "有有效态Cd数据": False,
- "数据完整": False,
- "最新更新时间": None
- }
-
- # 检查作物Cd数据
- crop_stats = self.get_crop_cd_statistics(county_name)
- if crop_stats:
- county_stats["有作物Cd数据"] = True
- county_stats["作物Cd概要"] = {
- "数据点数": crop_stats["基础统计"]["数据点总数"],
- "均值": crop_stats["基础统计"]["均值"],
- "最大值": crop_stats["基础统计"]["最大值"]
- }
- all_stats["汇总信息"]["有作物Cd数据的县市"] += 1
-
- # 检查有效态Cd数据
- effective_stats = self.get_effective_cd_statistics(county_name)
- if effective_stats:
- county_stats["有有效态Cd数据"] = True
- county_stats["有效态Cd概要"] = {
- "数据点数": effective_stats["基础统计"]["数据点总数"],
- "均值": effective_stats["基础统计"]["均值"],
- "最大值": effective_stats["基础统计"]["最大值"]
- }
- all_stats["汇总信息"]["有有效态Cd数据的县市"] += 1
-
- # 检查数据完整性
- if county_stats["有作物Cd数据"] and county_stats["有有效态Cd数据"]:
- county_stats["数据完整"] = True
- all_stats["汇总信息"]["数据完整的县市"] += 1
-
- all_stats["县市统计"][county_name] = county_stats
-
- return all_stats
-
- except Exception as e:
- self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
- return {
- "error": f"获取统计概览失败: {str(e)}",
- "支持县市总数": len(self.supported_counties),
- "统计生成时间": datetime.now().isoformat()
- }
-
- # =============================================================================
- # 辅助统计方法
- # =============================================================================
-
-
- def _calculate_histogram_data(self, predictions: pd.Series, bins: int = 20) -> Dict[str, Any]:
- """
- 计算分布直方图数据
-
- @param {pd.Series} predictions - 预测值
- @param {int} bins - 直方图区间数
- @returns {Dict[str, Any]} 直方图数据
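-         @example
-         Illustrative call on a tiny made-up series:
-         >>> hist = self._calculate_histogram_data(pd.Series([0.12, 0.20, 0.25, 0.41]), bins=5)
-         >>> hist["频次"], hist["总频次"]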
- """
- try:
- import numpy as np
-
- hist, bin_edges = np.histogram(predictions, bins=bins)
-
- # 计算区间中心点
- bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
-
- return {
- "区间数": int(bins),
- "频次": [int(count) for count in hist],
- "区间中心": [float(center) for center in bin_centers],
- "区间边界": [float(edge) for edge in bin_edges],
- "总频次": int(hist.sum())
- }
-
- except Exception as e:
- self.logger.error(f"计算直方图数据失败: {str(e)}")
- return {}
-
- def _calculate_spatial_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
- """
- 计算空间统计信息
-
- @param {pd.DataFrame} df - 包含坐标和预测值的数据框
- @returns {Dict[str, Any]} 空间统计信息
- """
- try:
- spatial_stats = {
- "经度范围": {
- "最小值": float(df['longitude'].min()),
- "最大值": float(df['longitude'].max()),
- "跨度": float(df['longitude'].max() - df['longitude'].min())
- },
- "纬度范围": {
- "最小值": float(df['latitude'].min()),
- "最大值": float(df['latitude'].max()),
- "跨度": float(df['latitude'].max() - df['latitude'].min())
- }
- }
-
- return spatial_stats
-
- except Exception as e:
- self.logger.error(f"计算空间统计信息失败: {str(e)}")
- return {}
-
-
-
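- # Illustrative manual smoke test: a sketch that assumes the Cd prediction system,
- # its configuration and the model data are available on this machine. It is not
- # used by the API itself; run the module directly to exercise a single crop Cd
- # prediction for 乐昌市.
- if __name__ == "__main__":
-     async def _demo():
-         service = CdPredictionService()
-         print("Supported counties:", service.get_supported_counties())
-         result = await service.generate_crop_cd_prediction_for_county("乐昌市")
-         print("Crop Cd prediction map:", result.get("map_path"))
- 
-     asyncio.run(_demo())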
|