Browse Source

优化土地数据处理接口,新增CSV文件生成选项,支持内存处理和直接从DataFrame转换为GeoDataFrame,提升数据处理效率和灵活性。同时更新日志记录,确保关键文件缺失时的错误处理更加完善。

drggboy 4 days ago
parent
commit
1d5f612c36
3 changed files with 293 additions and 33 deletions
  1. 15 4
      app/api/water.py
  2. 44 28
      app/services/water_service.py
  3. 234 1
      app/utils/mapping_utils.py

+ 15 - 4
app/api/water.py

@@ -126,7 +126,8 @@ async def recalculate_land_data(
         level: Optional[str] = Form(None, description="可选的行政层级,必须为: county | city | province"),
         enable_interpolation: Optional[bool] = Form(False, description="是否启用空间插值,默认启用"),
         interpolation_method: Optional[str] = Form("linear", description="插值方法: nearest | linear | cubic"),
-        resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高")
+        resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高"),
+        save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成")
 ) -> Dict[str, Any]:
     """重新计算土地数据并返回结果路径,支持动态边界控制和插值控制"""
     try:
@@ -149,7 +150,7 @@ async def recalculate_land_data(
             land_type: (param1, param2)
         }
 
-        # 调用服务层函数处理土地数据(支持动态边界和插值控制)
+        # 调用统一的处理函数,CSV生成作为可选参数
         results = process_land_to_visualization(
             land_type=land_type,
             coefficient_params=coefficient_params,
@@ -159,10 +160,11 @@ async def recalculate_land_data(
             level=level,
             enable_interpolation=enable_interpolation,
             interpolation_method=interpolation_method,
-            resolution_factor=resolution_factor
+            resolution_factor=resolution_factor,
+            save_csv=save_csv  # 将CSV生成选项传递给处理函数
         )
 
-        if not results or any(r is None for r in results):
+        if not results:
             logger.error(f"重新计算土地数据失败: {land_type}")
             raise HTTPException(
                 status_code=500,
@@ -170,6 +172,15 @@ async def recalculate_land_data(
             )
 
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
+        
+        # 检查关键文件是否存在(shapefile可以为None,因为使用的是内存处理)
+        if not tif_file or not map_output or not hist_output:
+            logger.error(f"重新计算土地数据失败,关键文件缺失: {land_type}")
+            logger.error(f"GeoTIFF: {tif_file}, 地图: {map_output}, 直方图: {hist_output}")
+            raise HTTPException(
+                status_code=500,
+                detail=f"重新计算土地数据失败: {land_type}"
+            )
 
         # 定义默认路径
         default_map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")

+ 44 - 28
app/services/water_service.py

@@ -12,7 +12,7 @@ import shutil
 import sys
 
 # 导入MappingUtils
-from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow
+from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
 # 导入土地利用服务
 from .land_use_service import land_use_service
 # 导入边界服务
@@ -90,7 +90,7 @@ def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDat
 
 
 # 土地数据处理函数
-def process_land_data(land_type, coefficient_params=None):
+def process_land_data(land_type, coefficient_params=None, save_csv=True):
     """处理土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
@@ -104,14 +104,14 @@ def process_land_data(land_type, coefficient_params=None):
     
     if land_centers_df is None or land_centers_df.empty:
         logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
-        return None, None
+        return None, None, None
 
     logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
     # 读取Excel采样点数据
     if not os.path.exists(xls_file):
         logger.error(f"采样点Excel文件不存在: {xls_file}")
-        return None, None
+        return None, None, None
         
     df_xls = pd.read_excel(xls_file)
     logger.info(f"读取到 {len(df_xls)} 个采样点数据")
@@ -149,11 +149,6 @@ def process_land_data(land_type, coefficient_params=None):
         cd_value = nearest['Cd (ug/L)'] * Num
         cd_values.append(cd_value)
 
-    # 创建输出目录
-    data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
-    os.makedirs(data_dir, exist_ok=True)
-    logger.info(f"数据目录: {data_dir}")
-
     # 创建简化数据DataFrame
     simplified_data = pd.DataFrame({
         'lon': [c[0] for c in centers],
@@ -184,12 +179,21 @@ def process_land_data(land_type, coefficient_params=None):
         ]
     logger.info(f"清洗后数据点数: {len(cleaned_data)}")
 
-    # 保存清洗后的简化数据CSV
-    cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
-    cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
-    logger.info(f"保存CSV: {cleaned_csv}")
+    # 可选:保存CSV文件用于兼容性和调试
+    cleaned_csv_path = None
+    if save_csv:
+        # 创建输出目录
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        os.makedirs(data_dir, exist_ok=True)
+        logger.info(f"数据目录: {data_dir}")
+        
+        cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+        cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
+        logger.info(f"保存CSV: {cleaned_csv_path}")
+    else:
+        logger.info("跳过CSV文件生成(内存处理优化)")
 
-    return cleaned_csv, Num
+    return cleaned_data, Num, cleaned_csv_path  # 返回DataFrame, 系数, 和CSV路径(如果生成了)
 
 
 # 可视化函数(完全使用统一的MappingUtils接口)
@@ -285,7 +289,6 @@ def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
         return None
 
 
-# 完整的处理流程(已更新为完全使用统一绘图接口,支持动态边界和插值控制)
 def process_land_to_visualization(land_type, coefficient_params=None,
                                   color_map_name="绿-黄-红-紫",
                                   output_size=8,
@@ -293,7 +296,8 @@ def process_land_to_visualization(land_type, coefficient_params=None,
                                   level: Optional[str] = None,
                                   enable_interpolation: Optional[bool] = True,
                                   interpolation_method: Optional[str] = "linear",
-                                  resolution_factor: Optional[float] = 4.0):
+                                  resolution_factor: Optional[float] = 4.0,
+                                  save_csv: Optional[bool] = True):
     """
     完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
     
@@ -316,14 +320,19 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     @param enable_interpolation: 是否启用空间插值,默认True
     @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
     @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
+    @param save_csv: 是否生成CSV文件,默认True
     @returns: 包含所有生成文件路径的元组
     """
     base_dir = get_base_dir()
     logger.info(f"开始处理: {land_type}")
 
-    # 1. 生成清洗后的CSV
-    cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
-    if not cleaned_csv:
+    # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
+    cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
+        land_type, 
+        coefficient_params, 
+        save_csv=save_csv  # 根据参数决定是否生成CSV文件
+    )
+    if cleaned_data is None:
         logger.error(f"处理土地数据失败: {land_type}")
         return None, None, None, None, None, None
 
@@ -346,24 +355,23 @@ def process_land_to_visualization(land_type, coefficient_params=None,
         else:
             logger.info("未能获取默认边界,栅格转换将使用数据点范围")
             
-    # 3. 使用csv_to_raster_workflow转换为GeoTIFF(支持动态边界)
+    # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
     raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
     template_tif = os.path.join(raster_dir, "meanTemp.tif")
-    output_dir = os.path.dirname(cleaned_csv)  # 使用CSV所在目录作为输出目录
+    output_dir = raster_dir  # 直接使用raster目录作为输出目录
 
-    # 调用csv_to_raster_workflow,支持动态边界和插值控制
-    workflow_result = csv_to_raster_workflow(
-        csv_file=cleaned_csv,
+    # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
+    workflow_result = dataframe_to_raster_workflow(
+        df=cleaned_data,  # 直接使用DataFrame
         template_tif=template_tif,
         output_dir=output_dir,
-        boundary_shp=None,  # 不使用Shapefile
         boundary_gdf=boundary_gdf,  # 直接使用GeoDataFrame
         resolution_factor=resolution_factor,  # 可配置的分辨率因子
         interpolation_method=interpolation_method,  # 可配置的插值方法
         field_name='Prediction',
-        lon_col=0,  # CSV中经度列索引
-        lat_col=1,  # CSV中纬度列索引
-        value_col=2,  # CSV中数值列索引
+        lon_col=0,  # DataFrame中经度列索引
+        lat_col=1,  # DataFrame中纬度列索引
+        value_col=2,  # DataFrame中数值列索引
         enable_interpolation=enable_interpolation  # 可配置的插值开关
     )
 
@@ -425,6 +433,14 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     except Exception as e:
         logger.error(f"直方图生成失败: {str(e)}")
 
+    # 使用实际的CSV路径(如果生成了)或生成兼容性路径
+    if cleaned_csv_path:
+        cleaned_csv = cleaned_csv_path
+    else:
+        # 为了兼容性,生成路径(即使文件不存在)
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+    
     return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
 
 

+ 234 - 1
app/utils/mapping_utils.py

@@ -124,6 +124,50 @@ class MappingUtils:
             self.logger.error(f"CSV转Shapefile失败: {str(e)}")
             raise
     
+    def dataframe_to_geodataframe(self, df, lon_col=0, lat_col=1, value_col=2, field_name='Prediction'):
+        """
+        将DataFrame直接转换为GeoDataFrame(内存处理)
+        
+        @param df: pandas DataFrame
+        @param lon_col: 经度列索引或列名,默认第0列
+        @param lat_col: 纬度列索引或列名,默认第1列  
+        @param value_col: 数值列索引或列名,默认第2列
+        @param field_name: 值字段名称
+        @return: GeoDataFrame
+        """
+        try:
+            self.logger.info("开始将DataFrame转换为GeoDataFrame(内存处理)")
+            
+            # 支持列索引或列名
+            if isinstance(lon_col, int):
+                lon = df.iloc[:, lon_col]
+            else:
+                lon = df[lon_col]
+                
+            if isinstance(lat_col, int):
+                lat = df.iloc[:, lat_col]
+            else:
+                lat = df[lat_col]
+                
+            if isinstance(value_col, int):
+                val = df.iloc[:, value_col]
+            else:
+                val = df[value_col]
+            
+            # 创建几何对象
+            geometry = [Point(xy) for xy in zip(lon, lat)]
+            
+            # 创建新的DataFrame,只包含必要的列
+            data = {field_name: val}
+            gdf = gpd.GeoDataFrame(data, geometry=geometry, crs="EPSG:4326")
+            
+            self.logger.info(f"✓ 成功转换DataFrame到GeoDataFrame: {len(gdf)} 个点")
+            return gdf
+            
+        except Exception as e:
+            self.logger.error(f"DataFrame转GeoDataFrame失败: {str(e)}")
+            raise
+    
     def create_boundary_mask(self, raster, transform, gdf):
         """
         创建边界掩膜,只保留边界内的区域
@@ -202,6 +246,148 @@ class MappingUtils:
             self.logger.error(f"插值失败: {str(e)}")
             return raster
     
+    def geodataframe_to_raster(self, gdf, template_tif, output_tif, field, 
+                              resolution_factor=16.0, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
+        """
+        将GeoDataFrame直接转换为栅格数据(内存处理版本)
+        
+        @param gdf: 输入的GeoDataFrame
+        @param template_tif: 用作模板的GeoTIFF文件路径
+        @param output_tif: 输出栅格化后的GeoTIFF文件路径
+        @param field: 用于栅格化的属性字段名
+        @param resolution_factor: 分辨率倍数因子
+        @param boundary_gdf: 边界GeoDataFrame,用于创建掩膜
+        @param interpolation_method: 插值方法 ('nearest', 'linear', 'cubic')
+        @param enable_interpolation: 是否启用空间插值,默认True
+        @return: 输出的GeoTIFF文件路径和统计信息
+        """
+        try:
+            self.logger.info(f"开始处理GeoDataFrame到栅格(内存处理)")
+            interpolation_status = "启用" if enable_interpolation else "禁用"
+            self.logger.info(f"分辨率因子: {resolution_factor}, 插值设置: {interpolation_status} (方法: {interpolation_method})")
+            
+            # 读取模板栅格
+            with rasterio.open(template_tif) as src:
+                template_meta = src.meta.copy()
+                
+                # 根据分辨率因子计算新的尺寸和变换参数
+                if resolution_factor != 1.0:
+                    width = int(src.width * resolution_factor)
+                    height = int(src.height * resolution_factor)
+                    
+                    transform = rasterio.Affine(
+                        src.transform.a / resolution_factor,
+                        src.transform.b,
+                        src.transform.c,
+                        src.transform.d,
+                        src.transform.e / resolution_factor,
+                        src.transform.f
+                    )
+                    self.logger.info(f"分辨率调整: {src.width}x{src.height} -> {width}x{height}")
+                else:
+                    width = src.width
+                    height = src.height
+                    transform = src.transform
+                    self.logger.info(f"保持原始分辨率: {width}x{height}")
+                
+                crs = src.crs
+            
+            # 投影矢量数据
+            if gdf.crs != crs:
+                gdf = gdf.to_crs(crs)
+            
+            # 栅格化
+            shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field]))
+            raster = rasterize(
+                shapes=shapes,
+                out_shape=(height, width),
+                transform=transform,
+                fill=np.nan,
+                dtype='float32'
+            )
+            
+            # 预备掩膜:优先使用行政区边界;若未提供边界,则使用点集凸包限制绘制范围
+            boundary_mask = None
+            if boundary_gdf is not None:
+                self.logger.info("应用边界掩膜: 使用直接提供的GeoDataFrame")
+                # 确保边界GeoDataFrame的CRS与栅格一致
+                if boundary_gdf.crs != crs:
+                    boundary_gdf = boundary_gdf.to_crs(crs)
+                boundary_mask = self.create_boundary_mask(raster, transform, boundary_gdf)
+                raster[~boundary_mask] = np.nan
+            else:
+                try:
+                    # 使用点集凸包作为默认掩膜,避免边界外着色
+                    hull = gdf.unary_union.convex_hull
+                    hull_gdf = gpd.GeoDataFrame(geometry=[hull], crs=crs)
+                    boundary_mask = self.create_boundary_mask(raster, transform, hull_gdf)
+                    raster[~boundary_mask] = np.nan
+                    self.logger.info("已使用点集凸包限制绘制范围")
+                except Exception as hull_err:
+                    self.logger.warning(f"生成点集凸包掩膜失败,可能会出现边界外着色: {str(hull_err)}")
+            
+            # 检查栅格数据状态并决定是否插值
+            nan_count = np.isnan(raster).sum()
+            total_pixels = raster.size
+            self.logger.info(f"栅格数据状态: 总像素数 {total_pixels}, NaN像素数 {nan_count} ({nan_count/total_pixels*100:.1f}%)")
+            
+            # 使用插值方法填充NaN值(如果启用)
+            if enable_interpolation and nan_count > 0:
+                self.logger.info(f"✓ 启用插值: 使用 {interpolation_method} 方法填充 {nan_count} 个NaN像素...")
+                raster = self.interpolate_nan_values(raster, method=interpolation_method)
+                # 关键修正:插值后再次应用掩膜,确保边界外不被填充
+                if boundary_mask is not None:
+                    raster[~boundary_mask] = np.nan
+                final_nan_count = np.isnan(raster).sum()
+                self.logger.info(f"插值完成: 剩余NaN像素数 {final_nan_count}")
+            elif enable_interpolation and nan_count == 0:
+                self.logger.info("✓ 插值已启用,但栅格数据无NaN值,无需插值")
+            elif not enable_interpolation and nan_count > 0:
+                self.logger.info(f"✗ 插值已禁用,保留 {nan_count} 个NaN像素")
+            else:
+                self.logger.info("✓ 栅格数据完整,无需插值")
+            
+            # 创建输出目录
+            os.makedirs(os.path.dirname(output_tif), exist_ok=True)
+            
+            # 更新元数据
+            template_meta.update({
+                "count": 1,
+                "dtype": 'float32',
+                "nodata": np.nan,
+                "width": width,
+                "height": height,
+                "transform": transform
+            })
+            
+            # 保存栅格文件
+            with rasterio.open(output_tif, 'w', **template_meta) as dst:
+                dst.write(raster, 1)
+            
+            # 计算统计信息
+            valid_data = raster[~np.isnan(raster)]
+            stats = None
+            if len(valid_data) > 0:
+                stats = {
+                    'min': float(np.min(valid_data)),
+                    'max': float(np.max(valid_data)),
+                    'mean': float(np.mean(valid_data)),
+                    'std': float(np.std(valid_data)),
+                    'valid_pixels': int(len(valid_data)),
+                    'total_pixels': int(raster.size)
+                }
+                self.logger.info(f"统计信息: 有效像素 {stats['valid_pixels']}/{stats['total_pixels']}")
+                self.logger.info(f"数值范围: {stats['min']:.4f} - {stats['max']:.4f}")
+            else:
+                self.logger.warning("没有有效数据")
+            
+            self.logger.info(f"✓ 成功保存: {output_tif}")
+            return output_tif, stats
+            
+        except Exception as e:
+            self.logger.error(f"GeoDataFrame转栅格失败: {str(e)}")
+            raise
+    
     def vector_to_raster(self, input_shapefile, template_tif, output_tif, field, 
                         resolution_factor=16.0, boundary_shp=None, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
         """
@@ -702,12 +888,59 @@ def get_available_colormaps():
     return COLORMAPS.copy()
 
 
+def dataframe_to_raster_workflow(df, template_tif, output_dir, 
+                                boundary_gdf=None, resolution_factor=16.0,
+                                interpolation_method='nearest', field_name='Prediction',
+                                lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
+    """
+    DataFrame到栅格转换工作流(内存处理优化版本)
+    
+    @param df: pandas DataFrame
+    @param template_tif: 模板GeoTIFF文件路径
+    @param output_dir: 输出目录
+    @param boundary_gdf: 边界GeoDataFrame(可选)
+    @param resolution_factor: 分辨率因子
+    @param interpolation_method: 插值方法
+    @param field_name: 字段名称
+    @param lon_col: 经度列
+    @param lat_col: 纬度列
+    @param value_col: 数值列
+    @param enable_interpolation: 是否启用空间插值,默认False
+    @return: 输出文件路径字典
+    """
+    mapper = MappingUtils()
+    
+    # 确保输出目录存在
+    os.makedirs(output_dir, exist_ok=True)
+    
+    # 生成文件名(基于时间戳,避免冲突)
+    import time
+    timestamp = str(int(time.time()))
+    raster_path = os.path.join(output_dir, f"memory_raster_{timestamp}.tif")
+    
+    # 1. DataFrame直接转GeoDataFrame(内存处理)
+    gdf = mapper.dataframe_to_geodataframe(df, lon_col, lat_col, value_col, field_name)
+    
+    # 2. GeoDataFrame直接转栅格(内存处理)
+    raster_path, stats = mapper.geodataframe_to_raster(
+        gdf, template_tif, raster_path, field_name,
+        resolution_factor, boundary_gdf, interpolation_method, enable_interpolation
+    )
+    
+    return {
+        'shapefile': None,  # 内存处理不生成shapefile
+        'raster': raster_path,
+        'statistics': stats,
+        'geodataframe': gdf  # 返回GeoDataFrame供调试使用
+    }
+
+
 def csv_to_raster_workflow(csv_file, template_tif, output_dir, 
                           boundary_shp=None, boundary_gdf=None, resolution_factor=16.0,
                           interpolation_method='nearest', field_name='Prediction',
                           lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
     """
-    完整的CSV到栅格转换工作流
+    完整的CSV到栅格转换工作流(原版本,保持兼容性)
     
     @param csv_file: CSV文件路径
     @param template_tif: 模板GeoTIFF文件路径