Browse Source

优化水资源API,支持多个土地类型的处理

drggboy 3 days ago
parent
commit
a1c32f7d5b
3 changed files with 327 additions and 45 deletions
  1. 68 29
      app/api/water.py
  2. 46 0
      app/services/land_use_service.py
  3. 213 16
      app/services/water_service.py

+ 68 - 29
app/api/water.py

@@ -4,7 +4,7 @@ import os
 import logging
 import logging
 import tempfile
 import tempfile
 import shutil
 import shutil
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
 from datetime import datetime
 from datetime import datetime
 import json
 import json
 from pathlib import Path
 from pathlib import Path
@@ -46,18 +46,29 @@ def cleanup_temp_path(path: str) -> None:
 
 
 @router.get("/default-map",
 @router.get("/default-map",
             summary="获取默认地图图片",
             summary="获取默认地图图片",
-            description="返回指定土地类型的默认地图图片")
+            description="返回指定土地类型的默认地图图片,支持多个土地类型用下划线连接")
 async def get_default_map(
 async def get_default_map(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> FileResponse:
 ) -> FileResponse:
     """返回默认地图图片"""
     """返回默认地图图片"""
     try:
     try:
         logger.info(f"获取默认地图: {land_type}")
         logger.info(f"获取默认地图: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 获取基础目录
         # 获取基础目录
         base_dir = get_base_dir()
         base_dir = get_base_dir()
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
-        map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
+        map_path = os.path.join(raster_dir, f"{standardized_land_type}_Cd含量地图.jpg")
 
 
         if not os.path.exists(map_path):
         if not os.path.exists(map_path):
             logger.warning(f"默认地图文件不存在: {map_path}")
             logger.warning(f"默认地图文件不存在: {map_path}")
@@ -80,18 +91,29 @@ async def get_default_map(
 
 
 @router.get("/default-histogram",
 @router.get("/default-histogram",
             summary="获取默认直方图",
             summary="获取默认直方图",
-            description="返回指定土地类型的默认直方图图片")
+            description="返回指定土地类型的默认直方图图片,支持多个土地类型用下划线连接")
 async def get_default_histogram(
 async def get_default_histogram(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> FileResponse:
 ) -> FileResponse:
     """返回默认直方图图片"""
     """返回默认直方图图片"""
     try:
     try:
         logger.info(f"获取默认直方图: {land_type}")
         logger.info(f"获取默认直方图: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 获取基础目录
         # 获取基础目录
         base_dir = get_base_dir()
         base_dir = get_base_dir()
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
         raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
-        hist_path = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+        hist_path = os.path.join(raster_dir, f"{standardized_land_type}_Cd含量直方图.jpg")
 
 
         if not os.path.exists(hist_path):
         if not os.path.exists(hist_path):
             logger.warning(f"默认直方图文件不存在: {hist_path}")
             logger.warning(f"默认直方图文件不存在: {hist_path}")
@@ -114,10 +136,10 @@ async def get_default_histogram(
 
 
 @router.post("/calculate",
 @router.post("/calculate",
              summary="重新计算土地数据",
              summary="重新计算土地数据",
-             description="根据输入的土地类型和系数重新计算土地数据,支持通过area和level参数控制地图边界,支持插值控制")
+             description="根据输入的土地类型和系数重新计算土地数据,支持多个土地类型选择(复选框),支持通过area和level参数控制地图边界,支持插值控制")
 async def recalculate_land_data(
 async def recalculate_land_data(
         background_tasks: BackgroundTasks,
         background_tasks: BackgroundTasks,
-        land_type: str = Form(..., description="土地类型,如:'水田'、'旱地'或'水浇地'"),
+        land_types: List[str] = Form(..., description="土地类型,支持单个或多个选择,如:['水田']、['旱地', '水浇地']"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
@@ -130,9 +152,14 @@ async def recalculate_land_data(
         save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成"),
         save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成"),
         cleanup_temp_files: Optional[bool] = Form(True, description="是否清理临时文件,默认清理")
         cleanup_temp_files: Optional[bool] = Form(True, description="是否清理临时文件,默认清理")
 ) -> Dict[str, Any]:
 ) -> Dict[str, Any]:
-    """重新计算土地数据并返回结果路径,支持动态边界控制和插值控制"""
+    """重新计算土地数据并返回结果路径,支持多个土地类型和动态边界控制和插值控制"""
     try:
     try:
-        logger.info(f"重新计算土地数据: {land_type}")
+        # 处理土地类型列表 - 去除空值和空格,并标准化顺序
+        from ..services.water_service import standardize_land_types_order
+        parsed_land_types = standardize_land_types_order(land_types)
+        land_types_str = ', '.join(parsed_land_types)
+        
+        logger.info(f"重新计算土地数据: {land_types_str} (共{len(parsed_land_types)}种类型)")
         if area and level:
         if area and level:
             logger.info(f"使用动态边界: {area} ({level})")
             logger.info(f"使用动态边界: {area} ({level})")
         else:
         else:
@@ -146,14 +173,14 @@ async def recalculate_land_data(
         # 确保目录存在
         # 确保目录存在
         os.makedirs(raster_dir, exist_ok=True)
         os.makedirs(raster_dir, exist_ok=True)
 
 
-        # 构建系数参数
-        coefficient_params = {
-            land_type: (param1, param2)
-        }
+        # 构建系数参数 - 为所有土地类型应用相同的系数
+        coefficient_params = {}
+        for land_type in parsed_land_types:
+            coefficient_params[land_type] = (param1, param2)
 
 
-        # 调用统一的处理函数,CSV生成作为可选参数
+        # 调用统一的处理函数,支持多个土地类型,CSV生成作为可选参数
         results = process_land_to_visualization(
         results = process_land_to_visualization(
-            land_type=land_type,
+            land_types=parsed_land_types,  # 传递解析后的土地类型列表
             coefficient_params=coefficient_params,
             coefficient_params=coefficient_params,
             color_map_name=color_map_name,
             color_map_name=color_map_name,
             output_size=output_size,
             output_size=output_size,
@@ -167,36 +194,37 @@ async def recalculate_land_data(
         )
         )
 
 
         if not results:
         if not results:
-            logger.error(f"重新计算土地数据失败: {land_type}")
+            logger.error(f"重新计算土地数据失败: {land_types_str}")
             raise HTTPException(
             raise HTTPException(
                 status_code=500,
                 status_code=500,
-                detail=f"重新计算土地数据失败: {land_type}"
+                detail=f"重新计算土地数据失败: {land_types_str}"
             )
             )
 
 
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
         
         
         # 检查关键输出文件是否存在(只检查最终输出的地图和直方图)
         # 检查关键输出文件是否存在(只检查最终输出的地图和直方图)
         if not map_output or not hist_output:
         if not map_output or not hist_output:
-            logger.error(f"重新计算土地数据失败,关键输出文件缺失: {land_type}")
+            logger.error(f"重新计算土地数据失败,关键输出文件缺失: {land_types_str}")
             logger.error(f"地图: {map_output}, 直方图: {hist_output}")
             logger.error(f"地图: {map_output}, 直方图: {hist_output}")
             raise HTTPException(
             raise HTTPException(
                 status_code=500,
                 status_code=500,
-                detail=f"重新计算土地数据失败: {land_type}"
+                detail=f"重新计算土地数据失败: {land_types_str}"
             )
             )
         
         
         # 验证输出文件实际存在
         # 验证输出文件实际存在
         if not os.path.exists(map_output) or not os.path.exists(hist_output):
         if not os.path.exists(map_output) or not os.path.exists(hist_output):
-            logger.error(f"生成的输出文件不存在: {land_type}")
+            logger.error(f"生成的输出文件不存在: {land_types_str}")
             logger.error(f"地图文件存在: {os.path.exists(map_output) if map_output else False}")
             logger.error(f"地图文件存在: {os.path.exists(map_output) if map_output else False}")
             logger.error(f"直方图文件存在: {os.path.exists(hist_output) if hist_output else False}")
             logger.error(f"直方图文件存在: {os.path.exists(hist_output) if hist_output else False}")
             raise HTTPException(
             raise HTTPException(
                 status_code=500,
                 status_code=500,
-                detail=f"生成的输出文件不存在: {land_type}"
+                detail=f"生成的输出文件不存在: {land_types_str}"
             )
             )
 
 
-        # 定义默认路径
-        default_map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
-        default_hist_path = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+        # 定义默认路径 - 使用组合的土地类型名称
+        combined_land_type_name = '_'.join(parsed_land_types)
+        default_map_path = os.path.join(raster_dir, f"{combined_land_type_name}_Cd含量地图.jpg")
+        default_hist_path = os.path.join(raster_dir, f"{combined_land_type_name}_Cd含量直方图.jpg")
 
 
         # 移动文件到默认目录(覆盖旧文件)
         # 移动文件到默认目录(覆盖旧文件)
         shutil.move(map_output, default_map_path)
         shutil.move(map_output, default_map_path)
@@ -219,16 +247,27 @@ async def recalculate_land_data(
 
 
 @router.get("/statistics",
 @router.get("/statistics",
             summary="获取土地类型统计数据",
             summary="获取土地类型统计数据",
-            description="返回指定土地类型的Cd预测结果统计信息")
+            description="返回指定土地类型的Cd预测结果统计信息,支持多个土地类型用下划线连接")
 async def get_land_statistics_endpoint(
 async def get_land_statistics_endpoint(
-        land_type: str = Query("水田", description="土地类型,如:'水田'、'旱地'或'水浇地'")
+        land_type: str = Query("水田", description="土地类型,支持单个或多个(用下划线连接),如:'水田'、'旱地_水浇地'")
 ) -> JSONResponse:
 ) -> JSONResponse:
     """返回土地类型Cd预测结果的统计信息"""
     """返回土地类型Cd预测结果的统计信息"""
     try:
     try:
         logger.info(f"获取土地类型统计数据: {land_type}")
         logger.info(f"获取土地类型统计数据: {land_type}")
 
 
+        # 标准化土地类型顺序以匹配文件名
+        from ..services.water_service import standardize_land_types_order
+        if '_' in land_type:
+            # 多个土地类型,按标准顺序排列
+            types_list = land_type.split('_')
+            standardized_types = standardize_land_types_order(types_list)
+            standardized_land_type = '_'.join(standardized_types)
+        else:
+            # 单个土地类型
+            standardized_land_type = land_type.strip()
+
         # 调用服务层函数获取统计数据
         # 调用服务层函数获取统计数据
-        stats = get_land_statistics(land_type)
+        stats = get_land_statistics(standardized_land_type)
 
 
         if not stats:
         if not stats:
             logger.warning(f"未找到{land_type}的土地类型统计数据")
             logger.warning(f"未找到{land_type}的土地类型统计数据")

+ 46 - 0
app/services/land_use_service.py

@@ -216,6 +216,52 @@ class LandUseService:
         # 使用优化版本的方法
         # 使用优化版本的方法
         return self.get_land_centers_optimized(land_type)
         return self.get_land_centers_optimized(land_type)
     
     
+    def get_multiple_land_centers_for_processing(self, land_types: List[str]) -> Optional[pd.DataFrame]:
+        """
+        获取多个土地类型的中心点数据并合并
+        
+        @param land_types: 土地利用类型列表
+        @returns: 包含所有土地类型中心点坐标的合并DataFrame
+        """
+        try:
+            if not land_types:
+                self.logger.warning("土地类型列表为空")
+                return None
+            
+            all_dataframes = []
+            
+            for land_type in land_types:
+                self.logger.info(f"获取土地类型 '{land_type}' 的中心点数据")
+                df = self.get_land_centers_optimized(land_type)
+                
+                if df is not None and not df.empty:
+                    # 添加土地类型标识列
+                    df = df.copy()
+                    df['land_type'] = land_type
+                    all_dataframes.append(df)
+                    self.logger.info(f"获取到 {len(df)} 个 '{land_type}' 中心点")
+                else:
+                    self.logger.warning(f"未获取到 '{land_type}' 类型的数据")
+            
+            if not all_dataframes:
+                self.logger.warning(f"所有土地类型 {land_types} 都没有获取到数据")
+                return None
+            
+            # 合并所有DataFrame
+            combined_df = pd.concat(all_dataframes, ignore_index=True)
+            
+            # 重新排序列,确保land_type列在合适位置
+            cols = ['gid', 'center_lon', 'center_lat', 'dlmc', 'land_type', 'shape_area']
+            combined_df = combined_df[[col for col in cols if col in combined_df.columns]]
+            
+            self.logger.info(f"成功合并 {len(land_types)} 种土地类型,总计 {len(combined_df)} 个中心点")
+            
+            return combined_df
+            
+        except Exception as e:
+            self.logger.error(f"获取多个土地类型中心点数据失败: {str(e)}")
+            return None
+    
     def get_available_land_types(self) -> List[str]:
     def get_available_land_types(self) -> List[str]:
         """
         """
         获取数据库中可用的土地利用类型(带缓存)
         获取数据库中可用的土地利用类型(带缓存)

+ 213 - 16
app/services/water_service.py

@@ -65,6 +65,28 @@ def get_base_dir():
         return os.path.dirname(os.path.abspath(__file__))
         return os.path.dirname(os.path.abspath(__file__))
 
 
 
 
+def standardize_land_types_order(land_types: List[str]) -> List[str]:
+    """
+    标准化土地类型顺序,确保文件名一致性
+    
+    @param land_types: 土地类型列表
+    @returns: 按标准顺序排序的土地类型列表
+    """
+    # 定义标准顺序
+    standard_order = ["水田", "旱地", "水浇地"]
+    
+    # 清理并标准化
+    cleaned_types = [lt.strip() for lt in land_types if lt.strip()]
+    
+    # 按标准顺序排序
+    standardized_types = sorted(
+        cleaned_types, 
+        key=lambda x: standard_order.index(x) if x in standard_order else 999
+    )
+    
+    return standardized_types
+
+
 def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
 def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
     """
     """
     直接从数据库获取边界数据作为GeoDataFrame
     直接从数据库获取边界数据作为GeoDataFrame
@@ -202,7 +224,7 @@ def cleanup_temp_files_in_directory(directory: str, patterns: List[str] = None)
 
 
 # 土地数据处理函数
 # 土地数据处理函数
 def process_land_data(land_type, coefficient_params=None, save_csv=True):
 def process_land_data(land_type, coefficient_params=None, save_csv=True):
-    """处理土地类型数据并生成清洗后的简化数据"""
+    """处理单个土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
     base_dir = get_base_dir()
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
 
 
@@ -220,6 +242,155 @@ def process_land_data(land_type, coefficient_params=None, save_csv=True):
     logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
     logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
     logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
     logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
 
 
+    # 调用辅助函数完成处理
+    return complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir)
+
+
+def process_multiple_land_data(land_types, coefficient_params=None, save_csv=True):
+    """
+    处理多个土地类型数据并生成合并的清洗后简化数据
+    
+    @param land_types: 土地类型列表
+    @param coefficient_params: 系数参数字典,格式为 {land_type: (param1, param2)}
+    @param save_csv: 是否保存CSV文件
+    @returns: (cleaned_data, combined_coeff_info, cleaned_csv_path)
+    """
+    base_dir = get_base_dir()
+    xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
+
+    logger.info(f"处理多个土地类型: {', '.join(land_types)}")
+    logger.info(f"Excel文件: {xls_file}")
+
+    # 从数据库获取多个土地类型的合并数据
+    logger.info(f"从数据库获取多个土地类型的土地利用数据...")
+    land_centers_df = land_use_service.get_multiple_land_centers_for_processing(land_types)
+    
+    if land_centers_df is None or land_centers_df.empty:
+        logger.warning(f"数据库中没有找到任何指定土地类型的数据: {land_types}")
+        return None, None, None
+
+    logger.info(f"从数据库获取到 {len(land_centers_df)} 个合并的土地数据点")
+    logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
+
+    # 读取Excel采样点数据
+    if not os.path.exists(xls_file):
+        logger.error(f"采样点Excel文件不存在: {xls_file}")
+        return None, None, None
+        
+    df_xls = pd.read_excel(xls_file)
+    logger.info(f"读取到 {len(df_xls)} 个采样点数据")
+
+    # 设置默认土地类型系数
+    default_params = {
+        "水田": (711, 0.524),
+        "水浇地": (427, 0.599),
+        "旱地": (200, 0.7)
+    }
+
+    params = coefficient_params or default_params
+    
+    # 为多个土地类型构建系数信息
+    combined_coeff_info = {}
+    
+    # 高效处理:使用空间索引查找最近采样点
+    logger.info("开始高效距离计算和Cd值计算...")
+    start_time = time()
+    
+    # 使用优化的空间索引方法查找最近采样点
+    nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
+    
+    # 批量计算Cd含量值,按土地类型应用不同系数
+    centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
+    cd_values = []
+    
+    for i, (_, row) in enumerate(land_centers_df.iterrows()):
+        land_type = row['land_type']
+        param1, param2 = params.get(land_type, (200, 0.7))
+        Num = param1 * param2
+        
+        # 记录每种土地类型使用的系数
+        if land_type not in combined_coeff_info:
+            combined_coeff_info[land_type] = {
+                'param1': param1,
+                'param2': param2,
+                'multiplier': Num,
+                'count': 0
+            }
+        combined_coeff_info[land_type]['count'] += 1
+        
+        # 计算该点的Cd值
+        cd_value = df_xls.iloc[nearest_indices[i]]['Cd (ug/L)'] * Num
+        cd_values.append(cd_value)
+    
+    calculation_time = time() - start_time
+    logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
+    logger.info(f"处理了 {len(centers)} 个土地中心点")
+    
+    # 记录各土地类型的系数使用情况
+    for land_type, info in combined_coeff_info.items():
+        logger.info(f"{land_type}: 系数 {info['param1']} * {info['param2']} = {info['multiplier']}, 应用于 {info['count']} 个点")
+
+    # 创建简化数据DataFrame
+    simplified_data = pd.DataFrame({
+        'lon': [c[0] for c in centers],
+        'lat': [c[1] for c in centers],
+        'Prediction': cd_values,
+        'land_type': land_centers_df['land_type'].values  # 保留土地类型信息用于分析
+    })
+
+    # 应用3σ原则检测异常值
+    mean_value = simplified_data['Prediction'].mean()
+    std_value = simplified_data['Prediction'].std()
+    lower_bound = mean_value - 3 * std_value
+    upper_bound = mean_value + 3 * std_value
+
+    logger.info(f"合并数据Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
+    logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
+
+    # 识别异常值
+    outliers = simplified_data[
+        (simplified_data['Prediction'] < lower_bound) |
+        (simplified_data['Prediction'] > upper_bound)
+        ]
+    logger.info(f"检测到异常值数量: {len(outliers)}")
+
+    # 创建清洗后的数据
+    cleaned_data = simplified_data[
+        (simplified_data['Prediction'] >= lower_bound) &
+        (simplified_data['Prediction'] <= upper_bound)
+        ]
+    logger.info(f"清洗后数据点数: {len(cleaned_data)}")
+
+    # 可选:保存CSV文件用于兼容性和调试
+    cleaned_csv_path = None
+    if save_csv:
+        # 创建输出目录
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        os.makedirs(data_dir, exist_ok=True)
+        logger.info(f"数据目录: {data_dir}")
+        
+        # 使用组合的土地类型名称
+        combined_type_name = '_'.join(land_types)
+        cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
+        
+        # 移除land_type列(仅用于内部处理),保持与原始格式兼容
+        cleaned_data_for_csv = cleaned_data[['lon', 'lat', 'Prediction']].copy()
+        cleaned_data_for_csv.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
+        logger.info(f"保存CSV: {cleaned_csv_path}")
+    else:
+        logger.info("跳过CSV文件生成(内存处理优化)")
+
+    # 返回清洗后的数据(移除land_type列保持兼容性)
+    final_cleaned_data = cleaned_data[['lon', 'lat', 'Prediction']].copy()
+    
+    return final_cleaned_data, combined_coeff_info, cleaned_csv_path
+
+
+# 继续完成原始的process_land_data函数
+def complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir):
+    """完成单个土地类型的数据处理"""
+    xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
+    
     # 读取Excel采样点数据
     # 读取Excel采样点数据
     if not os.path.exists(xls_file):
     if not os.path.exists(xls_file):
         logger.error(f"采样点Excel文件不存在: {xls_file}")
         logger.error(f"采样点Excel文件不存在: {xls_file}")
@@ -395,7 +566,7 @@ def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
         return None
         return None
 
 
 
 
-def process_land_to_visualization(land_type, coefficient_params=None,
+def process_land_to_visualization(land_types=None, land_type=None, coefficient_params=None,
                                   color_map_name="绿-黄-红-紫",
                                   color_map_name="绿-黄-红-紫",
                                   output_size=8,
                                   output_size=8,
                                   area: Optional[str] = None,
                                   area: Optional[str] = None,
@@ -406,19 +577,20 @@ def process_land_to_visualization(land_type, coefficient_params=None,
                                   save_csv: Optional[bool] = True,
                                   save_csv: Optional[bool] = True,
                                   cleanup_temp_files: Optional[bool] = True):
                                   cleanup_temp_files: Optional[bool] = True):
     """
     """
-    完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
+    完整的土地数据处理可视化流程(支持单个或多个土地类型,使用统一的MappingUtils接口,支持动态边界和插值控制)
     
     
     @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
     @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
-                  支持通过area和level参数动态控制地图边界,支持插值控制参数
+                  支持通过area和level参数动态控制地图边界,支持插值控制参数,支持多个土地类型合并处理
     
     
     工作流程:
     工作流程:
-    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
+    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量(支持多个类型合并)
     2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
     2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
     3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
     3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
     4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
     4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
     5. 生成直方图 - 直接调用MappingUtils.create_histogram()
     5. 生成直方图 - 直接调用MappingUtils.create_histogram()
 
 
-    @param land_type: 土地类型(水田、旱地、水浇地)
+    @param land_types: 土地类型列表(支持多个,如['水田', '旱地']),优先使用此参数
+    @param land_type: 单个土地类型(向后兼容,如果land_types为None则使用此参数)
     @param coefficient_params: 可选的系数参数字典
     @param coefficient_params: 可选的系数参数字典
     @param color_map_name: 色彩方案名称(中文)
     @param color_map_name: 色彩方案名称(中文)
     @param output_size: 输出图片尺寸
     @param output_size: 输出图片尺寸
@@ -432,16 +604,39 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     @returns: 包含所有生成文件路径的元组
     @returns: 包含所有生成文件路径的元组
     """
     """
     base_dir = get_base_dir()
     base_dir = get_base_dir()
-    logger.info(f"开始处理: {land_type}")
+    
+    # 处理参数兼容性 - 支持单个和多个土地类型
+    if land_types is not None:
+        # 使用新的多类型参数,并标准化顺序
+        input_types = land_types if isinstance(land_types, list) else [land_types]
+        actual_land_types = standardize_land_types_order(input_types)
+        logger.info(f"开始处理多个土地类型: {', '.join(actual_land_types)}")
+        is_multiple_types = len(actual_land_types) > 1
+    elif land_type is not None:
+        # 向后兼容单个类型参数
+        actual_land_types = [land_type.strip()]
+        logger.info(f"开始处理单个土地类型: {land_type}")
+        is_multiple_types = False
+    else:
+        raise ValueError("必须提供land_types或land_type参数")
 
 
     # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
     # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
-    cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
-        land_type, 
-        coefficient_params, 
-        save_csv=save_csv  # 根据参数决定是否生成CSV文件
-    )
+    if is_multiple_types:
+        # 使用多类型处理函数
+        cleaned_data, used_coeff, cleaned_csv_path = process_multiple_land_data(
+            actual_land_types, 
+            coefficient_params, 
+            save_csv=save_csv
+        )
+    else:
+        # 使用单类型处理函数
+        cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
+            actual_land_types[0], 
+            coefficient_params, 
+            save_csv=save_csv
+        )
     if cleaned_data is None:
     if cleaned_data is None:
-        logger.error(f"处理土地数据失败: {land_type}")
+        logger.error(f"处理土地数据失败: {', '.join(actual_land_types)}")
         return None, None, None, None, None, None
         return None, None, None, None, None, None
 
 
     # 2. 获取动态边界数据(提前获取用于栅格转换)
     # 2. 获取动态边界数据(提前获取用于栅格转换)
@@ -488,7 +683,9 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     logger.info(f"生成栅格文件: {output_tif}")
     logger.info(f"生成栅格文件: {output_tif}")
 
 
     # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
     # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
-    map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
+    # 生成输出文件名 - 支持多个土地类型
+    combined_type_name = '_'.join(actual_land_types)
+    map_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量地图.jpg")
     
     
     try:
     try:
         # 创建MappingUtils实例并直接调用
         # 创建MappingUtils实例并直接调用
@@ -520,7 +717,7 @@ def process_land_to_visualization(land_type, coefficient_params=None,
         logger.error(f"栅格地图生成失败: {str(e)}")
         logger.error(f"栅格地图生成失败: {str(e)}")
 
 
     # 5. 生成直方图(直接使用统一的MappingUtils接口)
     # 5. 生成直方图(直接使用统一的MappingUtils接口)
-    hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
+    hist_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量直方图.jpg")
     
     
     try:
     try:
         # 直接调用统一绘图接口生成直方图
         # 直接调用统一绘图接口生成直方图
@@ -547,7 +744,7 @@ def process_land_to_visualization(land_type, coefficient_params=None,
     else:
     else:
         # 为了兼容性,生成路径(即使文件不存在)
         # 为了兼容性,生成路径(即使文件不存在)
         data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
         data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
-        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
     
     
     # 清理临时文件(如果启用)
     # 清理临时文件(如果启用)
     if cleanup_temp_files:
     if cleanup_temp_files: