Browse Source

Merge remote-tracking branch 'origin/zsj_0823_' into ding

yangtaodemon 5 days ago
parent
commit
59c1dd5960

+ 0 - 1
Water/Raster/lechang.cpg

@@ -1 +0,0 @@
-UTF-8

BIN
Water/Raster/lechang.dbf


+ 0 - 1
Water/Raster/lechang.prj

@@ -1 +0,0 @@
-GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]

BIN
Water/Raster/lechang.shp


BIN
Water/Raster/lechang.shx


BIN
Water/Raster/lechang.zip


+ 83 - 32
app/api/water.py

@@ -4,7 +4,7 @@ import os
 import logging
 import logging
 import tempfile
 import tempfile
 import shutil
 import shutil
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
 from datetime import datetime
 from datetime import datetime
 import json
 import json
 from pathlib import Path
 from pathlib import Path
@@ -46,18 +46,29 @@ def cleanup_temp_path(path: str) -> None:
 
 
 @router.get("/default-map",
 @router.get("/default-map",
             summary="获取默认地图图片",
             summary="获取默认地图图片",
-            description="返回指定土地类型的默认地图图片")
+            description="返回指定土地类型的默认地图图片,支持多个土地类型用下划线连接")
 async def get_default_map(
 async def get_default_map(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> FileResponse:
 ) -> FileResponse:
     """返回默认地图图片"""
     """返回默认地图图片"""
     try:
     try:
         logger.info(f"获取默认地图: {land_type}")
         logger.info(f"获取默认地图: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 获取基础目录
         # 获取基础目录
         base_dir = get_base_dir()
         base_dir = get_base_dir()
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
-        map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
+        map_path = os.path.join(raster_dir, f"{standardized_land_type}_Cd含量地图.jpg")
 
 
         if not os.path.exists(map_path):
         if not os.path.exists(map_path):
             logger.warning(f"默认地图文件不存在: {map_path}")
             logger.warning(f"默认地图文件不存在: {map_path}")
@@ -80,18 +91,29 @@ async def get_default_map(
 
 
 @router.get("/default-histogram",
 @router.get("/default-histogram",
             summary="获取默认直方图",
             summary="获取默认直方图",
-            description="返回指定土地类型的默认直方图图片")
+            description="返回指定土地类型的默认直方图图片,支持多个土地类型用下划线连接")
 async def get_default_histogram(
 async def get_default_histogram(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> FileResponse:
 ) -> FileResponse:
     """返回默认直方图图片"""
     """返回默认直方图图片"""
     try:
     try:
         logger.info(f"获取默认直方图: {land_type}")
         logger.info(f"获取默认直方图: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 获取基础目录
         # 获取基础目录
         base_dir = get_base_dir()
         base_dir = get_base_dir()
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
-        hist_path = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+        hist_path = os.path.join(raster_dir, f"{standardized_land_type}_Cd含量直方图.jpg")
 
 
         if not os.path.exists(hist_path):
         if not os.path.exists(hist_path):
             logger.warning(f"默认直方图文件不存在: {hist_path}")
             logger.warning(f"默认直方图文件不存在: {hist_path}")
@@ -114,10 +136,10 @@ async def get_default_histogram(
 
 
 @router.post("/calculate",
 @router.post("/calculate",
              summary="重新计算土地数据",
              summary="重新计算土地数据",
-             description="根据输入的土地类型和系数重新计算土地数据,支持通过area和level参数控制地图边界,支持插值控制")
+             description="根据输入的土地类型和系数重新计算土地数据,支持多个土地类型选择(复选框),支持通过area和level参数控制地图边界,支持插值控制")
 async def recalculate_land_data(
 async def recalculate_land_data(
         background_tasks: BackgroundTasks,
         background_tasks: BackgroundTasks,
-        land_type: str = Form(..., description="土地类型,如:'水田'、'旱地'或'水浇地'"),
+        land_types: List[str] = Form(..., description="土地类型,支持单个或多个选择,如:['水田']、['旱地', '水浇地']"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
@@ -127,11 +149,17 @@ async def recalculate_land_data(
         enable_interpolation: Optional[bool] = Form(False, description="是否启用空间插值,默认启用"),
         enable_interpolation: Optional[bool] = Form(False, description="是否启用空间插值,默认启用"),
         interpolation_method: Optional[str] = Form("linear", description="插值方法: nearest | linear | cubic"),
         interpolation_method: Optional[str] = Form("linear", description="插值方法: nearest | linear | cubic"),
         resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高"),
         resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高"),
-        save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成")
+        save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成"),
+        cleanup_temp_files: Optional[bool] = Form(True, description="是否清理临时文件,默认清理")
 ) -> Dict[str, Any]:
 ) -> Dict[str, Any]:
-    """重新计算土地数据并返回结果路径,支持动态边界控制和插值控制"""
+    """重新计算土地数据并返回结果路径,支持多个土地类型和动态边界控制和插值控制"""
     try:
     try:
-        logger.info(f"重新计算土地数据: {land_type}")
+        # 处理土地类型列表 - 去除空值和空格,并标准化顺序
+        from ..services.water_service import standardize_land_types_order
+        parsed_land_types = standardize_land_types_order(land_types)
+        land_types_str = ', '.join(parsed_land_types)
+        
+        logger.info(f"重新计算土地数据: {land_types_str} (共{len(parsed_land_types)}种类型)")
         if area and level:
         if area and level:
             logger.info(f"使用动态边界: {area} ({level})")
             logger.info(f"使用动态边界: {area} ({level})")
         else:
         else:
@@ -145,14 +173,14 @@ async def recalculate_land_data(
         # 确保目录存在
         # 确保目录存在
         os.makedirs(raster_dir, exist_ok=True)
         os.makedirs(raster_dir, exist_ok=True)
 
 
-        # 构建系数参数
-        coefficient_params = {
-            land_type: (param1, param2)
-        }
+        # 构建系数参数 - 为所有土地类型应用相同的系数
+        coefficient_params = {}
+        for land_type in parsed_land_types:
+            coefficient_params[land_type] = (param1, param2)
 
 
-        # 调用统一的处理函数,CSV生成作为可选参数
+        # 调用统一的处理函数,支持多个土地类型,CSV生成作为可选参数
         results = process_land_to_visualization(
         results = process_land_to_visualization(
-            land_type=land_type,
+            land_types=parsed_land_types,  # 传递解析后的土地类型列表
             coefficient_params=coefficient_params,
             coefficient_params=coefficient_params,
             color_map_name=color_map_name,
             color_map_name=color_map_name,
             output_size=output_size,
             output_size=output_size,
@@ -161,30 +189,42 @@ async def recalculate_land_data(
             enable_interpolation=enable_interpolation,
             enable_interpolation=enable_interpolation,
             interpolation_method=interpolation_method,
             interpolation_method=interpolation_method,
             resolution_factor=resolution_factor,
             resolution_factor=resolution_factor,
-            save_csv=save_csv  # 将CSV生成选项传递给处理函数
+            save_csv=save_csv,  # 将CSV生成选项传递给处理函数
+            cleanup_temp_files=cleanup_temp_files  # 将清理选项传递给处理函数
         )
         )
 
 
         if not results:
         if not results:
-            logger.error(f"重新计算土地数据失败: {land_type}")
+            logger.error(f"重新计算土地数据失败: {land_types_str}")
             raise HTTPException(
             raise HTTPException(
                 status_code=500,
                 status_code=500,
-                detail=f"重新计算土地数据失败: {land_type}"
+                detail=f"重新计算土地数据失败: {land_types_str}"
             )
             )
 
 
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
         
         
-        # 检查关键文件是否存在(shapefile可以为None,因为使用的是内存处理
-        if not tif_file or not map_output or not hist_output:
-            logger.error(f"重新计算土地数据失败,关键文件缺失: {land_type}")
-            logger.error(f"GeoTIFF: {tif_file}, 地图: {map_output}, 直方图: {hist_output}")
+        # 检查关键输出文件是否存在(只检查最终输出的地图和直方图
+        if not map_output or not hist_output:
+            logger.error(f"重新计算土地数据失败,关键输出文件缺失: {land_types_str}")
+            logger.error(f"地图: {map_output}, 直方图: {hist_output}")
             raise HTTPException(
             raise HTTPException(
                 status_code=500,
                 status_code=500,
-                detail=f"重新计算土地数据失败: {land_type}"
+                detail=f"重新计算土地数据失败: {land_types_str}"
+            )
+        
+        # 验证输出文件实际存在
+        if not os.path.exists(map_output) or not os.path.exists(hist_output):
+            logger.error(f"生成的输出文件不存在: {land_types_str}")
+            logger.error(f"地图文件存在: {os.path.exists(map_output) if map_output else False}")
+            logger.error(f"直方图文件存在: {os.path.exists(hist_output) if hist_output else False}")
+            raise HTTPException(
+                status_code=500,
+                detail=f"生成的输出文件不存在: {land_types_str}"
             )
             )
 
 
-        # 定义默认路径
-        default_map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
-        default_hist_path = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+        # 定义默认路径 - 使用组合的土地类型名称
+        combined_land_type_name = '_'.join(parsed_land_types)
+        default_map_path = os.path.join(raster_dir, f"{combined_land_type_name}_Cd含量地图.jpg")
+        default_hist_path = os.path.join(raster_dir, f"{combined_land_type_name}_Cd含量直方图.jpg")
 
 
         # 移动文件到默认目录(覆盖旧文件)
         # 移动文件到默认目录(覆盖旧文件)
         shutil.move(map_output, default_map_path)
         shutil.move(map_output, default_map_path)
@@ -207,16 +247,27 @@ async def recalculate_land_data(
 
 
 @router.get("/statistics",
 @router.get("/statistics",
             summary="获取土地类型统计数据",
             summary="获取土地类型统计数据",
-            description="返回指定土地类型的Cd预测结果统计信息")
+            description="返回指定土地类型的Cd预测结果统计信息,支持多个土地类型用下划线连接")
 async def get_land_statistics_endpoint(
 async def get_land_statistics_endpoint(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> JSONResponse:
 ) -> JSONResponse:
     """返回土地类型Cd预测结果的统计信息"""
     """返回土地类型Cd预测结果的统计信息"""
     try:
     try:
         logger.info(f"获取土地类型统计数据: {land_type}")
         logger.info(f"获取土地类型统计数据: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 调用服务层函数获取统计数据
         # 调用服务层函数获取统计数据
-        stats = get_land_statistics(land_type)
+        stats = get_land_statistics(standardized_land_type)
 
 
         if not stats:
         if not stats:
             logger.warning(f"未找到{land_type}的土地类型统计数据")
             logger.warning(f"未找到{land_type}的土地类型统计数据")

+ 46 - 0
app/services/land_use_service.py

@@ -216,6 +216,52 @@ class LandUseService:
         # 使用优化版本的方法
         # 使用优化版本的方法
         return self.get_land_centers_optimized(land_type)
         return self.get_land_centers_optimized(land_type)
     
     
+    def get_multiple_land_centers_for_processing(self, land_types: List[str]) -> Optional[pd.DataFrame]:
+        """
+        获取多个土地类型的中心点数据并合并
+        
+        @param land_types: 土地利用类型列表
+        @returns: 包含所有土地类型中心点坐标的合并DataFrame
+        """
+        try:
+            if not land_types:
+                self.logger.warning("土地类型列表为空")
+                return None
+            
+            all_dataframes = []
+            
+            for land_type in land_types:
+                self.logger.info(f"获取土地类型 '{land_type}' 的中心点数据")
+                df = self.get_land_centers_optimized(land_type)
+                
+                if df is not None and not df.empty:
+                    # 添加土地类型标识列
+                    df = df.copy()
+                    df['land_type'] = land_type
+                    all_dataframes.append(df)
+                    self.logger.info(f"获取到 {len(df)} 个 '{land_type}' 中心点")
+                else:
+                    self.logger.warning(f"未获取到 '{land_type}' 类型的数据")
+            
+            if not all_dataframes:
+                self.logger.warning(f"所有土地类型 {land_types} 都没有获取到数据")
+                return None
+            
+            # 合并所有DataFrame
+            combined_df = pd.concat(all_dataframes, ignore_index=True)
+            
+            # 重新排序列,确保land_type列在合适位置
+            cols = ['gid', 'center_lon', 'center_lat', 'dlmc', 'land_type', 'shape_area']
+            combined_df = combined_df[[col for col in cols if col in combined_df.columns]]
+            
+            self.logger.info(f"成功合并 {len(land_types)} 种土地类型,总计 {len(combined_df)} 个中心点")
+            
+            return combined_df
+            
+        except Exception as e:
+            self.logger.error(f"获取多个土地类型中心点数据失败: {str(e)}")
+            return None
+    
     def get_available_land_types(self) -> List[str]:
     def get_available_land_types(self) -> List[str]:
         """
         """
         获取数据库中可用的土地利用类型(带缓存)
         获取数据库中可用的土地利用类型(带缓存)

+ 381 - 37
app/services/water_service.py

@@ -4,12 +4,14 @@ import pandas as pd
 from pyproj import Transformer
 from pyproj import Transformer
 from shapely.geometry import Point
 from shapely.geometry import Point
 import rasterio
 import rasterio
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, List
 from datetime import datetime
 from datetime import datetime
 import numpy as np
 import numpy as np
 import logging
 import logging
 import shutil
 import shutil
 import sys
 import sys
+from sklearn.neighbors import BallTree
+from time import time
 
 
 # 导入MappingUtils
 # 导入MappingUtils
 from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
 from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
@@ -63,6 +65,28 @@ def get_base_dir():
         return os.path.dirname(os.path.abspath(__file__))
         return os.path.dirname(os.path.abspath(__file__))
 
 
 
 
+def standardize_land_types_order(land_types: List[str]) -> List[str]:
+    """
+    标准化土地类型顺序,确保文件名一致性
+    
+    @param land_types: 土地类型列表
+    @returns: 按标准顺序排序的土地类型列表
+    """
+    # 定义标准顺序
+    standard_order = ["水田", "旱地", "水浇地"]
+    
+    # 清理并标准化
+    cleaned_types = [lt.strip() for lt in land_types if lt.strip()]
+    
+    # 按标准顺序排序
+    standardized_types = sorted(
+        cleaned_types, 
+        key=lambda x: standard_order.index(x) if x in standard_order else 999
+    )
+    
+    return standardized_types
+
+
 def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
 def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
     """
     """
     直接从数据库获取边界数据作为GeoDataFrame
     直接从数据库获取边界数据作为GeoDataFrame
@@ -86,12 +110,121 @@ def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDat
     return None
     return None
 
 
 
 
+def find_nearest_sampling_points_optimized(land_centers_df: pd.DataFrame, 
+                                          sampling_points_df: pd.DataFrame) -> np.ndarray:
+    """
+    使用BallTree高效计算每个土地中心点的最近采样点
+    
+    @description: 使用空间索引优化最近邻搜索,将O(n×m)复杂度降低到O(n×log(m))
+    
+    @param land_centers_df: 土地中心点数据,包含center_lon和center_lat列
+    @param sampling_points_df: 采样点数据,包含经度和纬度列
+    @returns: 每个土地中心点对应的最近采样点索引数组
+    """
+    logger.info("开始构建空间索引优化最近邻搜索...")
+    
+    start_time = time()
+    
+    # 1. 准备采样点坐标数据(转换为弧度用于BallTree)
+    sampling_coords = np.radians(sampling_points_df[['经度', '纬度']].values)
+    
+    # 2. 构建BallTree空间索引
+    logger.info(f"构建BallTree索引,采样点数量: {len(sampling_coords)}")
+    tree = BallTree(sampling_coords, metric='haversine')
+    
+    # 3. 准备土地中心点坐标数据
+    land_coords = np.radians(land_centers_df[['center_lon', 'center_lat']].values)
+    
+    # 4. 批量查询最近邻(k=1表示只找最近的一个点)
+    logger.info(f"批量查询最近邻,土地中心点数量: {len(land_coords)}")
+    distances, indices = tree.query(land_coords, k=1)
+    
+    # 5. 提取索引(indices是二维数组,我们只需要第一列)
+    nearest_indices = indices.flatten()
+    
+    elapsed_time = time() - start_time
+    logger.info(f"空间索引搜索完成,耗时: {elapsed_time:.2f}秒")
+    logger.info(f"平均每个点查询时间: {elapsed_time/len(land_coords)*1000:.2f}毫秒")
+    
+    return nearest_indices
 
 
+def cleanup_temporary_files(*file_paths):
+    """
+    清理临时文件
+    
+    @description: 安全地删除指定的临时文件,支持多种文件类型
+    @param file_paths: 要删除的文件路径(可变参数)
+    """
+    import tempfile
+    
+    for file_path in file_paths:
+        if not file_path:
+            continue
+            
+        try:
+            if os.path.exists(file_path) and os.path.isfile(file_path):
+                os.remove(file_path)
+                logger.info(f"已清理临时文件: {os.path.basename(file_path)}")
+                
+                # 如果是shapefile,也删除相关的配套文件
+                if file_path.endswith('.shp'):
+                    base_path = os.path.splitext(file_path)[0]
+                    for ext in ['.shx', '.dbf', '.prj', '.cpg']:
+                        related_file = base_path + ext
+                        if os.path.exists(related_file):
+                            os.remove(related_file)
+                            logger.info(f"已清理相关文件: {os.path.basename(related_file)}")
+                            
+        except Exception as e:
+            logger.warning(f"清理文件失败 {file_path}: {str(e)}")
+
+
+def cleanup_temp_files_in_directory(directory: str, patterns: List[str] = None) -> int:
+    """
+    清理指定目录下的临时文件
+    
+    @description: 根据文件名模式清理目录中的临时文件
+    @param directory: 要清理的目录路径
+    @param patterns: 文件名模式列表,默认为['memory_raster_', 'temp_', 'tmp_']
+    @returns: 清理的文件数量
+    """
+    if patterns is None:
+        patterns = ['memory_raster_', 'temp_', 'tmp_']
+    
+    if not os.path.exists(directory) or not os.path.isdir(directory):
+        logger.warning(f"目录不存在或不是有效目录: {directory}")
+        return 0
+    
+    cleaned_count = 0
+    
+    try:
+        for filename in os.listdir(directory):
+            file_path = os.path.join(directory, filename)
+            
+            # 检查是否是文件
+            if not os.path.isfile(file_path):
+                continue
+                
+            # 检查文件名是否匹配任何模式
+            should_clean = any(pattern in filename for pattern in patterns)
+            
+            if should_clean:
+                try:
+                    os.remove(file_path)
+                    logger.info(f"已清理临时文件: {filename}")
+                    cleaned_count += 1
+                except Exception as e:
+                    logger.warning(f"清理文件失败 {filename}: {str(e)}")
+                    
+    except Exception as e:
+        logger.error(f"清理目录失败 {directory}: {str(e)}")
+        
+    return cleaned_count
 
 
 
 
 # 土地数据处理函数
 # 土地数据处理函数
 def process_land_data(land_type, coefficient_params=None, save_csv=True):
 def process_land_data(land_type, coefficient_params=None, save_csv=True):
-    """处理土地类型数据并生成清洗后的简化数据"""
+    """处理单个土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
     base_dir = get_base_dir()
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
 
 
@@ -107,7 +240,157 @@ def process_land_data(land_type, coefficient_params=None, save_csv=True):
         return None, None, None
         return None, None, None
 
 
     logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
     logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
+    logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
+
+    # 调用辅助函数完成处理
+    return complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir)
+
+
+def process_multiple_land_data(land_types, coefficient_params=None, save_csv=True):
+    """
+    处理多个土地类型数据并生成合并的清洗后简化数据
+    
+    @param land_types: 土地类型列表
+    @param coefficient_params: 系数参数字典,格式为 {land_type: (param1, param2)}
+    @param save_csv: 是否保存CSV文件
+    @returns: (cleaned_data, combined_coeff_info, cleaned_csv_path)
+    """
+    base_dir = get_base_dir()
+    xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
+
+    logger.info(f"处理多个土地类型: {', '.join(land_types)}")
+    logger.info(f"Excel文件: {xls_file}")
+
+    # 从数据库获取多个土地类型的合并数据
+    logger.info(f"从数据库获取多个土地类型的土地利用数据...")
+    land_centers_df = land_use_service.get_multiple_land_centers_for_processing(land_types)
+    
+    if land_centers_df is None or land_centers_df.empty:
+        logger.warning(f"数据库中没有找到任何指定土地类型的数据: {land_types}")
+        return None, None, None
+
+    logger.info(f"从数据库获取到 {len(land_centers_df)} 个合并的土地数据点")
+    logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
+
+    # 读取Excel采样点数据
+    if not os.path.exists(xls_file):
+        logger.error(f"采样点Excel文件不存在: {xls_file}")
+        return None, None, None
+        
+    df_xls = pd.read_excel(xls_file)
+    logger.info(f"读取到 {len(df_xls)} 个采样点数据")
+
+    # 设置默认土地类型系数
+    default_params = {
+        "水田": (711, 0.524),
+        "水浇地": (427, 0.599),
+        "旱地": (200, 0.7)
+    }
+
+    params = coefficient_params or default_params
+    
+    # 为多个土地类型构建系数信息
+    combined_coeff_info = {}
+    
+    # 高效处理:使用空间索引查找最近采样点
+    logger.info("开始高效距离计算和Cd值计算...")
+    start_time = time()
+    
+    # 使用优化的空间索引方法查找最近采样点
+    nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
+    
+    # 批量计算Cd含量值,按土地类型应用不同系数
+    centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
+    cd_values = []
+    
+    for i, (_, row) in enumerate(land_centers_df.iterrows()):
+        land_type = row['land_type']
+        param1, param2 = params.get(land_type, (200, 0.7))
+        Num = param1 * param2
+        
+        # 记录每种土地类型使用的系数
+        if land_type not in combined_coeff_info:
+            combined_coeff_info[land_type] = {
+                'param1': param1,
+                'param2': param2,
+                'multiplier': Num,
+                'count': 0
+            }
+        combined_coeff_info[land_type]['count'] += 1
+        
+        # 计算该点的Cd值
+        cd_value = df_xls.iloc[nearest_indices[i]]['Cd (ug/L)'] * Num
+        cd_values.append(cd_value)
+    
+    calculation_time = time() - start_time
+    logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
+    logger.info(f"处理了 {len(centers)} 个土地中心点")
+    
+    # 记录各土地类型的系数使用情况
+    for land_type, info in combined_coeff_info.items():
+        logger.info(f"{land_type}: 系数 {info['param1']} * {info['param2']} = {info['multiplier']}, 应用于 {info['count']} 个点")
+
+    # 创建简化数据DataFrame
+    simplified_data = pd.DataFrame({
+        'lon': [c[0] for c in centers],
+        'lat': [c[1] for c in centers],
+        'Prediction': cd_values,
+        'land_type': land_centers_df['land_type'].values  # 保留土地类型信息用于分析
+    })
+
+    # 应用3σ原则检测异常值
+    mean_value = simplified_data['Prediction'].mean()
+    std_value = simplified_data['Prediction'].std()
+    lower_bound = mean_value - 3 * std_value
+    upper_bound = mean_value + 3 * std_value
+
+    logger.info(f"合并数据Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
+    logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
+
+    # 识别异常值
+    outliers = simplified_data[
+        (simplified_data['Prediction'] < lower_bound) |
+        (simplified_data['Prediction'] > upper_bound)
+        ]
+    logger.info(f"检测到异常值数量: {len(outliers)}")
 
 
+    # 创建清洗后的数据
+    cleaned_data = simplified_data[
+        (simplified_data['Prediction'] >= lower_bound) &
+        (simplified_data['Prediction'] <= upper_bound)
+        ]
+    logger.info(f"清洗后数据点数: {len(cleaned_data)}")
+
+    # 可选:保存CSV文件用于兼容性和调试
+    cleaned_csv_path = None
+    if save_csv:
+        # 创建输出目录
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        os.makedirs(data_dir, exist_ok=True)
+        logger.info(f"数据目录: {data_dir}")
+        
+        # 使用组合的土地类型名称
+        combined_type_name = '_'.join(land_types)
+        cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
+        
+        # 移除land_type列(仅用于内部处理),保持与原始格式兼容
+        cleaned_data_for_csv = cleaned_data[['lon', 'lat', 'Prediction']].copy()
+        cleaned_data_for_csv.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
+        logger.info(f"保存CSV: {cleaned_csv_path}")
+    else:
+        logger.info("跳过CSV文件生成(内存处理优化)")
+
+    # 返回清洗后的数据(移除land_type列保持兼容性)
+    final_cleaned_data = cleaned_data[['lon', 'lat', 'Prediction']].copy()
+    
+    return final_cleaned_data, combined_coeff_info, cleaned_csv_path
+
+
+# 继续完成原始的process_land_data函数
+def complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir):
+    """完成单个土地类型的数据处理"""
+    xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
+    
     # 读取Excel采样点数据
     # 读取Excel采样点数据
     if not os.path.exists(xls_file):
     if not os.path.exists(xls_file):
         logger.error(f"采样点Excel文件不存在: {xls_file}")
         logger.error(f"采样点Excel文件不存在: {xls_file}")
@@ -128,26 +411,20 @@ def process_land_data(land_type, coefficient_params=None, save_csv=True):
     Num = param1 * param2
     Num = param1 * param2
     logger.info(f"系数: {param1} * {param2} = {Num}")
     logger.info(f"系数: {param1} * {param2} = {Num}")
 
 
-    # 处理每个面要素,使用数据库中的中心点坐标
-    cd_values = []
-    centers = []
+    # 高效处理:使用空间索引查找最近采样点
+    logger.info("开始高效距离计算和Cd值计算...")
+    start_time = time()
     
     
-    for index, row in land_centers_df.iterrows():
-        center_lon = row['center_lon']
-        center_lat = row['center_lat']
-        centers.append((center_lon, center_lat))
-
-        # 计算到所有采样点的距离
-        distances = df_xls.apply(
-            lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
-            axis=1
-        )
-        min_idx = distances.idxmin()
-        nearest = df_xls.loc[min_idx]
-
-        # 计算Cd含量值
-        cd_value = nearest['Cd (ug/L)'] * Num
-        cd_values.append(cd_value)
+    # 使用优化的空间索引方法查找最近采样点
+    nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
+    
+    # 批量计算Cd含量值
+    centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
+    cd_values = df_xls.iloc[nearest_indices]['Cd (ug/L)'].values * Num
+    
+    calculation_time = time() - start_time
+    logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
+    logger.info(f"处理了 {len(centers)} 个土地中心点")
 
 
     # 创建简化数据DataFrame
     # 创建简化数据DataFrame
     simplified_data = pd.DataFrame({
     simplified_data = pd.DataFrame({
@@ -289,7 +566,7 @@ def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
         return None
         return None
 
 
 
 
-def process_land_to_visualization(land_type, coefficient_params=None,
+def process_land_to_visualization(land_types=None, land_type=None, coefficient_params=None,
                                   color_map_name="绿-黄-红-紫",
                                   color_map_name="绿-黄-红-紫",
                                   output_size=8,
                                   output_size=8,
                                   area: Optional[str] = None,
                                   area: Optional[str] = None,
@@ -297,21 +574,23 @@ def process_land_to_visualization(land_type, coefficient_params=None,
                                   enable_interpolation: Optional[bool] = True,
                                   enable_interpolation: Optional[bool] = True,
                                   interpolation_method: Optional[str] = "linear",
                                   interpolation_method: Optional[str] = "linear",
                                   resolution_factor: Optional[float] = 4.0,
                                   resolution_factor: Optional[float] = 4.0,
-                                  save_csv: Optional[bool] = True):
+                                  save_csv: Optional[bool] = True,
+                                  cleanup_temp_files: Optional[bool] = True):
     """
     """
-    完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
+    完整的土地数据处理可视化流程(支持单个或多个土地类型,使用统一的MappingUtils接口,支持动态边界和插值控制)
     
     
     @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
     @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
-                  支持通过area和level参数动态控制地图边界,支持插值控制参数
+                  支持通过area和level参数动态控制地图边界,支持插值控制参数,支持多个土地类型合并处理
     
     
     工作流程:
     工作流程:
-    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
+    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量(支持多个类型合并)
     2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
     2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
     3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
     3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
     4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
     4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
     5. 生成直方图 - 直接调用MappingUtils.create_histogram()
     5. 生成直方图 - 直接调用MappingUtils.create_histogram()
 
 
-    @param land_type: 土地类型(水田、旱地、水浇地)
+    @param land_types: 土地类型列表(支持多个,如['水田', '旱地']),优先使用此参数
+    @param land_type: 单个土地类型(向后兼容,如果land_types为None则使用此参数)
     @param coefficient_params: 可选的系数参数字典
     @param coefficient_params: 可选的系数参数字典
     @param color_map_name: 色彩方案名称(中文)
     @param color_map_name: 色彩方案名称(中文)
     @param output_size: 输出图片尺寸
     @param output_size: 输出图片尺寸
@@ -321,19 +600,43 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
     @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
     @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
     @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
     @param save_csv: 是否生成CSV文件,默认True
     @param save_csv: 是否生成CSV文件,默认True
+    @param cleanup_temp_files: 是否清理临时文件,默认True
     @returns: 包含所有生成文件路径的元组
     @returns: 包含所有生成文件路径的元组
     """
     """
     base_dir = get_base_dir()
     base_dir = get_base_dir()
-    logger.info(f"开始处理: {land_type}")
+    
+    # 处理参数兼容性 - 支持单个和多个土地类型
+    if land_types is not None:
+        # 使用新的多类型参数,并标准化顺序
+        input_types = land_types if isinstance(land_types, list) else [land_types]
+        actual_land_types = standardize_land_types_order(input_types)
+        logger.info(f"开始处理多个土地类型: {', '.join(actual_land_types)}")
+        is_multiple_types = len(actual_land_types) > 1
+    elif land_type is not None:
+        # 向后兼容单个类型参数
+        actual_land_types = [land_type.strip()]
+        logger.info(f"开始处理单个土地类型: {land_type}")
+        is_multiple_types = False
+    else:
+        raise ValueError("必须提供land_types或land_type参数")
 
 
     # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
     # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
-    cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
-        land_type, 
-        coefficient_params, 
-        save_csv=save_csv  # 根据参数决定是否生成CSV文件
-    )
+    if is_multiple_types:
+        # 使用多类型处理函数
+        cleaned_data, used_coeff, cleaned_csv_path = process_multiple_land_data(
+            actual_land_types, 
+            coefficient_params, 
+            save_csv=save_csv
+        )
+    else:
+        # 使用单类型处理函数
+        cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
+            actual_land_types[0], 
+            coefficient_params, 
+            save_csv=save_csv
+        )
     if cleaned_data is None:
     if cleaned_data is None:
-        logger.error(f"处理土地数据失败: {land_type}")
+        logger.error(f"处理土地数据失败: {', '.join(actual_land_types)}")
         return None, None, None, None, None, None
         return None, None, None, None, None, None
 
 
     # 2. 获取动态边界数据(提前获取用于栅格转换)
     # 2. 获取动态边界数据(提前获取用于栅格转换)
@@ -380,7 +683,9 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     logger.info(f"生成栅格文件: {output_tif}")
     logger.info(f"生成栅格文件: {output_tif}")
 
 
     # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
     # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
-    map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
+    # 生成输出文件名 - 支持多个土地类型
+    combined_type_name = '_'.join(actual_land_types)
+    map_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量地图.jpg")
     
     
     try:
     try:
         # 创建MappingUtils实例并直接调用
         # 创建MappingUtils实例并直接调用
@@ -412,7 +717,7 @@ def process_land_to_visualization(land_type, coefficient_params=None,
         logger.error(f"栅格地图生成失败: {str(e)}")
         logger.error(f"栅格地图生成失败: {str(e)}")
 
 
     # 5. 生成直方图(直接使用统一的MappingUtils接口)
     # 5. 生成直方图(直接使用统一的MappingUtils接口)
-    hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+    hist_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量直方图.jpg")
     
     
     try:
     try:
         # 直接调用统一绘图接口生成直方图
         # 直接调用统一绘图接口生成直方图
@@ -439,7 +744,39 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     else:
     else:
         # 为了兼容性,生成路径(即使文件不存在)
         # 为了兼容性,生成路径(即使文件不存在)
         data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
         data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
-        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
+    
+    # 清理临时文件(如果启用)
+    if cleanup_temp_files:
+        logger.info("开始清理临时文件...")
+        
+        # 要清理的临时文件列表
+        temp_files_to_cleanup = []
+        
+        # 添加临时栅格文件(如果是memory_raster_开头的)
+        if output_tif and 'memory_raster_' in os.path.basename(output_tif):
+            temp_files_to_cleanup.append(output_tif)
+            
+        # 添加临时shapefile(如果存在且是临时文件)
+        temp_shapefile = workflow_result.get('shapefile')
+        if temp_shapefile and ('temp' in temp_shapefile.lower() or 'memory' in temp_shapefile.lower()):
+            temp_files_to_cleanup.append(temp_shapefile)
+        
+        # 如果不保存CSV,也清理CSV文件
+        if not save_csv and cleaned_csv_path and os.path.exists(cleaned_csv_path):
+            temp_files_to_cleanup.append(cleaned_csv_path)
+            
+        # 执行清理
+        if temp_files_to_cleanup:
+            cleanup_temporary_files(*temp_files_to_cleanup)
+            logger.info(f"已清理 {len(temp_files_to_cleanup)} 个临时文件")
+            
+            # 如果清理了栅格文件,将返回路径设为None以避免引用已删除的文件
+            if output_tif in temp_files_to_cleanup:
+                output_tif = None
+                logger.info("注意:临时栅格文件已被清理,返回的栅格路径为None")
+        else:
+            logger.info("没有临时文件需要清理")
     
     
     return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
     return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
 
 
@@ -544,6 +881,13 @@ def main():
     except Exception as e:
     except Exception as e:
         logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
         logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
     finally:
     finally:
+        # 清理临时文件
+        base_dir = get_base_dir()
+        raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
+        cleaned_count = cleanup_temp_files_in_directory(raster_dir)
+        if cleaned_count > 0:
+            logger.info(f"已清理 {cleaned_count} 个临时文件")
+        
         logger.info("处理完成")
         logger.info("处理完成")