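"""Land-parcel Cd prediction and visualization pipeline.

For a given land-use type (水田 / 水浇地 / 旱地) this module:
1. reads parcel polygons and irrigation-water sampling points, estimates a Cd
   value for each parcel from its nearest sampling point, and removes 3-sigma
   outliers (process_land_data);
2. converts the cleaned CSV to a point Shapefile and rasterizes it onto a
   template GeoTIFF (csv_to_shapefile, vector_to_raster);
3. renders a classified map and a histogram of the raster (mapping_raster,
   plot_tif_histogram);
4. exposes summary and spatial statistics for downstream use
   (get_land_statistics, calculate_histogram_data, calculate_spatial_statistics).
"""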
import os
import geopandas as gpd
import pandas as pd
from pyproj import Transformer
from shapely.geometry import Point
import rasterio
from rasterio.features import rasterize
from typing import Optional, Dict, Any
from datetime import datetime
import numpy as np
import json
import logging
import shutil
import tempfile
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from rasterio.mask import mask
from rasterio.plot import show
import seaborn as sns
import sys

# Logging configuration
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Global font settings. Chinese titles/labels are used below, so a CJK-capable
# font (SimHei / Microsoft YaHei) must be first in the sans-serif fallback chain;
# keeping font.family at 'Arial' would render Chinese text as missing-glyph boxes.
plt.rcParams['font.family'] = 'sans-serif'
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

# Commonly used colormaps (keys are looked up by name elsewhere in this module)
COLORMAPS = {
    "黄-橙-棕": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
    "蓝色系": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
    "淡黄-草绿": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
    "绿色-棕色": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
    "黄-粉-紫": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
    "绿-黄-红-紫": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
}

# Base path configuration
def get_base_dir():
    """Return the base directory of the application."""
    if getattr(sys, 'frozen', False):
        # Running as a packaged executable
        return os.path.dirname(sys.executable)
    else:
        # Running as a plain script
        return os.path.dirname(os.path.abspath(__file__))

# Land-data processing
def process_land_data(land_type, coefficient_params=None):
    """Process parcels of the given land-use type and write a cleaned, simplified CSV."""
    base_dir = get_base_dir()
    shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
    xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
    logger.info(f"处理土地类型: {land_type}")
    logger.info(f"SHP文件: {shp_file}")
    logger.info(f"Excel文件: {xls_file}")

    # Read the SHP data and keep only the requested land-use type
    gdf_shp = gpd.read_file(shp_file)
    gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
    if gdf_shp.empty:
        logger.warning(f"没有找到 '{land_type}' 类型的要素")
        return None, None

    # CRS transformer to WGS84 lon/lat
    transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)

    # Read the sampling-point data from Excel
    df_xls = pd.read_excel(xls_file)

    # Land-type coefficients (param1, param2); their product scales the sampled Cd concentration
    default_params = {
        "水田": (711, 0.524),
        "水浇地": (427, 0.599),
        "旱地": (200, 0.7)
    }
    params = coefficient_params or default_params
    param1, param2 = params.get(land_type, (200, 0.7))
    Num = param1 * param2
    logger.info(f"系数: {param1} * {param2} = {Num}")

    # Process each polygon feature
    cd_values = []
    centers = []
    for index, row in gdf_shp.iterrows():
        center_original = row['geometry'].centroid
        center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
        centers.append((center_lon, center_lat))
        # Planar distance in degrees is sufficient here, since only the index
        # of the nearest sampling point is needed
        distances = df_xls.apply(
            lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
            axis=1
        )
        min_idx = distances.idxmin()
        nearest = df_xls.loc[min_idx]
        # Only the Cd content value is computed
        cd_value = nearest['Cd (ug/L)'] * Num
        cd_values.append(cd_value)

    # Create the output directory
    data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
    os.makedirs(data_dir, exist_ok=True)
    logger.info(f"数据目录: {data_dir}")

    # Build the simplified DataFrame: parcel centroid (lon/lat) plus predicted Cd value
    simplified_data = pd.DataFrame({
        'lon': [c[0] for c in centers],
        'lat': [c[1] for c in centers],
        'Prediction': cd_values
    })

    # Apply the 3-sigma rule to detect outliers
    mean_value = simplified_data['Prediction'].mean()
    std_value = simplified_data['Prediction'].std()
    lower_bound = mean_value - 3 * std_value
    upper_bound = mean_value + 3 * std_value
    logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
    logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")

    # Identify the outliers
    outliers = simplified_data[
        (simplified_data['Prediction'] < lower_bound) |
        (simplified_data['Prediction'] > upper_bound)
    ]
    logger.info(f"检测到异常值数量: {len(outliers)}")

    # Keep only the in-range (cleaned) data
    cleaned_data = simplified_data[
        (simplified_data['Prediction'] >= lower_bound) &
        (simplified_data['Prediction'] <= upper_bound)
    ]
    logger.info(f"清洗后数据点数: {len(cleaned_data)}")

    # Save the cleaned, simplified data as CSV
    cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
    cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
    logger.info(f"保存CSV: {cleaned_csv}")
    return cleaned_csv, Num

# Format-conversion helpers
def csv_to_shapefile(csv_file, shapefile_output):
    """Convert a lon/lat/value CSV into a point Shapefile."""
    df = pd.read_csv(csv_file)
    lon = df.iloc[:, 0]
    lat = df.iloc[:, 1]
    val = df.iloc[:, 2]
    geometry = [Point(xy) for xy in zip(lon, lat)]
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
    os.makedirs(os.path.dirname(shapefile_output), exist_ok=True)
    gdf.to_file(shapefile_output, driver="ESRI Shapefile")
    logger.info(f"保存Shapefile: {shapefile_output}")

def vector_to_raster(input_shapefile, template_tif, output_tif, field="Prediction"):
    """Rasterize a point vector layer onto the grid of a template GeoTIFF."""
    gdf = gpd.read_file(input_shapefile)
    with rasterio.open(template_tif) as src:
        template_meta = src.meta.copy()
        transform = src.transform
        width = src.width
        height = src.height
        crs = src.crs
    # Reproject the points to the template CRS if necessary
    if gdf.crs != crs:
        gdf = gdf.to_crs(crs)
    shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field]))
    raster = rasterize(
        shapes=shapes,
        out_shape=(height, width),
        transform=transform,
        fill=np.nan,
        dtype='float32'
    )
    os.makedirs(os.path.dirname(output_tif), exist_ok=True)
    template_meta.update({
        "count": 1,
        "dtype": 'float32',
        "nodata": np.nan
    })
    with rasterio.open(output_tif, 'w', **template_meta) as dst:
        dst.write(raster, 1)
    logger.info(f"保存GeoTIFF: {output_tif}")

# Visualization helpers
def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
    """Render a classified raster map clipped to a vector boundary.

    `color_map_name` is a list of hex colours (one of the COLORMAPS values).
    """
    try:
        logger.info(f"生成栅格地图: {title_name}")
        # Make sure the output directory exists
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        # Read the vector boundary
        gdf = gpd.read_file(shp_path)
        if gdf.empty:
            raise ValueError("矢量数据为空")
        # Read the raster and clip it to the first boundary feature
        with rasterio.open(tif_path) as src:
            if gdf.crs != src.crs:
                gdf = gdf.to_crs(src.crs)
            geoms = [json.loads(gdf.to_json())["features"][0]["geometry"]]
            out_image, out_transform = mask(src, geoms, crop=True)
        # Prepare the data
        raster = out_image[0].astype('float16')
        if np.all(np.isnan(raster)):
            raise ValueError("裁剪后的栅格不包含有效数据")
        # Classify into 6 levels using percentile breaks
        bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
        norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
        # Custom colour scheme
        cmap = ListedColormap(color_map_name)
        # Plot
        fig, ax = plt.subplots(figsize=(output_size, output_size))
        show(raster, transform=out_transform, ax=ax, cmap=cmap, norm=norm)
        gdf.boundary.plot(ax=ax, color='black', linewidth=1)
        # Title and axis labels
        ax.set_title(title_name, fontsize=20)
        ax.set_xlabel("Longitude", fontsize=18)
        ax.set_ylabel("Latitude", fontsize=18)
        # Graticule
        ax.grid(True, linestyle='--', color='gray', alpha=0.5)
        ax.tick_params(axis='y', labelrotation=90)
        # Legend / colorbar
        tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
        cbar = plt.colorbar(
            plt.cm.ScalarMappable(norm=norm, cmap=cmap),
            ax=ax,
            ticks=[(bounds[i] + bounds[i + 1]) / 2 for i in range(len(bounds) - 1)],
            shrink=0.6,
            aspect=15
        )
        cbar.ax.set_yticklabels(tick_labels)
        cbar.set_label("Cd含量 (ug/L)")
        # Save the figure
        plt.tight_layout()
        plt.savefig(output_path, dpi=300)
        plt.close(fig)
        logger.info(f"保存地图图片: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"栅格地图生成失败: {str(e)}")
        return None

def plot_tif_histogram(file_path, output_path, figsize=(10, 6),
                       xlabel='Cd含量 (ug/L)', ylabel='频率密度',
                       title='Cd含量分布直方图'):
    """Plot a histogram (with KDE) of the valid values in a GeoTIFF."""
    try:
        logger.info(f"生成直方图: {file_path}")
        with rasterio.open(file_path) as src:
            band = src.read(1)
            nodata = src.nodata
        # Mask nodata cells and drop NaNs before plotting
        if nodata is not None:
            band = np.where(band == nodata, np.nan, band)
        band_flat = band.flatten()
        band_flat = band_flat[~np.isnan(band_flat)]
        plt.figure(figsize=figsize)
        sns.histplot(band_flat, bins=100, color='steelblue', alpha=0.7, edgecolor='black', stat='density')
        sns.kdeplot(band_flat, color='red', linewidth=2)
        plt.xlabel(xlabel, fontsize=14)
        plt.ylabel(ylabel, fontsize=14)
        plt.title(title, fontsize=16)
        plt.grid(True, linestyle='--', alpha=0.5)
        plt.tight_layout()
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        plt.savefig(output_path, dpi=300, format='jpg', bbox_inches='tight')
        plt.close()
        logger.info(f"保存直方图: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"直方图生成失败: {str(e)}")
        return None

# Full processing pipeline
def process_land_to_visualization(land_type, coefficient_params=None,
                                  color_map_name="绿-黄-红-紫",
                                  output_size=8):
    """
    Full land-data processing and visualization pipeline:
    1. generate the cleaned CSV
    2. convert it to a Shapefile
    3. convert that to a GeoTIFF
    4. render the raster map
    5. render the histogram
    Returns the paths of all generated files plus the coefficient used.
    """
    base_dir = get_base_dir()
    logger.info(f"开始处理: {land_type}")

    # 1. Generate the cleaned CSV
    cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
    if not cleaned_csv:
        logger.error(f"处理土地数据失败: {land_type}")
        return None, None, None, None, None, None

    # 2. Convert to Shapefile
    shapefile_output = os.path.join(
        os.path.dirname(cleaned_csv),
        f"中心点经纬度与预测值&{land_type}_清洗.shp"
    )
    csv_to_shapefile(cleaned_csv, shapefile_output)

    # 3. Convert to GeoTIFF
    raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
    template_tif = os.path.join(raster_dir, "meanTemp.tif")
    output_tif = os.path.join(raster_dir, f"{land_type}_Cd含量.tif")
    vector_to_raster(shapefile_output, template_tif, output_tif)

    # 4. Render the raster map
    map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
    county_shp = os.path.join(raster_dir, "Lechang.shp")  # county boundary SHP
    mapping_raster(
        shp_path=county_shp,
        tif_path=output_tif,
        color_map_name=COLORMAPS.get(color_map_name, COLORMAPS["绿-黄-红-紫"]),
        title_name=f"{land_type} Cd含量分布图",
        output_path=map_output,
        output_size=output_size
    )

    # 5. Render the histogram
    hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
    plot_tif_histogram(
        file_path=output_tif,
        output_path=hist_output,
        title=f"{land_type} Cd含量分布直方图"
    )
    return cleaned_csv, shapefile_output, output_tif, map_output, hist_output, used_coeff

def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
    """
    Get summary statistics of the Cd predictions for the given land-use type.
    @param {str} land_type - Land-use type (水田, 旱地 or 水浇地)
    @returns {Optional[Dict[str, Any]]} Statistics dict, or None if no data is available
    """
    try:
        logger.info(f"获取土地类型统计信息: {land_type}")
        # Base directory
        base_dir = get_base_dir()
        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
        # Path of the cleaned prediction file
        data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
        logger.info(f"数据文件路径: {data_file}")
        if not os.path.exists(data_file):
            logger.warning(f"未找到土地类型数据文件: {data_file}")
            return None
        # Read the prediction data
        df = pd.read_csv(data_file)
        logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
        # Make sure the required column exists
        if 'Prediction' not in df.columns:
            logger.warning("数据文件中缺少'Prediction'列")
            return None
        predictions = df['Prediction']
        # Basic descriptive statistics
        basic_stats = {
            "数据点总数": len(predictions),
            "均值": float(predictions.mean()),
            "中位数": float(predictions.median()),
            "标准差": float(predictions.std()),
            "最小值": float(predictions.min()),
            "最大值": float(predictions.max()),
            "25%分位数": float(predictions.quantile(0.25)),
            "75%分位数": float(predictions.quantile(0.75)),
            "偏度": float(predictions.skew()),
            "峰度": float(predictions.kurtosis())
        }
        # Histogram of the distribution
        histogram_data = calculate_histogram_data(predictions)
        # Spatial statistics (only if coordinates are present)
        spatial_stats = None
        if 'lon' in df.columns and 'lat' in df.columns:
            spatial_stats = calculate_spatial_statistics(df)
        return {
            "土地类型": land_type,
            "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).isoformat(),
            "基础统计": basic_stats,
            "分布直方图": histogram_data,
            "空间统计": spatial_stats,
        }
    except Exception as e:
        logger.error(f"获取土地类型统计信息失败: {str(e)}")
        return None

def calculate_histogram_data(data: pd.Series, bins: int = 20) -> Dict[str, Any]:
    """
    Compute the histogram distribution of the data.
    @param data: data series
    @param bins: number of histogram bins
    @return: dict with the histogram data
    """
    # Compute the histogram
    counts, bin_edges = np.histogram(data, bins=bins)
    # Convert into a frontend-friendly format
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    return {
        "bins": bin_edges.tolist(),
        "counts": counts.tolist(),
        "bin_centers": bin_centers.tolist(),
        "density": (counts / len(data)).tolist()  # relative frequency per bin (not divided by bin width)
    }

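# Illustrative example (not from the original source): for a small toy series,
#   calculate_histogram_data(pd.Series([1, 2, 2, 3, 4]), bins=4)
# returns
#   {"bins": [1.0, 1.75, 2.5, 3.25, 4.0],
#    "counts": [1, 2, 1, 1],
#    "bin_centers": [1.375, 2.125, 2.875, 3.625],
#    "density": [0.2, 0.4, 0.2, 0.2]}
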
def calculate_spatial_statistics(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Compute spatial statistics.
    @param df: dataframe with lon/lat coordinates and prediction values
    @return: dict with the spatial statistics
    """
    try:
        # Required columns
        if 'lon' not in df.columns or 'lat' not in df.columns or 'Prediction' not in df.columns:
            logger.warning("缺少必要的经纬度或预测值列")
            return {}

        # Centre of the spatial distribution
        center_x = df['lon'].mean()
        center_y = df['lat'].mean()
        spatial_stats = {
            "空间中心点": [center_x, center_y]
        }

        # Moran's I (global spatial autocorrelation), if esda/libpysal are available
        w = None
        try:
            from esda.moran import Moran
            import libpysal as lps
            # k-nearest-neighbour spatial weights
            coords = list(zip(df['lon'], df['lat']))
            w = lps.weights.KNN(coords, k=5)
            # Moran's I
            mi = Moran(df['Prediction'], w)
            spatial_stats["莫兰指数"] = {
                "I": mi.I,
                "p_value": mi.p_norm,
                "z_score": mi.z_norm
            }
        except ImportError:
            logger.warning("无法计算莫兰指数,请安装esda和libpysal库")
        except Exception as e:
            logger.error(f"计算莫兰指数时出错: {str(e)}")

        # Hotspot analysis (Getis-Ord Gi*), reusing the weight matrix built above
        try:
            from esda.getisord import G_Local
            if w is None:
                raise ValueError("空间权重矩阵不可用")
            gi = G_Local(df['Prediction'], w)
            # Hotspots (clusters of high values) and coldspots (clusters of low values)
            hotspot = np.where(gi.z_sim > 1.96, 1, 0)   # 95% confidence
            coldspot = np.where(gi.z_sim < -1.96, 1, 0)
            spatial_stats["热点分析"] = {
                "热点数量": int(hotspot.sum()),
                "冷点数量": int(coldspot.sum()),
            }
        except ImportError:
            logger.warning("无法计算Getis-Ord Gi*统计,请安装esda库")
        except Exception as e:
            logger.error(f"计算Getis-Ord Gi*统计时出错: {str(e)}")

        # Standard deviational ellipse, if pointpats is available
        try:
            from pointpats import centrography
            pts = df[['lon', 'lat']].to_numpy()
            # centrography.ellipse returns (semi-major axis, semi-minor axis, rotation)
            major, minor, rotation = centrography.ellipse(pts)
            mean_x, mean_y = centrography.mean_center(pts)
            spatial_stats["标准差椭圆"] = {
                "中心点": [float(mean_x), float(mean_y)],
                "长轴": float(major),
                "短轴": float(minor),
                "方向角": float(rotation)
            }
        except ImportError:
            logger.warning("无法计算标准差椭圆,请安装pointpats库")
        except Exception as e:
            logger.error(f"计算标准差椭圆时出错: {str(e)}")

        return spatial_stats
    except Exception as e:
        logger.error(f"计算空间统计信息失败: {str(e)}")
        return {}

# Entry point
def main():
    """Main function for testing the full processing pipeline."""
    logger.info("=" * 50)
    logger.info("土地数据处理与可视化系统")
    logger.info("=" * 50)
    try:
        # Process paddy-field (水田) data with the default coefficients
        logger.info("\n===== 处理水田数据 =====")
        results = process_land_to_visualization("水田")
        if results and all(results):
            cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
            logger.info(f"清洗后CSV: {cleaned_csv}")
            logger.info(f"Shapefile: {shapefile}")
            logger.info(f"GeoTIFF文件: {tif_file}")
            logger.info(f"地图图片: {map_file}")
            logger.info(f"直方图: {hist_file}")
            logger.info(f"使用系数: {used_coeff}")
        else:
            logger.error("水田数据处理失败")

        # Process dry-land (旱地) data with custom coefficients
        logger.info("\n===== 处理旱地数据 =====")
        custom_params = {"旱地": (220, 0.65)}
        results = process_land_to_visualization(
            "旱地",
            coefficient_params=custom_params,
            color_map_name="蓝色系"
        )
        if results and all(results):
            cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
            logger.info(f"清洗后CSV: {cleaned_csv}")
            logger.info(f"Shapefile: {shapefile}")
            logger.info(f"GeoTIFF文件: {tif_file}")
            logger.info(f"地图图片: {map_file}")
            logger.info(f"直方图: {hist_file}")
            logger.info(f"使用系数: {used_coeff}")
        else:
            logger.error("旱地数据处理失败")
    except Exception as e:
        logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
    finally:
        logger.info("处理完成")

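# Usage sketch (illustrative, not exercised by main()): once a land type has been
# processed, its summary statistics can be retrieved for a frontend or report.
# This only assumes the cleaned CSV written by process_land_data() already exists.
#
#   stats = get_land_statistics("水田")
#   if stats:
#       print(json.dumps(stats, ensure_ascii=False, indent=2, default=str))
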
if __name__ == "__main__":
    main()