import os import geopandas as gpd import pandas as pd from pyproj import Transformer from shapely.geometry import Point import rasterio from typing import Optional, Dict, Any from datetime import datetime import numpy as np import logging import shutil import sys # 导入MappingUtils from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow # 配置日志 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) # 设置全局字体 import matplotlib.pyplot as plt plt.rcParams['font.family'] = 'Arial' plt.rcParams['axes.unicode_minus'] = False plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei'] # 设定常用的colormap(与mapping_utils保持一致) COLORMAPS = { "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'], "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'], "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'], "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'], "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'], "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F'] } # 建立中文到英文的colormap映射 COLORMAP_MAPPING = { "黄-橙-棕": "yellow_orange_brown", "蓝色系": "blue_series", "淡黄-草绿": "yellow_green", "绿色-棕色": "green_brown", "黄-粉-紫": "yellow_pink_purple", "绿-黄-红-紫": "green_yellow_red_purple" } # 主路径配置 def get_base_dir(): """获取基础目录路径""" if getattr(sys, 'frozen', False): # 打包后的可执行文件 return os.path.dirname(sys.executable) else: # 脚本运行模式 return os.path.dirname(os.path.abspath(__file__)) # 土地数据处理函数 def process_land_data(land_type, coefficient_params=None): """处理土地类型数据并生成清洗后的简化数据""" base_dir = get_base_dir() shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp") xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx") logger.info(f"处理土地类型: {land_type}") logger.info(f"SHP文件: {shp_file}") logger.info(f"Excel文件: {xls_file}") # 读取和处理SHP数据 gdf_shp = gpd.read_file(shp_file) gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type] if gdf_shp.empty: logger.warning(f"没有找到 '{land_type}' 类型的要素") return None, None # 坐标系转换器 transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True) # 读取Excel采样点数据 df_xls = pd.read_excel(xls_file) # 设置土地类型系数 default_params = { "水田": (711, 0.524), "水浇地": (427, 0.599), "旱地": (200, 0.7) } params = coefficient_params or default_params param1, param2 = params.get(land_type, (200, 0.7)) Num = param1 * param2 logger.info(f"系数: {param1} * {param2} = {Num}") # 处理每个面要素 cd_values = [] centers = [] for index, row in gdf_shp.iterrows(): center_original = row['geometry'].centroid center_lon, center_lat = transformer.transform(center_original.x, center_original.y) centers.append((center_lon, center_lat)) distances = df_xls.apply( lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])), axis=1 ) min_idx = distances.idxmin() nearest = df_xls.loc[min_idx] # 只计算Cd含量值 cd_value = nearest['Cd (ug/L)'] * Num cd_values.append(cd_value) # 创建输出目录 data_dir = os.path.join(base_dir, "..", "static", "water", "Data") os.makedirs(data_dir, exist_ok=True) logger.info(f"数据目录: {data_dir}") # 创建简化数据DataFrame simplified_data = pd.DataFrame({ 'lon': [c[0] for c in centers], 'lat': [c[1] for c in centers], 'Prediction': cd_values }) # 应用3σ原则检测异常值 mean_value = simplified_data['Prediction'].mean() std_value = simplified_data['Prediction'].std() lower_bound = mean_value - 3 * std_value upper_bound = mean_value + 3 * std_value logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}") logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}") # 识别异常值 outliers = simplified_data[ (simplified_data['Prediction'] < lower_bound) | (simplified_data['Prediction'] > upper_bound) ] logger.info(f"检测到异常值数量: {len(outliers)}") # 创建清洗后的数据 cleaned_data = simplified_data[ (simplified_data['Prediction'] >= lower_bound) & (simplified_data['Prediction'] <= upper_bound) ] logger.info(f"清洗后数据点数: {len(cleaned_data)}") # 保存清洗后的简化数据CSV cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv") cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig') logger.info(f"保存CSV: {cleaned_csv}") return cleaned_csv, Num # 可视化函数(使用MappingUtils) def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8): """生成栅格地图可视化""" try: logger.info(f"生成栅格地图: {title_name}") # 确保输出目录存在 os.makedirs(os.path.dirname(output_path), exist_ok=True) # 创建MappingUtils实例 mapper = MappingUtils() # 转换颜色方案名称 colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple") # 调用MappingUtils的高级绘图功能 mapper.create_raster_map( shp_path=shp_path, tif_path=tif_path, output_path=os.path.splitext(output_path)[0], # 去掉扩展名 colormap=colormap_key, title=title_name, output_size=output_size, figsize=None, dpi=300, enable_interpolation=False, interpolation_method='linear' ) # MappingUtils会自动添加.jpg扩展名,重命名文件以匹配原始输出路径 generated_path = os.path.splitext(output_path)[0] + ".jpg" if generated_path != output_path and os.path.exists(generated_path): shutil.move(generated_path, output_path) logger.info(f"重命名图片: {generated_path} -> {output_path}") return output_path except Exception as e: logger.error(f"栅格地图生成失败: {str(e)}") return None def plot_tif_histogram(file_path, output_path, figsize=(8, 8), xlabel='Cd(ug/L)', ylabel='frequency density', title='Irrigation water input flux'): """生成TIFF文件的直方图""" try: logger.info(f"生成直方图: {file_path}") # 创建MappingUtils实例 mapper = MappingUtils() # 调用MappingUtils的高级直方图功能 mapper.create_histogram( file_path=file_path, save_path=output_path, figsize=figsize, xlabel=xlabel, ylabel=ylabel, title=title, bins=100, dpi=300 ) return output_path except Exception as e: logger.error(f"直方图生成失败: {str(e)}") return None # 完整的处理流程 def process_land_to_visualization(land_type, coefficient_params=None, color_map_name="绿-黄-红-紫", output_size=8): """ 完整的土地数据处理可视化流程: 1. 生成清洗后CSV 2. 使用csv_to_raster_workflow转换为GeoTIFF 3. 生成栅格地图 4. 生成直方图 返回所有生成文件的路径 """ base_dir = get_base_dir() logger.info(f"开始处理: {land_type}") # 1. 生成清洗后的CSV cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params) if not cleaned_csv: logger.error(f"处理土地数据失败: {land_type}") return None, None, None, None, None, None # 2. 使用csv_to_raster_workflow转换为GeoTIFF raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster") template_tif = os.path.join(raster_dir, "meanTemp.tif") output_dir = os.path.dirname(cleaned_csv) # 使用CSV所在目录作为输出目录 # 调用csv_to_raster_workflow workflow_result = csv_to_raster_workflow( csv_file=cleaned_csv, template_tif=template_tif, output_dir=output_dir, boundary_shp=None, resolution_factor=4.0, interpolation_method='linear', field_name='Prediction', lon_col=0, # CSV中经度列索引 lat_col=1, # CSV中纬度列索引 value_col=2, # CSV中数值列索引 enable_interpolation=True ) # 获取输出的栅格文件路径 output_tif = workflow_result['raster'] logger.info(f"生成栅格文件: {output_tif}") # 3. 生成栅格地图 map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg") county_shp = os.path.join(raster_dir, "Lechang.shp") # 县界SHP mapping_raster( shp_path=county_shp, tif_path=output_tif, color_map_name=color_map_name, title_name=f"Irrigation water input flux", output_path=map_output, output_size=output_size ) # 4. 生成直方图 hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg") plot_tif_histogram( file_path=output_tif, output_path=hist_output, title=f"Irrigation water input flux" ) return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]: """ 获取指定土地类型的Cd预测结果统计信息 @param {str} land_type - 土地类型(水田、旱地或水浇地) @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None """ try: logger.info(f"获取土地类型统计信息: {land_type}") # 获取基础目录 base_dir = get_base_dir() data_dir = os.path.join(base_dir, "..", "static", "water", "Data") # 构建数据文件路径 data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv") logger.info(f"数据文件路径: {data_file}") if not os.path.exists(data_file): logger.warning(f"未找到土地类型数据文件: {data_file}") return None # 读取预测数据 df = pd.read_csv(data_file) logger.info(f"成功读取数据文件,包含 {len(df)} 行数据") # 检查必要的列是否存在 if 'Prediction' not in df.columns: logger.warning("数据文件中缺少'Prediction'列") return None predictions = df['Prediction'] # 计算基础统计信息 stats = { "土地类型": land_type, "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'), "数据点总数": len(predictions), "均值": float(predictions.mean()), "中位数": float(predictions.median()), "标准差": float(predictions.std()), "最小值": float(predictions.min()), "最大值": float(predictions.max()), "25%分位数": float(predictions.quantile(0.25)), "75%分位数": float(predictions.quantile(0.75)), "偏度": float(predictions.skew()), "峰度": float(predictions.kurtosis()) } return stats except Exception as e: logger.error(f"获取土地类型统计信息失败: {str(e)}") return None # 主函数 def main(): """主函数用于测试完整的处理流程""" logger.info("=" * 50) logger.info("土地数据处理与可视化系统") logger.info("=" * 50) try: # 处理水田数据 logger.info("\n===== 处理水田数据 =====") results = process_land_to_visualization("水田") if results and all(results): cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results logger.info(f"清洗后CSV: {cleaned_csv}") logger.info(f"Shapefile: {shapefile}") logger.info(f"GeoTIFF文件: {tif_file}") logger.info(f"地图图片: {map_file}") logger.info(f"直方图: {hist_file}") logger.info(f"使用系数: {used_coeff}") else: logger.error("水田数据处理失败") # 处理旱地数据(使用自定义参数) logger.info("\n===== 处理旱地数据 =====") custom_params = {"旱地": (220, 0.65)} results = process_land_to_visualization( "旱地", coefficient_params=custom_params, color_map_name="蓝色系" ) if results and all(results): cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results logger.info(f"清洗后CSV: {cleaned_csv}") logger.info(f"Shapefile: {shapefile}") logger.info(f"GeoTIFF文件: {tif_file}") logger.info(f"地图图片: {map_file}") logger.info(f"直方图: {hist_file}") logger.info(f"使用系数: {used_coeff}") else: logger.error("旱地数据处理失败") except Exception as e: logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True) finally: logger.info("处理完成") if __name__ == "__main__": main()