""" 数据处理模块 Data Processing Module 用于整合模型预测结果与坐标数据,生成最终的分析数据 """ import os import sys import logging import pandas as pd import numpy as np # 添加项目根目录到路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import config class DataProcessor: """ 数据处理器 负责整合预测结果与坐标数据 """ def __init__(self): """ 初始化数据处理器 """ self.logger = logging.getLogger(__name__) def load_predictions(self): """ 加载模型预测结果(根据WORKFLOW_CONFIG配置) @return: 包含预测结果的字典 """ try: predictions = {} # 动态读取当前的工作流配置(运行时可能被修改) workflow_config = self._get_current_workflow_config() self.logger.info(f"当前工作流配置: {workflow_config}") # 只加载在工作流配置中启用的模型的预测结果 # 加载作物Cd预测结果 if workflow_config.get("run_crop_model", False): crop_cd_path = os.path.join( config.DATA_PATHS["predictions_dir"], config.CROP_CD_MODEL["output_file"] ) if os.path.exists(crop_cd_path): predictions['crop_cd'] = pd.read_csv(crop_cd_path) self.logger.info(f"作物Cd预测结果加载成功: {crop_cd_path}") else: self.logger.warning(f"作物Cd预测文件不存在: {crop_cd_path}") else: self.logger.info("跳过作物Cd预测结果加载(工作流配置中未启用)") # 加载有效态Cd预测结果 if workflow_config.get("run_effective_model", False): effective_cd_path = os.path.join( config.DATA_PATHS["predictions_dir"], config.EFFECTIVE_CD_MODEL["output_file"] ) if os.path.exists(effective_cd_path): predictions['effective_cd'] = pd.read_csv(effective_cd_path) self.logger.info(f"有效态Cd预测结果加载成功: {effective_cd_path}") else: self.logger.warning(f"有效态Cd预测文件不存在: {effective_cd_path}") else: self.logger.info("跳过有效态Cd预测结果加载(工作流配置中未启用)") if not predictions: self.logger.warning("没有加载到任何预测结果,请检查WORKFLOW_CONFIG配置和预测文件是否存在") else: self.logger.info(f"根据工作流配置,成功加载了 {len(predictions)} 个模型的预测结果: {list(predictions.keys())}") return predictions except Exception as e: self.logger.error(f"预测结果加载失败: {str(e)}") raise def _get_current_workflow_config(self): """ 动态读取当前的工作流配置 @return: 当前的工作流配置字典 """ try: config_file = os.path.join(config.PROJECT_ROOT, "config.py") # 读取配置文件内容 with open(config_file, 'r', encoding='utf-8') as f: config_content = f.read() # 提取WORKFLOW_CONFIG import re pattern = r'WORKFLOW_CONFIG\s*=\s*(\{[^}]*\})' match = re.search(pattern, config_content) if match: # 使用eval安全地解析配置(这里是安全的,因为我们控制配置文件内容) workflow_config_str = match.group(1) workflow_config = eval(workflow_config_str) return workflow_config else: self.logger.warning("无法从配置文件中提取WORKFLOW_CONFIG,使用默认配置") return config.WORKFLOW_CONFIG except Exception as e: self.logger.error(f"读取工作流配置失败: {str(e)},使用默认配置") return config.WORKFLOW_CONFIG def load_coordinates(self): """ 加载坐标数据 @return: 坐标数据DataFrame """ try: # 首先尝试从数据目录加载 coord_path = os.path.join( config.DATA_PATHS["coordinates_dir"], config.DATA_PATHS["coordinate_file"] ) if not os.path.exists(coord_path): # 如果数据目录中没有,尝试从原始模型目录复制 self.logger.info("坐标文件不存在,尝试从原始模型目录复制...") self._copy_coordinates_from_models() coordinates = pd.read_csv(coord_path) self.logger.info(f"坐标数据加载成功: {coord_path}, 数据形状: {coordinates.shape}") return coordinates except Exception as e: self.logger.error(f"坐标数据加载失败: {str(e)}") raise def _copy_coordinates_from_models(self): """ 从原始模型目录复制坐标文件 """ try: # 从作物Cd模型目录复制坐标文件 source_path = os.path.join( "..", "作物Cd模型文件与数据", "作物Cd模型文件与数据", config.DATA_PATHS["coordinate_file"] ) if os.path.exists(source_path): target_path = os.path.join( config.DATA_PATHS["coordinates_dir"], config.DATA_PATHS["coordinate_file"] ) # 确保目标目录存在 os.makedirs(os.path.dirname(target_path), exist_ok=True) # 复制文件 import shutil shutil.copy2(source_path, target_path) self.logger.info(f"坐标文件复制成功: {source_path} -> {target_path}") else: raise FileNotFoundError(f"坐标文件不存在: {source_path}") except Exception as e: self.logger.error(f"坐标文件复制失败: {str(e)}") raise def _detect_coordinate_columns(self, coordinates): """ 自动检测坐标数据的列名 @param coordinates: 坐标数据DataFrame @return: (经度列名, 纬度列名) """ columns = coordinates.columns.tolist() # 可能的经度列名 lon_candidates = ['lon', 'longitude', '经度', 'x', 'lng'] # 可能的纬度列名 lat_candidates = ['lat', 'latitude', '纬度', 'y', 'lan'] lon_col = None lat_col = None # 查找经度列 for col in columns: for candidate in lon_candidates: if candidate.lower() in col.lower(): lon_col = col break if lon_col: break # 查找纬度列 for col in columns: for candidate in lat_candidates: if candidate.lower() in col.lower(): lat_col = col break if lat_col: break if not lon_col or not lat_col: raise ValueError(f"无法识别坐标列。现有列名: {columns}") self.logger.info(f"检测到坐标列: 经度={lon_col}, 纬度={lat_col}") return lon_col, lat_col def combine_predictions_with_coordinates(self): """ 将预测结果与坐标数据合并,为每个模型生成独立的最终数据文件 @return: 最终数据文件路径字典 """ try: self.logger.info("开始整合预测结果与坐标数据...") # 加载数据 predictions = self.load_predictions() coordinates = self.load_coordinates() # 自动检测坐标列名 lon_col, lat_col = self._detect_coordinate_columns(coordinates) final_files = {} # 为每个模型创建独立的最终数据文件 for model_name, prediction_data in predictions.items(): # 创建单独的数据DataFrame final_data = coordinates[[lon_col, lat_col]].copy() final_data['Prediction'] = prediction_data.iloc[:, 0] # 重命名列以匹配期望的格式 column_mapping = { lon_col: 'longitude', lat_col: 'latitude' } final_data = final_data.rename(columns=column_mapping) # 确定输出文件名 if model_name == 'crop_cd': output_filename = "Final_predictions_crop_cd.csv" model_display_name = "作物Cd模型" elif model_name == 'effective_cd': output_filename = "Final_predictions_effective_cd.csv" model_display_name = "有效态Cd模型" else: output_filename = f"Final_predictions_{model_name}.csv" model_display_name = f"{model_name}模型" # 保存最终数据 output_path = os.path.join( config.DATA_PATHS["final_dir"], output_filename ) # 确保输出目录存在 os.makedirs(os.path.dirname(output_path), exist_ok=True) final_data.to_csv(output_path, index=False) final_files[model_name] = output_path self.logger.info(f"{model_display_name}数据整合完成,文件保存至: {output_path}") self.logger.info(f"{model_display_name}数据形状: {final_data.shape}") # 打印统计信息 pred_stats = final_data['Prediction'].describe() self.logger.info(f"{model_display_name}预测值统计:\n{pred_stats}") # 如果同时有两个模型,也创建一个合并的文件供参考 if len(predictions) > 1: combined_data = coordinates[[lon_col, lat_col]].copy() # 添加所有预测结果 for model_name, prediction_data in predictions.items(): if model_name == 'crop_cd': combined_data['Crop_Cd_Prediction'] = prediction_data.iloc[:, 0] elif model_name == 'effective_cd': combined_data['Effective_Cd_Prediction'] = prediction_data.iloc[:, 0] # 创建一个平均预测列 prediction_columns = [col for col in combined_data.columns if 'Prediction' in col] if len(prediction_columns) > 0: combined_data['Average_Prediction'] = combined_data[prediction_columns].mean(axis=1) # 重命名坐标列 column_mapping = { lon_col: 'longitude', lat_col: 'latitude' } combined_data = combined_data.rename(columns=column_mapping) # 保存合并文件 combined_output_path = os.path.join( config.DATA_PATHS["final_dir"], "Final_predictions_combined.csv" ) combined_data.to_csv(combined_output_path, index=False) final_files['combined'] = combined_output_path self.logger.info(f"合并数据文件保存至: {combined_output_path}") self.logger.info(f"合并数据形状: {combined_data.shape}") return final_files except Exception as e: self.logger.error(f"数据整合失败: {str(e)}") raise def validate_final_data(self, file_path): """ 验证最终数据的格式和内容 @param file_path: 最终数据文件路径 @return: 验证是否通过 """ try: data = pd.read_csv(file_path) # 检查必要的列 required_columns = ['longitude', 'latitude', 'Prediction'] missing_columns = [col for col in required_columns if col not in data.columns] if missing_columns: self.logger.error(f"缺少必要的列: {missing_columns}") return False # 检查数据完整性 if data.isnull().any().any(): self.logger.warning("数据中存在空值") # 检查坐标范围(假设为合理的地理坐标) if not (data['longitude'].between(-180, 180).all() and data['latitude'].between(-90, 90).all()): self.logger.warning("坐标值超出合理范围") self.logger.info("数据验证通过") return True except Exception as e: self.logger.error(f"数据验证失败: {str(e)}") return False if __name__ == "__main__": # 测试代码 processor = DataProcessor() final_files = processor.combine_predictions_with_coordinates() for model_name, final_file in final_files.items(): processor.validate_final_data(final_file) print(f"{model_name}数据处理完成,最终文件: {final_file}")