123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- """
- 主执行脚本 - Cd预测集成系统
- Main execution script for Cd Prediction Integrated System
- Description: 集成运行作物Cd模型、有效态Cd模型和数据分析可视化的完整流程
- 架构简化版本:直接调用通用绘图模块,删除中间封装层
- """
- import os
- import sys
- import logging
- from datetime import datetime
- # 添加项目根目录到Python路径
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- # 添加上级目录到路径以访问通用绘图模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import config
- from models.crop_cd_model.predict import CropCdPredictor
- from models.effective_cd_model.predict import EffectiveCdPredictor
- from analysis.data_processing import DataProcessor
- from utils.common import setup_logging, validate_files
- # 直接导入通用绘图模块
- from app.utils.mapping_utils import csv_to_raster_workflow, MappingUtils
- def main():
- """
- 主执行函数
- 执行完整的Cd预测分析流程
- """
- # 设置日志
- log_file = os.path.join(config.OUTPUT_PATHS["reports_dir"],
- f"execution_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
- setup_logging(log_file)
- logger = logging.getLogger(__name__)
-
- logger.info("=" * 60)
- logger.info("开始执行Cd预测集成系统 (架构简化版本)")
- logger.info("=" * 60)
-
- try:
- # 确保目录存在
- config.ensure_directories()
- logger.info("项目目录结构检查完成")
-
- # 步骤1: 运行作物Cd模型预测
- if config.WORKFLOW_CONFIG["run_crop_model"]:
- logger.info("步骤1: 运行作物Cd模型预测...")
- crop_predictor = CropCdPredictor()
- crop_output = crop_predictor.predict()
- logger.info(f"作物Cd模型预测完成,输出文件: {crop_output}")
-
- # 步骤2: 运行有效态Cd模型预测
- if config.WORKFLOW_CONFIG["run_effective_model"]:
- logger.info("步骤2: 运行有效态Cd模型预测...")
- effective_predictor = EffectiveCdPredictor()
- effective_output = effective_predictor.predict()
- logger.info(f"有效态Cd模型预测完成,输出文件: {effective_output}")
-
- # 步骤3: 数据整合处理
- if config.WORKFLOW_CONFIG["combine_predictions"]:
- logger.info("步骤3: 整合预测结果与坐标数据...")
- data_processor = DataProcessor()
- final_data_files = data_processor.combine_predictions_with_coordinates()
- logger.info(f"数据整合完成,生成文件: {list(final_data_files.keys())}")
-
- # 初始化通用绘图工具(用于可视化)
- mapping_utils = MappingUtils()
-
- # 步骤4: 为每个模型生成栅格文件和可视化图
- model_outputs = {}
-
- for model_name, final_data_file in final_data_files.items():
- if model_name == 'combined': # 跳过合并文件的可视化
- continue
-
- # 只为当前工作流配置中启用的模型生成输出文件
- if model_name == 'crop_cd' and not config.WORKFLOW_CONFIG.get("run_crop_model", False):
- logger.info(f"跳过{model_name}模型的可视化(工作流配置中未启用)")
- continue
- if model_name == 'effective_cd' and not config.WORKFLOW_CONFIG.get("run_effective_model", False):
- logger.info(f"跳过{model_name}模型的可视化(工作流配置中未启用)")
- continue
-
- logger.info(f"为{model_name}模型生成栅格和可视化...")
-
- # 为每个模型确定显示名称
- if model_name == 'crop_cd':
- display_name = "作物Cd"
- title_name = "Crop Cd Prediction"
- elif model_name == 'effective_cd':
- display_name = "有效态Cd"
- title_name = "Effective Cd Prediction"
- else:
- display_name = model_name
- title_name = f"{model_name} Prediction"
-
- model_outputs[model_name] = {}
-
- # 步骤4a: 生成栅格文件(直接调用通用绘图模块)
- if config.WORKFLOW_CONFIG["generate_raster"]:
- logger.info(f"步骤4a: 为{display_name}模型生成栅格文件...")
-
- # 使用csv_to_raster_workflow直接生成栅格
- try:
- workflow_result = csv_to_raster_workflow(
- csv_file=final_data_file,
- template_tif=config.ANALYSIS_CONFIG["template_tif"],
- output_dir=config.OUTPUT_PATHS["raster_dir"],
- boundary_shp=config.ANALYSIS_CONFIG.get("boundary_shp"),
- resolution_factor=1.0, # 高分辨率栅格生成
- interpolation_method='nearest',
- field_name='Prediction',
- lon_col=0,
- lat_col=1,
- value_col=2,
- enable_interpolation=False # 禁用空间插值
- )
-
- output_raster = workflow_result['raster']
-
- # 重命名为特定模型的文件
- model_raster_path = os.path.join(
- config.OUTPUT_PATHS["raster_dir"],
- f"output_{model_name}.tif"
- )
-
- if os.path.exists(output_raster) and output_raster != model_raster_path:
- import shutil
- shutil.move(output_raster, model_raster_path)
- output_raster = model_raster_path
-
- model_outputs[model_name]['raster'] = output_raster
- logger.info(f"{display_name}模型栅格文件生成完成: {output_raster}")
-
- # 清理中间shapefile文件
- try:
- shapefile_path = workflow_result.get('shapefile')
- if shapefile_path and os.path.exists(shapefile_path):
- # 删除shapefile及其相关文件
- import glob
- base_path = os.path.splitext(shapefile_path)[0]
- for ext in ['.shp', '.shx', '.dbf', '.prj', '.cpg']:
- file_to_delete = base_path + ext
- if os.path.exists(file_to_delete):
- os.remove(file_to_delete)
- logger.debug(f"已删除中间文件: {file_to_delete}")
- logger.info(f"已清理中间shapefile文件: {shapefile_path}")
- except Exception as cleanup_error:
- logger.warning(f"清理中间文件失败: {str(cleanup_error)}")
-
- except Exception as e:
- logger.error(f"栅格生成失败: {str(e)}")
- raise
-
- # 步骤4b: 创建可视化地图(直接调用通用绘图模块)
- if config.WORKFLOW_CONFIG["create_visualization"]:
- logger.info(f"步骤4b: 为{display_name}模型创建可视化地图...")
-
- # 为每个模型创建独立的地图输出路径
- map_output_path = os.path.join(
- config.OUTPUT_PATHS["figures_dir"],
- f"Prediction_results_{model_name}"
- )
-
- try:
- # 直接调用通用绘图模块的create_raster_map
- map_output = mapping_utils.create_raster_map(
- shp_path=config.ANALYSIS_CONFIG.get("boundary_shp"),
- tif_path=output_raster,
- output_path=map_output_path,
- title=title_name,
- colormap=config.VISUALIZATION_CONFIG["color_maps"][config.VISUALIZATION_CONFIG["default_colormap"]],
- output_size=config.VISUALIZATION_CONFIG["figure_size"],
- dpi=config.VISUALIZATION_CONFIG["dpi"],
- resolution_factor=1.0, # 保持栅格原始分辨率
- enable_interpolation=False, # 不在可视化阶段插值
- interpolation_method='nearest'
- )
-
- model_outputs[model_name]['map'] = map_output
- logger.info(f"{display_name}模型地图可视化完成: {map_output}")
-
- except Exception as e:
- logger.error(f"地图可视化失败: {str(e)}")
- raise
-
- # 步骤4c: 创建直方图(直接调用通用绘图模块)
- if config.WORKFLOW_CONFIG["create_histogram"]:
- logger.info(f"步骤4c: 为{display_name}模型创建预测值分布直方图...")
-
- # 为每个模型创建独立的直方图输出路径
- histogram_output_path = os.path.join(
- config.OUTPUT_PATHS["figures_dir"],
- f"Prediction_frequency_{model_name}.jpg"
- )
-
- # 为了避免中文字体问题,使用英文标题
- if model_name == 'crop_cd':
- hist_title = 'Crop Cd Prediction Frequency'
- hist_xlabel = 'Crop Cd Content (mg/kg)'
- elif model_name == 'effective_cd':
- hist_title = 'Effective Cd Prediction Frequency'
- hist_xlabel = 'Effective Cd Content (mg/kg)'
- else:
- hist_title = f'{model_name} Prediction Frequency'
- hist_xlabel = f'{model_name} Content'
-
- try:
- # 直接调用通用绘图模块的create_histogram
- histogram_output = mapping_utils.create_histogram(
- file_path=output_raster,
- save_path=histogram_output_path,
- figsize=(6, 6),
- xlabel=hist_xlabel,
- ylabel='Frequency',
- title=hist_title,
- dpi=config.VISUALIZATION_CONFIG["dpi"]
- )
-
- model_outputs[model_name]['histogram'] = histogram_output
- logger.info(f"{display_name}模型直方图创建完成: {histogram_output}")
-
- except Exception as e:
- logger.error(f"直方图生成失败: {str(e)}")
- raise
-
- logger.info("=" * 60)
- logger.info("所有流程执行完成!")
- logger.info("=" * 60)
-
- # 打印结果摘要
- print_summary(model_outputs)
-
- except Exception as e:
- logger.error(f"执行过程中发生错误: {str(e)}")
- logger.exception("详细错误信息:")
- raise
-
- def print_summary(model_outputs):
- """打印执行结果摘要"""
- print("\n" + "=" * 60)
- print("[SUCCESS] Cd预测集成系统执行完成! (架构简化版本)")
- print("=" * 60)
- print(f"[INFO] 总输出目录: {config.OUTPUT_PATHS}")
- print()
-
- # 显示每个模型的输出文件
- for model_name, outputs in model_outputs.items():
- if model_name == 'crop_cd':
- display_name = "作物Cd模型"
- elif model_name == 'effective_cd':
- display_name = "有效态Cd模型"
- else:
- display_name = f"{model_name}模型"
-
- print(f"[MODEL] {display_name}:")
- for output_type, file_path in outputs.items():
- if output_type == 'raster':
- print(f" [RASTER] 栅格文件: {file_path}")
- elif output_type == 'map':
- print(f" [MAP] 地图文件: {file_path}")
- elif output_type == 'histogram':
- print(f" [CHART] 直方图文件: {file_path}")
- print()
-
- print(f"[LOG] 日志文件目录: {config.OUTPUT_PATHS['reports_dir']}")
- print("=" * 60)
- if __name__ == "__main__":
- main()
|