123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- """
- Author: Wanxue Zhu
- Time: 2025 - April - 18
- Description: 这个代码用于绘制栅格地图,第一个函数是绘制地图的,第二个是绘制栅格数据的直方图
- """
- import json
- import os
- import logging
- import shutil
- import tempfile
- from datetime import datetime
- from pathlib import Path
- import geopandas as gpd
- import matplotlib.pyplot as plt
- from matplotlib.colors import ListedColormap, BoundaryNorm
- from rasterio.mask import mask
- from rasterio.plot import show
- import numpy as np
- import rasterio
- import seaborn as sns
- # 设定一些常用的colormap
- colormap1 = ['#FFFECE', '#FFF085', '#FEBA17','#BE3D2A','#74512D', '#4E1F00'] #黄-橙-棕
- colormap2 = ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60','#2A3335'] #蓝色系
- colormap3 = ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E']#淡黄-草绿
- colormap4 = ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652']#绿色-棕色
- colormap5 = ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB']#黄-粉-紫
- colormap6 = ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']#绿-黄-红-紫
- # 配置日志
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- handler = logging.StreamHandler()
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- # 设置全局字体为 Arial,并设置中文字体(SimHei或Microsoft YaHei)
- plt.rcParams['font.family'] = 'Arial'
- plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
- plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei'] # 添加多个中文字体
- def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size):
- """
- 生成栅格地图可视化
- 参数:
- - shp_path: 输入的矢量数据的路径
- - tif_path: 输入的栅格数据的路径
- - color_map_name: 使用的色彩方案
- - title_name: 输出数据的图的名称
- - output_path: 输出保存的图片的完整路径(包括扩展名)
- - output_size: 输出图片的尺寸(英寸)
- """
- try:
- logger.info(f"开始生成栅格地图: shp={shp_path}, tif={tif_path}, output={output_path}")
- # 确保输出目录存在
- output_dir = os.path.dirname(output_path)
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- # 读取矢量边界
- logger.info("读取矢量边界...")
- gdf = gpd.read_file(shp_path)
- logger.info(f"矢量数据包含 {len(gdf)} 个要素")
- if gdf.empty:
- logger.warning("矢量数据为空,尝试重新读取")
- gdf = gpd.read_file(shp_path) # 再次尝试
- if gdf.empty:
- raise ValueError("矢量数据为空")
- # 读取并裁剪栅格数据
- logger.info("处理栅格数据...")
- with rasterio.open(tif_path) as src:
- logger.info(f"栅格尺寸: {src.width}x{src.height}, 波段数: {src.count}")
- # 确保几何体格式正确
- if gdf.crs is not None and gdf.crs != src.crs:
- logger.info(f"转换矢量CRS: {gdf.crs} -> {src.crs}")
- gdf = gdf.to_crs(src.crs)
- # 确保转换后的几何体有效
- if gdf.is_empty.any():
- logger.warning("转换后几何体为空,使用原始几何体")
- gdf = gpd.read_file(shp_path) # 回退到原始数据
- geoms = [json.loads(gdf.to_json())["features"][0]["geometry"]]
- out_image, out_transform = mask(src, geoms, crop=True)
- out_meta = src.meta.copy()
- # 提取数据并处理无效值
- raster = out_image[0].astype('float16')
- nodata = out_meta.get("nodata", None)
- logger.info(f"栅格数据范围: {raster.min()} - {raster.max()}, nodata={nodata}")
- if nodata is not None:
- raster[raster == nodata] = np.nan
- # 处理无穷大值
- raster[np.isinf(raster)] = np.nan
- # 验证数据有效性
- valid_mask = ~np.isnan(raster)
- valid_count = np.sum(valid_mask)
- logger.info(f"处理无效值后有效点数: {valid_count}")
- # 检查是否有有效数据
- if valid_count == 0:
- logger.warning("裁剪后的栅格不包含有效数据,使用默认值填充")
- raster = np.zeros_like(raster) # 使用0填充
- # 确保数据范围有效
- min_val = np.nanmin(raster)
- max_val = np.nanmax(raster)
- # 如果数据范围太小,设置为合理范围
- if min_val == max_val:
- min_val = min_val - 0.1 if min_val > 0 else -0.1
- max_val = max_val + 0.1
- logger.info(f"有效数据范围: {min_val} - {max_val}")
- # 根据分位数分为6个等级
- logger.info("计算数据分位数...")
- try:
- bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
- except Exception as e:
- logger.warning(f"分位数计算失败: {str(e)},使用线性范围")
- bounds = np.linspace(min_val, max_val, 7)
- # 确保分位数有效
- if np.any(np.isnan(bounds)) or np.any(np.isinf(bounds)):
- logger.warning("分位数包含无效值,使用线性范围")
- bounds = np.linspace(min_val, max_val, 7)
- logger.info(f"分位数: {bounds}")
- norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
- # 自定义颜色设定
- cmap = ListedColormap(color_map_name)
- # 绘图
- logger.info("创建图表...")
- fig, ax = plt.subplots(figsize=(output_size, output_size))
- logger.info(f"创建图表完成: 尺寸={output_size}英寸")
- # 显示栅格数据
- logger.info("显示栅格数据...")
- show(raster, transform=out_transform, ax=ax, cmap=cmap, norm=norm)
- logger.info("栅格数据显示完成")
- # 添加矢量边界
- gdf.boundary.plot(ax=ax, color='black', linewidth=1)
- logger.info("矢量边界添加完成")
- # 设置标题和标签
- ax.set_title(title_name, fontsize=20)
- ax.set_xlabel("Longitude", fontsize=18)
- ax.set_ylabel("Latitude", fontsize=18)
- # 添加经纬网
- ax.grid(True, linestyle='--', color='gray', alpha=0.5)
- ax.tick_params(axis='y', labelrotation=90)
- logger.info("坐标轴设置完成")
- # 创建图例
- tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
- cbar = plt.colorbar(
- plt.cm.ScalarMappable(norm=norm, cmap=cmap),
- ax=ax,
- ticks=[(bounds[i] + bounds[i + 1]) / 2 for i in range(len(bounds) - 1)],
- shrink=0.6,
- aspect=15
- )
- cbar.ax.set_yticklabels(tick_labels)
- cbar.set_label("Values")
- logger.info("图例创建完成")
- # 保存图像
- logger.info(f"保存图像到: {output_path}")
- plt.tight_layout()
- plt.savefig(output_path, dpi=300)
- plt.close(fig)
- logger.info("图像保存完成")
- # 验证文件是否创建成功
- if not os.path.exists(output_path):
- raise RuntimeError(f"图像文件创建失败: {output_path}")
- logger.info("栅格地图生成成功")
- return output_path
- except rasterio.RasterioIOError as e:
- logger.error(f"栅格文件读取错误: {str(e)}")
- raise
- except ValueError as e:
- logger.error(f"数据值错误: {str(e)}")
- raise
- except Exception as e:
- logger.error(f"栅格地图生成失败: {str(e)}", exc_info=True)
- raise RuntimeError(f"地图可视化错误: {str(e)}")
- def plot_tif_histogram(file_path, figsize=(10, 6), xlabel='像元值', ylabel='频率密度',
- title='GeoTIFF 波段值分布图', save_path=None):
- """
- 生成TIFF文件的直方图
- 参数:
- - file_path (str): GeoTIFF 文件路径
- - figsize (tuple): 图像尺寸,如 (10, 6)
- - xlabel (str): 横坐标标签
- - ylabel (str): 纵坐标标签
- - title (str): 图标题
- - save_path (str): 可选,保存图片路径(含文件名和扩展名)
- """
- try:
- logger.info(f"开始生成直方图: {file_path}")
- sns.set(style='ticks')
- with rasterio.open(file_path) as src:
- band = src.read(1)
- nodata = src.nodata
- if nodata is not None:
- band = np.where(band == nodata, np.nan, band)
- band_flat = band.flatten()
- band_flat = band_flat[~np.isnan(band_flat)]
- plt.figure(figsize=figsize)
- sns.histplot(band_flat, bins=100, color='steelblue', alpha=0.7, edgecolor='black', stat='density')
- sns.kdeplot(band_flat, color='red', linewidth=2)
- plt.xlabel(xlabel, fontsize=14)
- plt.ylabel(ylabel, fontsize=14)
- plt.title(title, fontsize=16)
- plt.grid(True, linestyle='--', alpha=0.5)
- plt.tight_layout()
- if save_path:
- # 确保保存目录存在
- os.makedirs(os.path.dirname(save_path), exist_ok=True)
- plt.savefig(save_path, dpi=300, format='jpg', bbox_inches='tight')
- logger.info(f"保存图像到: {save_path}")
- else:
- save_path = os.path.join(tempfile.gettempdir(), "tif_histogram.jpg")
- plt.savefig(save_path, dpi=300, format='jpg', bbox_inches='tight')
- logger.info(f"保存临时图像到: {save_path}")
- plt.close()
- return save_path
- except Exception as e:
- logger.error(f"直方图生成失败: {str(e)}", exc_info=True)
- raise RuntimeError(f"直方图生成错误: {str(e)}")
- def main():
- """主函数用于测试栅格地图和直方图绘制功能"""
- try:
- # 创建静态目录用于保存结果
- static_dir = r"D:\17417\Documents\backend\Water\Figures" # 静态目录名称
- os.makedirs(static_dir, exist_ok=True) # 确保目录存在
- logger.info(f"使用静态目录: {os.path.abspath(static_dir)}")
- # ================== 栅格地图绘制测试 ==================
- logger.info("开始测试栅格地图绘制功能...")
- test_shp = r"D:\17417\Documents\backend\Water\Raster\lechang.shp" # 替换为实际路径
- test_tif = r"D:\destkop\DQCJ12321311.tif" # 替换为实际路径
- test_colormap = colormap6
- test_title = "Atmospheric input flux"
- # 在静态目录中创建唯一文件名(添加时间戳避免覆盖)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- output_file = os.path.join(static_dir, f"raster_map_{timestamp}.jpg")
- # 调用栅格地图绘制函数
- output_file = mapping_raster(
- shp_path=test_shp,
- tif_path=test_tif,
- color_map_name=test_colormap,
- title_name=test_title,
- output_path=output_file, # 直接使用完整路径
- output_size=8
- )
- if os.path.exists(output_file):
- logger.info(f"✅ 栅格地图绘制测试完成: {os.path.abspath(output_file)}")
- else:
- logger.error(f"❌ 栅格地图文件未创建: {output_file}")
- # ================== 直方图绘制测试 ==================
- logger.info("开始测试直方图绘制功能...")
- histogram_file = test_tif
- histogram_output = os.path.join(static_dir, f"histogram_{timestamp}.jpg")
- # 调用直方图绘制函数
- hist_file = plot_tif_histogram(
- file_path=histogram_file,
- figsize=(8, 8),
- xlabel='Cd content',
- ylabel='Frequency',
- title='Atmospheric input flux',
- save_path=histogram_output
- )
- if os.path.exists(hist_file):
- logger.info(f"✅ 直方图绘制测试完成: {os.path.abspath(hist_file)}")
- else:
- logger.error(f"❌ 直方图文件未创建: {hist_file}")
- logger.info("🎉 所有测试完成! 结果图片已保存到静态目录")
- except FileNotFoundError as e:
- logger.error(f"❌ 文件未找到错误: {e}")
- except rasterio.RasterioIOError:
- logger.error("❌ 栅格文件读取失败,请检查文件格式和路径")
- except Exception as e:
- logger.error(f"❌ 测试过程中发生未知错误: {str(e)}", exc_info=True)
- if __name__ == "__main__":
- # 安全执行区域
- logger.info("=" * 50)
- logger.info("栅格地图和直方图绘制测试程序")
- logger.info("=" * 50)
- # 执行测试
- main()
|