""" 栅格映射模块 Raster Mapping Module 基于原始02_Transfer_csv_to_geotif.py改进,用于将CSV数据转换为GeoTIFF格式 """ import os import sys import logging import pandas as pd import geopandas as gpd from shapely.geometry import Point import rasterio from rasterio.features import rasterize from rasterio.transform import from_origin import numpy as np # 添加项目根目录到路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import config class RasterMapper: """ 栅格映射器 负责将CSV数据转换为GeoTIFF栅格数据 """ def __init__(self): """ 初始化栅格映射器 """ self.logger = logging.getLogger(__name__) def csv_to_shapefile(self, csv_file, shapefile_output): """ 将CSV文件转换为Shapefile文件 @param csv_file: CSV文件路径 @param shapefile_output: 输出Shapefile文件路径 """ try: # 读取CSV数据 df = pd.read_csv(csv_file) # 确保列名正确 if 'longitude' not in df.columns or 'latitude' not in df.columns: # 尝试自动识别经纬度列 lon_col = None lat_col = None for col in df.columns: col_lower = col.lower() if any(keyword in col_lower for keyword in ['lon', '经度', 'x']): lon_col = col elif any(keyword in col_lower for keyword in ['lat', '纬度', 'y']): lat_col = col if lon_col and lat_col: df = df.rename(columns={lon_col: 'longitude', lat_col: 'latitude'}) self.logger.info(f"自动识别坐标列: {lon_col} -> longitude, {lat_col} -> latitude") else: raise ValueError("无法识别经纬度列") # 创建几何对象 lon = df['longitude'] lat = df['latitude'] # 获取预测值列 value_col = 'Prediction' if value_col not in df.columns: # 寻找可能的预测值列 for col in df.columns: if col not in ['longitude', 'latitude'] and pd.api.types.is_numeric_dtype(df[col]): value_col = col break else: raise ValueError("无法找到预测值列") val = df[value_col] # 创建Point几何对象 geometry = [Point(xy) for xy in zip(lon, lat)] # 创建GeoDataFrame gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326") # 确保输出目录存在 os.makedirs(os.path.dirname(shapefile_output), exist_ok=True) # 保存为Shapefile gdf.to_file(shapefile_output, driver="ESRI Shapefile") self.logger.info(f"Shapefile创建成功: {shapefile_output}") return shapefile_output except Exception as e: self.logger.error(f"CSV转Shapefile失败: {str(e)}") raise def vector_to_raster(self, input_shapefile, template_tif, output_tif, field='Prediction'): """ 将点矢量数据转换为栅格数据 @param input_shapefile: 输入点矢量数据的Shapefile文件路径 @param template_tif: 用作模板的GeoTIFF文件路径 @param output_tif: 输出栅格化后的GeoTIFF文件路径 @param field: 用于栅格化的属性字段名 """ try: # 读取矢量数据 gdf = gpd.read_file(input_shapefile) self.logger.info(f"矢量数据加载成功: {input_shapefile}") # 检查模板文件是否存在 if not os.path.exists(template_tif): self.logger.warning(f"模板文件不存在: {template_tif},将创建默认栅格") self._create_default_template(gdf, template_tif) # 打开模板栅格,获取元信息 with rasterio.open(template_tif) as src: template_meta = src.meta.copy() transform = src.transform width = src.width height = src.height crs = src.crs self.logger.info(f"模板栅格信息 - 宽度: {width}, 高度: {height}, CRS: {crs}") # 将矢量数据投影到模板的坐标系 if gdf.crs != crs: gdf = gdf.to_crs(crs) self.logger.info(f"矢量数据投影到: {crs}") # 检查字段是否存在 if field not in gdf.columns: # 尝试寻找替代字段 numeric_cols = [col for col in gdf.columns if pd.api.types.is_numeric_dtype(gdf[col])] if numeric_cols: field = numeric_cols[0] self.logger.warning(f"字段'{field}'不存在,使用'{field}'替代") else: raise ValueError(f"无法找到数值字段进行栅格化") # 栅格化 shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field])) raster = rasterize( shapes=shapes, out_shape=(height, width), transform=transform, fill=np.nan, dtype='float32' ) # 确保输出目录存在 os.makedirs(os.path.dirname(output_tif), exist_ok=True) # 写入GeoTIFF template_meta.update({ "count": 1, "dtype": 'float32', "nodata": np.nan }) with rasterio.open(output_tif, 'w', **template_meta) as dst: dst.write(raster, 1) self.logger.info(f"栅格文件创建成功: {output_tif}") return output_tif except Exception as e: self.logger.error(f"矢量转栅格失败: {str(e)}") raise def _create_default_template(self, gdf, template_tif): """ 创建默认的模板栅格文件 @param gdf: GeoDataFrame @param template_tif: 模板文件路径 """ try: # 获取边界 bounds = gdf.total_bounds # [minx, miny, maxx, maxy] # 设置分辨率(约1公里) resolution = 0.01 # 度 # 计算栅格尺寸 width = int((bounds[2] - bounds[0]) / resolution) height = int((bounds[3] - bounds[1]) / resolution) # 创建变换矩阵 transform = from_origin(bounds[0], bounds[3], resolution, resolution) # 创建空的栅格数据 data = np.ones((height, width), dtype='float32') * np.nan # 元数据 meta = { 'driver': 'GTiff', 'dtype': 'float32', 'nodata': np.nan, 'width': width, 'height': height, 'count': 1, 'crs': gdf.crs, 'transform': transform } # 确保目录存在 os.makedirs(os.path.dirname(template_tif), exist_ok=True) # 写入模板文件 with rasterio.open(template_tif, 'w', **meta) as dst: dst.write(data, 1) self.logger.info(f"默认模板创建成功: {template_tif}") except Exception as e: self.logger.error(f"默认模板创建失败: {str(e)}") raise def csv_to_raster(self, csv_file, output_raster=None, output_shp=None): """ 完整的CSV到栅格转换流程 @param csv_file: 输入CSV文件路径 @param output_raster: 输出栅格文件路径,如果为None则使用默认路径 @param output_shp: 输出shapefile路径,如果为None则使用默认路径 @return: 输出栅格文件路径 """ try: self.logger.info(f"开始CSV到栅格转换: {csv_file}") # 使用默认路径或提供的自定义路径 if output_raster is None: output_raster = config.ANALYSIS_CONFIG["output_raster"] if output_shp is None: shapefile_path = config.ANALYSIS_CONFIG["temp_shapefile"] else: shapefile_path = output_shp template_tif = config.ANALYSIS_CONFIG["template_tif"] # 步骤1: CSV转Shapefile self.csv_to_shapefile(csv_file, shapefile_path) # 步骤2: Shapefile转栅格 self.vector_to_raster(shapefile_path, template_tif, output_raster) self.logger.info(f"CSV到栅格转换完成: {output_raster}") return output_raster except Exception as e: self.logger.error(f"CSV到栅格转换失败: {str(e)}") raise if __name__ == "__main__": # 测试代码 mapper = RasterMapper() # 假设有一个测试CSV文件 test_csv = os.path.join(config.DATA_PATHS["final_dir"], "Final_predictions.csv") if os.path.exists(test_csv): output_raster = mapper.csv_to_raster(test_csv) print(f"栅格转换完成,输出文件: {output_raster}") else: print(f"测试文件不存在: {test_csv}")