"""
数据处理模块
Data Processing Module
用于整合模型预测结果与坐标数据,生成最终的分析数据
"""
import logging
import os
import sys

import numpy as np
import pandas as pd

# 添加项目根目录到路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
class DataProcessor:
    """
    Data processor.

    Combines per-model prediction results with coordinate data and writes
    the merged "final" CSV files consumed by downstream analysis.
    """

    def __init__(self):
        """Initialize the processor with a module-level logger."""
        self.logger = logging.getLogger(__name__)

    def load_predictions(self):
        """
        Load model prediction results according to WORKFLOW_CONFIG.

        Only models enabled in the (dynamically re-read) workflow config are
        loaded; missing prediction files are logged and skipped.

        @return: dict mapping model key ('crop_cd' / 'effective_cd') to a
                 prediction DataFrame
        """
        try:
            predictions = {}

            # Re-read the workflow config at call time -- config.py may have
            # been edited on disk after this process imported it.
            workflow_config = self._get_current_workflow_config()
            self.logger.info(f"当前工作流配置: {workflow_config}")

            # Crop-Cd predictions (only when enabled in the workflow config).
            if workflow_config.get("run_crop_model", False):
                crop_cd_path = os.path.join(
                    config.DATA_PATHS["predictions_dir"],
                    config.CROP_CD_MODEL["output_file"]
                )
                if os.path.exists(crop_cd_path):
                    predictions['crop_cd'] = pd.read_csv(crop_cd_path)
                    self.logger.info(f"作物Cd预测结果加载成功: {crop_cd_path}")
                else:
                    self.logger.warning(f"作物Cd预测文件不存在: {crop_cd_path}")
            else:
                self.logger.info("跳过作物Cd预测结果加载(工作流配置中未启用)")

            # Effective-Cd predictions (only when enabled).
            if workflow_config.get("run_effective_model", False):
                effective_cd_path = os.path.join(
                    config.DATA_PATHS["predictions_dir"],
                    config.EFFECTIVE_CD_MODEL["output_file"]
                )
                if os.path.exists(effective_cd_path):
                    predictions['effective_cd'] = pd.read_csv(effective_cd_path)
                    self.logger.info(f"有效态Cd预测结果加载成功: {effective_cd_path}")
                else:
                    self.logger.warning(f"有效态Cd预测文件不存在: {effective_cd_path}")
            else:
                self.logger.info("跳过有效态Cd预测结果加载(工作流配置中未启用)")

            if not predictions:
                self.logger.warning("没有加载到任何预测结果,请检查WORKFLOW_CONFIG配置和预测文件是否存在")
            else:
                self.logger.info(f"根据工作流配置,成功加载了 {len(predictions)} 个模型的预测结果: {list(predictions.keys())}")

            return predictions

        except Exception as e:
            self.logger.error(f"预测结果加载失败: {str(e)}")
            raise

    def _get_current_workflow_config(self):
        """
        Re-read WORKFLOW_CONFIG from config.py on disk.

        Falls back to the already-imported config.WORKFLOW_CONFIG when the
        file cannot be read or the dict cannot be extracted/parsed.

        @return: the current workflow configuration dict
        """
        try:
            config_file = os.path.join(config.PROJECT_ROOT, "config.py")

            with open(config_file, 'r', encoding='utf-8') as f:
                config_content = f.read()

            # NOTE: this pattern only captures a flat dict literal (no nested
            # braces) -- sufficient for the current WORKFLOW_CONFIG shape.
            import re
            pattern = r'WORKFLOW_CONFIG\s*=\s*(\{[^}]*\})'
            match = re.search(pattern, config_content)

            if match:
                # ast.literal_eval instead of eval(): it accepts only Python
                # literals, so code placed in config.py can never execute here.
                import ast
                return ast.literal_eval(match.group(1))

            self.logger.warning("无法从配置文件中提取WORKFLOW_CONFIG,使用默认配置")
            return config.WORKFLOW_CONFIG

        except Exception as e:
            self.logger.error(f"读取工作流配置失败: {str(e)},使用默认配置")
            return config.WORKFLOW_CONFIG

    def load_coordinates(self):
        """
        Load the coordinate data.

        When the file is missing from the data directory it is first copied
        over from the original model directory.

        @return: coordinate DataFrame
        """
        try:
            coord_path = os.path.join(
                config.DATA_PATHS["coordinates_dir"],
                config.DATA_PATHS["coordinate_file"]
            )

            if not os.path.exists(coord_path):
                self.logger.info("坐标文件不存在,尝试从原始模型目录复制...")
                self._copy_coordinates_from_models()

            coordinates = pd.read_csv(coord_path)
            self.logger.info(f"坐标数据加载成功: {coord_path}, 数据形状: {coordinates.shape}")
            return coordinates

        except Exception as e:
            self.logger.error(f"坐标数据加载失败: {str(e)}")
            raise

    def _copy_coordinates_from_models(self):
        """
        Copy the coordinate file from the original crop-Cd model directory.

        @raises FileNotFoundError: when the source file does not exist
        """
        try:
            # Relative path into the original model data drop -- assumes the
            # process runs from the project root (TODO confirm with deployment).
            source_path = os.path.join(
                "..", "作物Cd模型文件与数据", "作物Cd模型文件与数据",
                config.DATA_PATHS["coordinate_file"]
            )

            if not os.path.exists(source_path):
                raise FileNotFoundError(f"坐标文件不存在: {source_path}")

            target_path = os.path.join(
                config.DATA_PATHS["coordinates_dir"],
                config.DATA_PATHS["coordinate_file"]
            )
            # Make sure the target directory exists before copying.
            os.makedirs(os.path.dirname(target_path), exist_ok=True)

            import shutil
            shutil.copy2(source_path, target_path)
            self.logger.info(f"坐标文件复制成功: {source_path} -> {target_path}")

        except Exception as e:
            self.logger.error(f"坐标文件复制失败: {str(e)}")
            raise

    def _detect_coordinate_columns(self, coordinates):
        """
        Auto-detect the longitude/latitude column names.

        Exact (case-insensitive) candidate matches are preferred; substring
        matches are kept as a fallback for names like 'Longitude_deg'.

        @param coordinates: coordinate DataFrame
        @return: (longitude column name, latitude column name)
        @raises ValueError: when either column cannot be identified
        """
        columns = coordinates.columns.tolist()

        lon_candidates = ['lon', 'longitude', '经度', 'x', 'lng']
        # Fix: the old list contained 'lan' (typo), which substring-matched
        # unrelated columns such as 'plan' or 'land'.
        lat_candidates = ['lat', 'latitude', '纬度', 'y']

        def find_column(candidates):
            # Pass 1: exact case-insensitive match -- avoids short candidates
            # like 'x'/'y' accidentally matching unrelated column names.
            for col in columns:
                if col.lower() in candidates:
                    return col
            # Pass 2: substring fallback (original behavior).
            for col in columns:
                if any(cand in col.lower() for cand in candidates):
                    return col
            return None

        lon_col = find_column(lon_candidates)
        lat_col = find_column(lat_candidates)

        if not lon_col or not lat_col:
            raise ValueError(f"无法识别坐标列。现有列名: {columns}")

        self.logger.info(f"检测到坐标列: 经度={lon_col}, 纬度={lat_col}")
        return lon_col, lat_col

    @staticmethod
    def _output_info(model_name):
        """Return (output file name, display name) for a model key."""
        if model_name == 'crop_cd':
            return "Final_predictions_crop_cd.csv", "作物Cd模型"
        if model_name == 'effective_cd':
            return "Final_predictions_effective_cd.csv", "有效态Cd模型"
        return f"Final_predictions_{model_name}.csv", f"{model_name}模型"

    def combine_predictions_with_coordinates(self):
        """
        Merge prediction results with coordinates into per-model CSV files.

        One "Final_predictions_<model>.csv" is written per loaded model; when
        more than one model is loaded, an additional combined file holding all
        prediction columns plus their average is written as well.

        @return: dict mapping model key (plus optionally 'combined') to the
                 written file path
        """
        try:
            self.logger.info("开始整合预测结果与坐标数据...")

            predictions = self.load_predictions()
            coordinates = self.load_coordinates()
            lon_col, lat_col = self._detect_coordinate_columns(coordinates)

            final_files = {}

            for model_name, prediction_data in predictions.items():
                # Predictions are joined to coordinates by row position, so a
                # row-count mismatch means the data sources are out of sync.
                if len(prediction_data) != len(coordinates):
                    self.logger.warning(
                        f"{model_name}预测行数 {len(prediction_data)} 与坐标行数 {len(coordinates)} 不一致"
                    )

                final_data = coordinates[[lon_col, lat_col]].copy()
                # .to_numpy() assigns by position; assigning the raw Series
                # would align on index labels and could silently insert NaNs.
                final_data['Prediction'] = prediction_data.iloc[:, 0].to_numpy()
                final_data = final_data.rename(
                    columns={lon_col: 'longitude', lat_col: 'latitude'}
                )

                output_filename, model_display_name = self._output_info(model_name)
                output_path = os.path.join(
                    config.DATA_PATHS["final_dir"],
                    output_filename
                )
                os.makedirs(os.path.dirname(output_path), exist_ok=True)
                final_data.to_csv(output_path, index=False)
                final_files[model_name] = output_path

                self.logger.info(f"{model_display_name}数据整合完成,文件保存至: {output_path}")
                self.logger.info(f"{model_display_name}数据形状: {final_data.shape}")

                pred_stats = final_data['Prediction'].describe()
                self.logger.info(f"{model_display_name}预测值统计:\n{pred_stats}")

            # With both models present, also emit a combined reference file.
            if len(predictions) > 1:
                final_files['combined'] = self._write_combined_file(
                    predictions, coordinates, lon_col, lat_col
                )

            return final_files

        except Exception as e:
            self.logger.error(f"数据整合失败: {str(e)}")
            raise

    def _write_combined_file(self, predictions, coordinates, lon_col, lat_col):
        """Write the multi-model combined CSV and return its path."""
        combined_data = coordinates[[lon_col, lat_col]].copy()

        # One column per known model, assigned by position (see above).
        for model_name, prediction_data in predictions.items():
            if model_name == 'crop_cd':
                combined_data['Crop_Cd_Prediction'] = prediction_data.iloc[:, 0].to_numpy()
            elif model_name == 'effective_cd':
                combined_data['Effective_Cd_Prediction'] = prediction_data.iloc[:, 0].to_numpy()

        prediction_columns = [col for col in combined_data.columns if 'Prediction' in col]
        if prediction_columns:
            combined_data['Average_Prediction'] = combined_data[prediction_columns].mean(axis=1)

        combined_data = combined_data.rename(
            columns={lon_col: 'longitude', lat_col: 'latitude'}
        )

        combined_output_path = os.path.join(
            config.DATA_PATHS["final_dir"],
            "Final_predictions_combined.csv"
        )
        combined_data.to_csv(combined_output_path, index=False)

        self.logger.info(f"合并数据文件保存至: {combined_output_path}")
        self.logger.info(f"合并数据形状: {combined_data.shape}")
        return combined_output_path

    def validate_final_data(self, file_path):
        """
        Validate the format and content of a final data file.

        @param file_path: path to a final CSV file
        @return: True when validation passes, False otherwise
        """
        try:
            data = pd.read_csv(file_path)

            # Coordinates are always required. The prediction column may be
            # 'Prediction' (per-model files) or '<Model>_Prediction' (the
            # combined file), so accept any column containing 'Prediction' --
            # the old exact check made the combined file always fail.
            missing_columns = [col for col in ['longitude', 'latitude']
                               if col not in data.columns]
            if not any('Prediction' in col for col in data.columns):
                missing_columns.append('Prediction')

            if missing_columns:
                self.logger.error(f"缺少必要的列: {missing_columns}")
                return False

            # Completeness: warn (not fail) on missing values.
            if data.isnull().any().any():
                self.logger.warning("数据中存在空值")

            # Sanity-check that coordinates look like real geographic values.
            if not (data['longitude'].between(-180, 180).all() and
                    data['latitude'].between(-90, 90).all()):
                self.logger.warning("坐标值超出合理范围")

            self.logger.info("数据验证通过")
            return True

        except Exception as e:
            self.logger.error(f"数据验证失败: {str(e)}")
            return False
if __name__ == "__main__":
    # Manual smoke test: run the full merge pipeline, then validate and
    # report every file it produced.
    processor = DataProcessor()
    for model_name, final_file in processor.combine_predictions_with_coordinates().items():
        processor.validate_final_data(final_file)
        print(f"{model_name}数据处理完成,最终文件: {final_file}")
|