123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- """
- 栅格映射模块
- Raster Mapping Module
- 基于通用绘图模块 app.utils.mapping_utils 的封装
- 提供与原有接口兼容的栅格转换功能,包含空间插值处理
- """
- import os
- import sys
- import logging
- import pandas as pd
- import geopandas as gpd
- from shapely.geometry import Point
- import rasterio
- from rasterio.features import rasterize
- from rasterio.transform import from_origin
- import numpy as np
- # 添加项目根目录到路径
- sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- # 导入通用绘图模块
- from app.utils.mapping_utils import MappingUtils
- import config
- class RasterMapper:
- """
- 栅格映射器
- 负责将CSV数据转换为GeoTIFF栅格数据
- """
-
- def __init__(self):
- """
- 初始化栅格映射器
- """
- self.logger = logging.getLogger(__name__)
- # 初始化通用绘图模块
- self.mapping_utils = MappingUtils()
-
- def csv_to_shapefile(self, csv_file, shapefile_output):
- """
- 将CSV文件转换为Shapefile文件
-
- @param csv_file: CSV文件路径
- @param shapefile_output: 输出Shapefile文件路径
- """
- try:
- # 读取CSV数据
- df = pd.read_csv(csv_file)
-
- # 确保列名正确
- if 'longitude' not in df.columns or 'latitude' not in df.columns:
- # 尝试自动识别经纬度列
- lon_col = None
- lat_col = None
-
- for col in df.columns:
- col_lower = col.lower()
- if any(keyword in col_lower for keyword in ['lon', '经度', 'x']):
- lon_col = col
- elif any(keyword in col_lower for keyword in ['lat', '纬度', 'y']):
- lat_col = col
-
- if lon_col and lat_col:
- df = df.rename(columns={lon_col: 'longitude', lat_col: 'latitude'})
- self.logger.info(f"自动识别坐标列: {lon_col} -> longitude, {lat_col} -> latitude")
- else:
- raise ValueError("无法识别经纬度列")
-
- # 创建几何对象
- lon = df['longitude']
- lat = df['latitude']
-
- # 获取预测值列
- value_col = 'Prediction'
- if value_col not in df.columns:
- # 寻找可能的预测值列
- for col in df.columns:
- if col not in ['longitude', 'latitude'] and pd.api.types.is_numeric_dtype(df[col]):
- value_col = col
- break
- else:
- raise ValueError("无法找到预测值列")
-
- val = df[value_col]
-
- # 创建Point几何对象
- geometry = [Point(xy) for xy in zip(lon, lat)]
-
- # 创建GeoDataFrame
- gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
-
- # 确保输出目录存在
- os.makedirs(os.path.dirname(shapefile_output), exist_ok=True)
-
- # 保存为Shapefile
- gdf.to_file(shapefile_output, driver="ESRI Shapefile")
-
- self.logger.info(f"Shapefile创建成功: {shapefile_output}")
- return shapefile_output
-
- except Exception as e:
- self.logger.error(f"CSV转Shapefile失败: {str(e)}")
- raise
-
- def vector_to_raster(self, input_shapefile, template_tif, output_tif, field='Prediction', resolution_factor=16.0):
- """
- 将点矢量数据转换为栅格数据(使用通用绘图模块)
-
- @param input_shapefile: 输入点矢量数据的Shapefile文件路径
- @param template_tif: 用作模板的GeoTIFF文件路径
- @param output_tif: 输出栅格化后的GeoTIFF文件路径
- @param field: 用于栅格化的属性字段名
- @param resolution_factor: 分辨率倍数因子,16.0表示分辨率提高16倍(像素单元变为1/16)
- """
- try:
- self.logger.info(f"开始矢量转栅格: {input_shapefile}")
- self.logger.info(f"分辨率因子: {resolution_factor}")
-
- # 获取边界文件
- boundary_shp = config.ANALYSIS_CONFIG.get("boundary_shp")
-
- # 调用通用绘图模块的矢量转栅格方法(包含空间插值)
- output_path, stats = self.mapping_utils.vector_to_raster(
- input_shapefile=input_shapefile,
- template_tif=template_tif,
- output_tif=output_tif,
- field=field,
- resolution_factor=resolution_factor,
- boundary_shp=boundary_shp,
- interpolation_method='nearest'
- )
-
- self.logger.info(f"栅格文件创建成功: {output_path}")
- if stats:
- self.logger.info(f"统计信息: 有效像素 {stats.get('valid_pixels', 0)}/{stats.get('total_pixels', 0)}")
-
- return output_path
-
- except Exception as e:
- self.logger.error(f"矢量转栅格失败: {str(e)}")
- raise
-
- def _create_default_template(self, gdf, template_tif):
- """
- 创建默认的模板栅格文件
-
- @param gdf: GeoDataFrame
- @param template_tif: 模板文件路径
- """
- try:
- # 获取边界
- bounds = gdf.total_bounds # [minx, miny, maxx, maxy]
-
- # 设置分辨率(约1公里)
- resolution = 0.01 # 度
-
- # 计算栅格尺寸
- width = int((bounds[2] - bounds[0]) / resolution)
- height = int((bounds[3] - bounds[1]) / resolution)
-
- # 创建变换矩阵
- transform = from_origin(bounds[0], bounds[3], resolution, resolution)
-
- # 创建空的栅格数据
- data = np.ones((height, width), dtype='float32') * np.nan
-
- # 元数据
- meta = {
- 'driver': 'GTiff',
- 'dtype': 'float32',
- 'nodata': np.nan,
- 'width': width,
- 'height': height,
- 'count': 1,
- 'crs': gdf.crs,
- 'transform': transform
- }
-
- # 确保目录存在
- os.makedirs(os.path.dirname(template_tif), exist_ok=True)
-
- # 写入模板文件
- with rasterio.open(template_tif, 'w', **meta) as dst:
- dst.write(data, 1)
-
- self.logger.info(f"默认模板创建成功: {template_tif}")
-
- except Exception as e:
- self.logger.error(f"默认模板创建失败: {str(e)}")
- raise
-
- def csv_to_raster(self, csv_file, output_raster=None, output_shp=None):
- """
- 完整的CSV到栅格转换流程
-
- @param csv_file: 输入CSV文件路径
- @param output_raster: 输出栅格文件路径,如果为None则使用默认路径
- @param output_shp: 输出shapefile路径,如果为None则使用默认路径
- @return: 输出栅格文件路径
- """
- try:
- self.logger.info(f"开始CSV到栅格转换: {csv_file}")
-
- # 使用默认路径或提供的自定义路径
- if output_raster is None:
- output_raster = config.ANALYSIS_CONFIG["output_raster"]
- if output_shp is None:
- shapefile_path = config.ANALYSIS_CONFIG["temp_shapefile"]
- else:
- shapefile_path = output_shp
-
- template_tif = config.ANALYSIS_CONFIG["template_tif"]
-
- # 步骤1: CSV转Shapefile
- self.csv_to_shapefile(csv_file, shapefile_path)
-
- # 步骤2: Shapefile转栅格
- self.vector_to_raster(shapefile_path, template_tif, output_raster)
-
- self.logger.info(f"CSV到栅格转换完成: {output_raster}")
- return output_raster
-
- except Exception as e:
- self.logger.error(f"CSV到栅格转换失败: {str(e)}")
- raise
- if __name__ == "__main__":
- # 测试代码
- mapper = RasterMapper()
-
- # 假设有一个测试CSV文件
- test_csv = os.path.join(config.DATA_PATHS["final_dir"], "Final_predictions.csv")
-
- if os.path.exists(test_csv):
- output_raster = mapper.csv_to_raster(test_csv)
- print(f"栅格转换完成,输出文件: {output_raster}")
- else:
- print(f"测试文件不存在: {test_csv}")
|