Browse Source

Merge branch 'zsj_0816' of Ding/AcidMap into master

Ding 5 days ago
parent
commit
25ed84c627

+ 31 - 33
Water/Python_codes/Calculate.py

@@ -1,33 +1,35 @@
 import os
-import geopandas as gpd
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取 Shp 文件
-shp_file = r"..\Raster\四县三种用电.shp"
-gdf_shp = gpd.read_file(shp_file)
-
+# 设置要处理的土地类型
 str = "旱地"
-# 筛选出字段 DLMC 为指定类型的面要素
-gdf_shp = gdf_shp[gdf_shp['DLMC'] == str]
 
-# 定义坐标系转换对象(从原始坐标系转换为WGS84)
-transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+# 从数据库获取土地利用数据,替代原来的SHP文件读取
+print(f"从数据库获取 '{str}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(str)
+
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{str}' 类型的土地利用数据")
+    exit(1)
+
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{str}' 类型的土地数据")
 
 # 读取 xls 文件
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
 df_xls = pd.read_excel(xls_file)
 
-# 新建重金属含量字段,指定数据类型为双精度浮点数
-gdf_shp['Cd'] = pd.Series(dtype=np.float64)
-gdf_shp['Hg'] = pd.Series(dtype=np.float64)
-gdf_shp['Cr'] = pd.Series(dtype=np.float64)
-gdf_shp['Pb'] = pd.Series(dtype=np.float64)
-gdf_shp['As'] = pd.Series(dtype=np.float64)
+# 不再需要为GeoDataFrame添加字段,直接处理数据即可
 
 # 根据面要素类型设置系数
 A = 711 * 0.524
@@ -45,11 +47,11 @@ else:
 # 创建一个空列表来存储数据行
 data_rows = []
 
-# 遍历面要素
-for index, row in gdf_shp.iterrows():
-    # 获取面要素的中心并转换坐标
-    center_original = row['geometry'].centroid
-    center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+# 遍历面要素,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    center_lon = row['center_lon']
+    center_lat = row['center_lat']
     center_transformed = Point(center_lon, center_lat)
     
     # 计算面要素中心与采样点的距离
@@ -59,24 +61,17 @@ for index, row in gdf_shp.iterrows():
     min_distance_index = distances.idxmin()
     nearest_sample = df_xls.loc[min_distance_index]
     
-    # 将最近采样点的值赋给面要素,并乘以系数Num
+    # 计算最近采样点的重金属含量,并乘以系数Num
     cd_value = nearest_sample['Cd (ug/L)'] * Num
     hg_value = nearest_sample['Hg (ug/L)'] * Num
     cr_value = nearest_sample['Cr (ug/L)'] * Num
     pb_value = nearest_sample['Pb (ug/L)'] * Num
     as_value = nearest_sample['As (ug/L)'] * Num
     
-    # 更新面要素的重金属含量
-    gdf_shp.at[index, 'Cd'] = cd_value
-    gdf_shp.at[index, 'Hg'] = hg_value
-    gdf_shp.at[index, 'Cr'] = cr_value
-    gdf_shp.at[index, 'Pb'] = pb_value
-    gdf_shp.at[index, 'As'] = as_value
-    
     # 将数据行添加到列表中
     data_rows.append({
-        '面要素ID': index,
-        'DLMC': row['DLMC'],
+        '面要素ID': row['gid'],  # 使用数据库中的gid
+        'DLMC': row['dlmc'],    # 使用数据库中的dlmc字段
         '中心点经度': center_lon,
         '中心点纬度': center_lat,
         '最近采样点ID': nearest_sample.name,
@@ -92,8 +87,11 @@ for index, row in gdf_shp.iterrows():
 # 从列表创建DataFrame
 csv_data = pd.DataFrame(data_rows)
 
-# 保存结果为CSV文件 - 修复路径中的转义字符问题
-csv_file_path = os.path.join('D:\土壤-大创\Water\Data\四县三用地&旱地.csv')
+# 保存结果为CSV文件
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"四县三用地&{str}_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
 print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {str} 面要素")

+ 27 - 27
Water/Python_codes/Counter.py

@@ -1,28 +1,29 @@
 import os
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取耕地中心点CSV文件
-csv_cultivated_land = r"..\Data\四县三用地&水田.csv"
-df_land = pd.read_csv(csv_cultivated_land)
+# 设置要处理的土地类型
+land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
 
-# 确保CSV文件包含必要的字段
-required_fields = ['中心点经度', '中心点纬度', 'DLMC']
-for field in required_fields:
-    if field not in df_land.columns:
-        raise ValueError(f"耕地CSV文件缺少必要的字段: {field}")
+# 从数据库获取土地利用数据,替代原来的CSV文件读取
+print(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
 
-# 筛选出旱地
-land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
-df_land = df_land[df_land['DLMC'] == land_type]
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+    exit(1)
 
-# 定义坐标系转换(假设输入已是WGS84,根据实际情况修改)
-source_crs = "EPSG:4326"
-transformer = Transformer.from_crs(source_crs, "EPSG:4326", always_xy=True)
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
 # 读取采样点数据
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
@@ -45,15 +46,11 @@ else:
 # 存储结果的列表
 data_rows = []
 
-# 遍历耕地中心点
-for index, row in df_land.iterrows():
-    # 获取正确的坐标字段
-    lon = row['中心点经度']
-    lat = row['中心点纬度']
-    
-    # 坐标转换(如果需要)
-    if source_crs != "EPSG:4326":
-        lon, lat = transformer.transform(lon, lat)
+# 遍历耕地中心点,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    lon = row['center_lon']
+    lat = row['center_lat']
     
     # 创建中心点几何对象
     center_point = Point(lon, lat)
@@ -73,7 +70,7 @@ for index, row in df_land.iterrows():
     
     # 添加到结果列表
     data_rows.append({
-        'DLMC': row['DLMC'],
+        'DLMC': row['dlmc'],  # 使用数据库中的字段名
         '中心点经度': lon,
         '中心点纬度': lat,
         'Cd (ug/L)': cd_value
@@ -83,7 +80,10 @@ for index, row in df_land.iterrows():
 csv_data = pd.DataFrame(data_rows)
 
 # 保存结果
-csv_file_path = r"D:\土壤-大创\Water\Data\旱地_Cd.csv"
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"{land_type}_Cd_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
-print(f"CSV文件已保存至: {csv_file_path}")
+print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {land_type} 面要素")

+ 41 - 10
Water/Python_codes/point_match.py

@@ -118,18 +118,48 @@ def write_csv(data, output_file):
         print(f"错误: 写入文件 {output_file} 时发生错误: {e}")
 
 def main():
-    # 配置文件路径
-    farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv'  # 耕地中心点数据文件
-    sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv'    # 采样点数据文件
-    output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv'  # 输出文件
+    """主函数 - 使用数据库数据进行点匹配"""
+    import sys
+    from pathlib import Path
+    
+    # 添加项目根目录到Python路径
+    project_root = Path(__file__).parent.parent.parent
+    sys.path.append(str(project_root))
+    
+    # 导入数据库服务
+    from app.services.land_use_service import land_use_service
+    
+    # 设置要处理的土地类型
+    land_type = "水浇地"
+    
+    # 从数据库获取土地利用数据
+    print(f"正在从数据库读取 {land_type} 数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
     
-    # 读取数据
-    print("正在读取耕地数据...")
-    farmland_data = read_csv(farmland_file)
-    print(f"已读取 {len(farmland_data)} 条耕地记录")
+    if land_centers_df is None or land_centers_df.empty:
+        print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+        return
+    
+    print(f"已从数据库读取 {len(land_centers_df)} 条 {land_type} 记录")
+    
+    # 转换为字典格式以兼容现有函数
+    farmland_data = []
+    for _, row in land_centers_df.iterrows():
+        farmland_data.append({
+            'id': row['gid'],
+            '中心点经度': row['center_lon'],
+            '中心点纬度': row['center_lat'],
+            'DLMC': row['dlmc']
+        })
+    
+    # 配置文件路径
+    sample_file = Path(__file__).parent.parent / "Data" / "SamplingPoint.csv"
+    output_dir = Path(__file__).parent.parent / "Data"
+    output_dir.mkdir(exist_ok=True)
+    output_file = output_dir / f"matched_data_{land_type}_DB.csv"
     
     print("正在读取采样点数据...")
-    sample_data = read_csv(sample_file)
+    sample_data = read_csv(str(sample_file))
     print(f"已读取 {len(sample_data)} 条采样点记录")
     
     if not farmland_data or not sample_data:
@@ -142,7 +172,8 @@ def main():
     
     # 写入结果
     print("正在写入结果...")
-    write_csv(merged_data, output_file)
+    write_csv(merged_data, str(output_file))
+    print(f"总共处理了 {len(merged_data)} 个 {land_type} 面要素")
 
 if __name__ == "__main__":
     main()

+ 148 - 0
app/api/agricultural_input.py

@@ -4,9 +4,11 @@
 """
 
 from fastapi import APIRouter, HTTPException, Query, Body
+from fastapi.responses import FileResponse
 from pydantic import BaseModel, Field
 from typing import Dict, Any, Optional
 import logging
+import os
 from ..services.agricultural_input_service import AgriculturalInputService
 
 router = APIRouter()
@@ -15,6 +17,18 @@ router = APIRouter()
 # 数据模型定义
 # =============================================================================
 
+class VisualizationResponse(BaseModel):
+    """
+    可视化结果响应模型
+    
+    @description: 绘图接口的响应格式
+    """
+    success: bool = Field(..., description="是否成功")
+    message: str = Field(..., description="响应消息")
+    data: Optional[Dict[str, Any]] = Field(None, description="可视化结果数据")
+    files: Optional[Dict[str, str]] = Field(None, description="生成的文件路径")
+
+
 class CdFluxCalculationRequest(BaseModel):
     """
     农业投入Cd通量计算请求模型
@@ -259,4 +273,138 @@ async def get_calculation_formula() -> Dict[str, Any]:
         raise HTTPException(
             status_code=500, 
             detail=f"获取失败: {str(e)}"
+        )
+
+
+# =============================================================================
+# 农业投入Cd通量可视化接口
+# =============================================================================
+
+@router.get("/visualize",
+           summary="生成农业投入Cd通量可视化图表",
+           description="计算农业投入Cd通量并生成栅格地图,参数固定使用韶关数据,area用于地图边界")
+async def visualize_agricultural_input(
+    area: str = Query(..., description="地区名称,如:乐昌市(用于地图边界,参数固定使用韶关)"),
+    level: str = Query(..., description="行政层级,必须为: county | city | province"),
+    colormap: str = Query("green_yellow_red_purple", description="色彩方案"),
+    resolution_factor: float = Query(4.0, description="分辨率因子(默认4.0,更快)"),
+    enable_interpolation: bool = Query(False, description="是否启用空间插值(默认关闭以提升性能)"),
+    cleanup_intermediate: bool = Query(True, description="是否清理中间文件(默认是)")
+):
+    """
+    生成农业投入Cd通量可视化图表
+    
+    @param area: 地区名称(用于地图边界,参数数据固定使用韶关)
+    @param level: 行政层级
+    @returns: 栅格地图文件
+    
+    功能包括:
+    1. 计算农业投入Cd通量(参数固定使用韶关)
+    2. 生成指定地区边界的栅格地图
+    3. 直接返回图片文件
+    """
+    try:
+        service = AgriculturalInputService()
+
+        # 行政层级校验(不允许模糊)
+        if level not in ("county", "city", "province"):
+            raise HTTPException(status_code=400, detail="参数 level 必须为 'county' | 'city' | 'province'")
+        
+        # 计算农业投入Cd通量(使用韶关参数,area仅用于边界)
+        calc_result = service.calculate_cd_flux_for_visualization(area)
+        
+        if not calc_result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=calc_result["message"]
+            )
+        
+        # 获取包含坐标的结果数据
+        results_with_coords = service.get_coordinates_for_results(calc_result["data"], area)
+        
+        if not results_with_coords:
+            raise HTTPException(
+                status_code=404,
+                detail=f"未找到地区 '{area}' 的坐标数据,无法生成可视化"
+            )
+        
+        # 创建可视化
+        visualization_files = service.create_agricultural_input_visualization(
+            area=area,
+            level=level,
+            calculation_type="agricultural_input",
+            results_with_coords=results_with_coords,
+            colormap=colormap,
+            resolution_factor=resolution_factor,
+            enable_interpolation=enable_interpolation,
+            cleanup_intermediate=cleanup_intermediate
+        )
+        
+        # 检查地图文件是否生成成功
+        map_file = visualization_files.get("map")
+        if not map_file or not os.path.exists(map_file):
+            raise HTTPException(status_code=500, detail="地图文件生成失败")
+        
+        return FileResponse(
+            path=map_file,
+            filename=f"{area}_agricultural_input_cd_flux_map.jpg",
+            media_type="image/jpeg"
+        )
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"生成地区 '{area}' 的农业投入Cd通量可视化失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"可视化生成失败: {str(e)}"
+        )
+
+
+@router.get("/export-data",
+           summary="导出农业投入Cd通量计算数据",
+           description="导出指定地区的农业投入Cd通量计算结果为CSV文件")
+async def export_agricultural_input_data(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    导出农业投入Cd通量计算数据
+    
+    @param area: 地区名称
+    @returns: 导出结果和文件路径
+    """
+    try:
+        service = AgriculturalInputService()
+        
+        # 计算农业投入Cd通量(使用韶关参数,area仅用于边界)
+        calc_result = service.calculate_cd_flux_for_visualization(area)
+        
+        if not calc_result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=calc_result["message"]
+            )
+        
+        # 导出数据
+        csv_path = service.export_results_to_csv(calc_result["data"])
+        
+        return {
+            "success": True,
+            "message": f"地区 '{area}' 的农业投入Cd通量数据导出成功",
+            "data": {
+                "area": area,
+                "calculation_type": "agricultural_input",
+                "exported_file": csv_path,
+                "total_cd_flux": calc_result["data"]["total_cd_flux"],
+                "unit": calc_result["data"]["unit"]
+            }
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"导出地区 '{area}' 的农业投入Cd通量数据失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"数据导出失败: {str(e)}"
         ) 

+ 10 - 6
app/api/cd_flux.py

@@ -19,13 +19,17 @@ DEFAULT_MAP_FILENAME = "fluxcd_input_map.jpg"
 DEFAULT_HIST_FILENAME = "fluxcd_input_histogram.jpg"
 DEFAULT_CSV_FILENAME = "fluxcd_input.csv"
 LATEST_RESULT_FILENAME = "latest_result.json"
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 router = APIRouter()
 

+ 42 - 12
app/api/water.py

@@ -16,13 +16,17 @@ from ..services.water_service import (
     get_base_dir
 )
 
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 router = APIRouter()
 
@@ -110,18 +114,29 @@ async def get_default_histogram(
 
 @router.post("/calculate",
              summary="重新计算土地数据",
-             description="根据输入的土地类型和系数重新计算土地数据")
+             description="根据输入的土地类型和系数重新计算土地数据,支持通过area和level参数控制地图边界,支持插值控制")
 async def recalculate_land_data(
         background_tasks: BackgroundTasks,
         land_type: str = Form(..., description="土地类型,如:'水田'、'旱地'或'水浇地'"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
-        output_size: int = Form(8, description="输出图片的尺寸")
+        output_size: int = Form(8, description="输出图片的尺寸"),
+        area: Optional[str] = Form(None, description="可选的地区名称,如:'乐昌市',用于动态控制地图边界"),
+        level: Optional[str] = Form(None, description="可选的行政层级,必须为: county | city | province"),
+        enable_interpolation: Optional[bool] = Form(False, description="是否启用空间插值,默认启用"),
+        interpolation_method: Optional[str] = Form("linear", description="插值方法: nearest | linear | cubic"),
+        resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高"),
+        save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成")
 ) -> Dict[str, Any]:
-    """重新计算土地数据并返回结果路径"""
+    """重新计算土地数据并返回结果路径,支持动态边界控制和插值控制"""
     try:
         logger.info(f"重新计算土地数据: {land_type}")
+        if area and level:
+            logger.info(f"使用动态边界: {area} ({level})")
+        else:
+            logger.info("使用默认边界")
+        logger.info(f"插值设置: 启用={enable_interpolation}, 方法={interpolation_method}, 分辨率因子={resolution_factor}")
 
         # 获取默认目录
         base_dir = get_base_dir()
@@ -135,15 +150,21 @@ async def recalculate_land_data(
             land_type: (param1, param2)
         }
 
-        # 调用服务层函数处理土地数据
+        # 调用统一的处理函数,CSV生成作为可选参数
         results = process_land_to_visualization(
             land_type=land_type,
             coefficient_params=coefficient_params,
             color_map_name=color_map_name,
-            output_size=output_size
+            output_size=output_size,
+            area=area,
+            level=level,
+            enable_interpolation=enable_interpolation,
+            interpolation_method=interpolation_method,
+            resolution_factor=resolution_factor,
+            save_csv=save_csv  # 将CSV生成选项传递给处理函数
         )
 
-        if not results or any(r is None for r in results):
+        if not results:
             logger.error(f"重新计算土地数据失败: {land_type}")
             raise HTTPException(
                 status_code=500,
@@ -151,6 +172,15 @@ async def recalculate_land_data(
             )
 
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
+        
+        # 检查关键文件是否存在(shapefile可以为None,因为使用的是内存处理)
+        if not tif_file or not map_output or not hist_output:
+            logger.error(f"重新计算土地数据失败,关键文件缺失: {land_type}")
+            logger.error(f"GeoTIFF: {tif_file}, 地图: {map_output}, 直方图: {hist_output}")
+            raise HTTPException(
+                status_code=500,
+                detail=f"重新计算土地数据失败: {land_type}"
+            )
 
         # 定义默认路径
         default_map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")

+ 5 - 6
app/database.py

@@ -5,12 +5,11 @@ import os
 from dotenv import load_dotenv # type: ignore
 import logging
 from sqlalchemy.exc import SQLAlchemyError
-import logging
-# 开启SQLAlchemy的SQL执行日志(会打印所有执行的SQL语句和错误)
-logging.basicConfig()
-logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
-# 配置日志
-logging.basicConfig(level=logging.INFO)
+# 配置日志系统 - 检查是否已经配置过,避免重复配置
+if not logging.getLogger().handlers:
+    logging.basicConfig(level=logging.INFO)
+# 关闭SQLAlchemy的详细SQL执行日志,只保留错误日志
+logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 # 创建Base类

+ 3 - 2
app/main.py

@@ -6,8 +6,9 @@ import logging
 import sys
 import os
 
-# 设置日志
-logging.basicConfig(level=logging.INFO)
+# 设置日志 - 检查是否已经配置过,避免重复配置
+if not logging.getLogger().handlers:
+    logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 def safe_create_tables():

+ 1 - 0
app/models/__init__.py

@@ -21,3 +21,4 @@ from app.models.agricultural import *
 from app.models.cross_section import *
 from app.models.province import *
 from app.models.city import *
+from app.models.land_use import *  # 导入四县三种用电土地利用类型模型

+ 90 - 0
app/models/land_use.py

@@ -0,0 +1,90 @@
+# coding: utf-8
+"""
+四县三种用电土地利用类型模型
+Four County Three Type Land Use Model
+"""
+
+from sqlalchemy import Column, Integer, String, Float, Numeric
+from geoalchemy2.types import Geometry
+from app.models.base import Base
+
+
+class FourCountyLandUse(Base):
+    """
+    四县三种用电土地利用类型表
+    Four County Three Type Land Use Table
+    """
+    __tablename__ = 'four_county_land'
+    __table_args__ = {'comment': '四县三种用电土地利用类型数据'}
+
+    # 主键
+    gid = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
+    
+    # 对象标识
+    objectid = Column(Integer, name='objectid', comment='对象标识符')
+    
+    # 标识码
+    bsm = Column(String(18), name='bsm', comment='标识码')
+    
+    # 要素代码
+    ysdm = Column(String(10), name='ysdm', comment='要素代码')
+    
+    # 图斑编号
+    tbybh = Column(String(18), name='tbybh', comment='图斑预编号')
+    tbbh = Column(String(8), name='tbbh', comment='图斑编号')
+    
+    # 地类信息
+    dlbm = Column(String(5), name='dlbm', comment='地类编码')
+    dlmc = Column(String(60), name='dlmc', comment='地类名称')
+    
+    # 权属信息
+    qsxz = Column(String(2), name='qsxz', comment='权属性质')
+    qsdwdm = Column(String(19), name='qsdwdm', comment='权属单位代码')
+    qsdwmc = Column(String(60), name='qsdwmc', comment='权属单位名称')
+    
+    # 坐落单位
+    zldwdm = Column(String(19), name='zldwdm', comment='坐落单位代码')
+    zldwmc = Column(String(60), name='zldwmc', comment='坐落单位名称')
+    
+    # 面积信息
+    tbmj = Column(Numeric(19, 11), name='tbmj', comment='图斑面积')
+    kcdlbm = Column(String(5), name='kcdlbm', comment='扣除地类编码')
+    kcxs = Column(Numeric(19, 11), name='kcxs', comment='扣除系数')
+    kcmj = Column(Numeric(19, 11), name='kcmj', comment='扣除面积')
+    tbdlmj = Column(Numeric(19, 11), name='tbdlmj', comment='图斑地类面积')
+    
+    # 耕地信息
+    gdlx = Column(String(2), name='gdlx', comment='耕地类型')
+    gdpdjb = Column(String(2), name='gdpdjb', comment='耕地坡度级别')
+    xzdwkd = Column(Numeric(19, 11), name='xzdwkd', comment='线状地物宽度')
+    
+    # 图斑细化
+    tbxhdm = Column(String(6), name='tbxhdm', comment='图斑细化代码')
+    tbxhmc = Column(String(20), name='tbxhmc', comment='图斑细化名称')
+    
+    # 种植属性
+    zzsxdm = Column(String(6), name='zzsxdm', comment='种植属性代码')
+    zzsxmc = Column(String(20), name='zzsxmc', comment='种植属性名称')
+    
+    # 耕地等别
+    gddb = Column(Integer, name='gddb', comment='耕地等别')
+    
+    # 其他属性
+    frdbs = Column(String(1), name='frdbs', comment='非入等标识')
+    czcsxm = Column(String(4), name='czcsxm', comment='城镇村属性码')
+    sjnf = Column(Integer, name='sjnf', comment='数据年份')
+    mssm = Column(String(2), name='mssm', comment='模式识别码')
+    hdmc = Column(String(100), name='hdmc', comment='河道名称')
+    bz = Column(String(254), name='bz', comment='备注')
+    
+    # 形状信息
+    shape_leng = Column(Numeric(19, 11), name='shape_leng', comment='形状长度')
+    shape_le_1 = Column(Numeric(19, 11), name='shape_le_1', comment='形状长度1')
+    shape_area = Column(Numeric(19, 11), name='shape_area', comment='形状面积')
+    
+    # 几何信息 - 使用EPSG:4526坐标系
+    geom = Column(
+        Geometry('POLYGON', 4526, from_text='ST_GeomFromEWKT', name='geometry'), 
+        index=True, 
+        comment='几何位置信息-多边形'
+    )

+ 44 - 0
app/services/admin_boundary_service.py

@@ -1,6 +1,9 @@
 from sqlalchemy.orm import Session
 from sqlalchemy.sql import text
 import json
+import geopandas as gpd
+from shapely.geometry import shape
+from typing import Optional
 
 
 def get_boundary_geojson_by_name(db: Session, name: str, level: str = "auto") -> dict:
@@ -112,3 +115,44 @@ def get_boundary_geojson_by_name(db: Session, name: str, level: str = "auto") ->
     raise ValueError(f"未找到名称: {name}")
 
 
+def get_boundary_gdf_by_name(db: Session, name: str, level: str = "auto") -> Optional[gpd.GeoDataFrame]:
+    """根据名称获取边界GeoDataFrame,优化版本避免创建临时文件
+    
+    这是一个通用的边界数据获取函数,直接返回GeoDataFrame而不需要创建临时Shapefile文件。
+    建议在需要边界数据进行空间操作的场景中使用此函数。
+
+    Args:
+        db (Session): 数据库会话
+        name (str): 名称(县/市/省)
+        level (str): 层级,可选 "county"|"city"|"province"|"auto"
+
+    Returns:
+        Optional[gpd.GeoDataFrame]: 边界GeoDataFrame,如果未找到则返回None
+        
+    Example:
+        ```python
+        with SessionLocal() as db:
+            boundary_gdf = get_boundary_gdf_by_name(db, "乐昌市", "county")
+            if boundary_gdf is not None:
+                # 直接使用GeoDataFrame进行空间操作
+                boundary_union = boundary_gdf.unary_union
+        ```
+    """
+    try:
+        boundary_geojson = get_boundary_geojson_by_name(db, name, level)
+        if boundary_geojson:
+            geometry_obj = shape(boundary_geojson["geometry"])
+            gdf = gpd.GeoDataFrame([boundary_geojson["properties"]], 
+                                 geometry=[geometry_obj], 
+                                 crs="EPSG:4326")
+            return gdf
+    except ValueError:
+        # 未找到对应边界数据
+        pass
+    except Exception:
+        # 其他错误也返回None,让调用方处理
+        pass
+    
+    return None
+
+

+ 433 - 3
app/services/agricultural_input_service.py

@@ -6,11 +6,23 @@
 """
 
 import logging
+import math
+import os
+import pandas as pd
+import numpy as np
+from datetime import datetime
 from typing import Dict, Any, List, Optional
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker, Session
+from sqlalchemy import create_engine, and_
 from ..database import SessionLocal, engine
 from ..models.parameters import Parameters
+from ..models.farmland import FarmlandData
+from ..utils.mapping_utils import MappingUtils
+from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
+import geopandas as gpd
+from shapely.geometry import shape, Point
+import tempfile
+import json
 
 class AgriculturalInputService:
     """
@@ -332,4 +344,422 @@ class AgriculturalInputService:
             
         except Exception as e:
             self.logger.error(f"验证参数时发生错误: {str(e)}")
-            return False 
+            return False
+    
+    def calculate_cd_flux_for_visualization(self, area: str) -> Dict[str, Any]:
+        """
+        专门用于可视化的农业投入Cd通量计算(参数固定使用韶关)
+        
+        @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
+        @returns: 计算结果
+        """
+        try:
+            with SessionLocal() as db:
+                # 参数固定使用"韶关",area参数仅用于地图边界
+                parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到韶关地区的参数数据",
+                        "data": None
+                    }
+                
+                # 计算农业投入输入Cd通量
+                # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+                cd_flux = (
+                    parameter.f3 * parameter.nf +  # 氮肥
+                    parameter.f4 * parameter.pf +  # 磷肥
+                    parameter.f5 * parameter.kf +  # 钾肥
+                    parameter.f6 * parameter.cf +  # 复合肥
+                    parameter.f7 * parameter.of +  # 有机肥
+                    parameter.f8 * parameter.p +   # 农药
+                    parameter.f9 * parameter.ff +  # 农家肥
+                    parameter.f10 * parameter.af   # 农膜
+                )
+                
+                # 计算各项明细
+                details = {
+                    "nitrogen_fertilizer": parameter.f3 * parameter.nf,      # 氮肥贡献
+                    "phosphorus_fertilizer": parameter.f4 * parameter.pf,    # 磷肥贡献
+                    "potassium_fertilizer": parameter.f5 * parameter.kf,     # 钾肥贡献
+                    "compound_fertilizer": parameter.f6 * parameter.cf,      # 复合肥贡献
+                    "organic_fertilizer": parameter.f7 * parameter.of,       # 有机肥贡献
+                    "pesticide": parameter.f8 * parameter.p,                 # 农药贡献
+                    "farmyard_manure": parameter.f9 * parameter.ff,          # 农家肥贡献
+                    "agricultural_film": parameter.f10 * parameter.af        # 农膜贡献
+                }
+                
+                return {
+                    "success": True,
+                    "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量(使用韶关参数)",
+                    "data": {
+                        "area": area,
+                        "total_cd_flux": round(cd_flux, 6),
+                        "unit": "g/ha/a",
+                        "details": {key: round(value, 6) for key, value in details.items()},
+                        "parameters_used": {
+                            "f3_nitrogen_cd_content": parameter.f3,
+                            "f4_phosphorus_cd_content": parameter.f4,
+                            "f5_potassium_cd_content": parameter.f5,
+                            "f6_compound_cd_content": parameter.f6,
+                            "f7_organic_cd_content": parameter.f7,
+                            "f8_pesticide_cd_content": parameter.f8,
+                            "f9_farmyard_cd_content": parameter.f9,
+                            "f10_film_cd_content": parameter.f10,
+                            "nf_nitrogen_usage": parameter.nf,
+                            "pf_phosphorus_usage": parameter.pf,
+                            "kf_potassium_usage": parameter.kf,
+                            "cf_compound_usage": parameter.cf,
+                            "of_organic_usage": parameter.of,
+                            "p_pesticide_usage": parameter.p,
+                            "ff_farmyard_usage": parameter.ff,
+                            "af_film_usage": parameter.af
+                        }
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def get_coordinates_for_results(self, results_data: Dict[str, Any], area: str) -> List[Dict[str, Any]]:
+        """
+        获取农业投入计算结果对应的坐标信息
+        
+        @param results_data: 计算结果数据
+        @param area: 地区名称
+        @returns: 包含坐标的结果列表
+        """
+        try:
+            # 农业投入计算只有一个结果值,需要与所有农田数据关联
+            total_cd_flux = results_data.get("total_cd_flux", 0)
+            
+            with SessionLocal() as db:
+                # 查询所有农田数据获取坐标
+                farmland_data = db.query(FarmlandData).all()
+                
+                if not farmland_data:
+                    self.logger.warning("未找到农田坐标数据")
+                    return []
+                
+                coordinates_results = []
+                for farmland in farmland_data:
+                    coord_result = {
+                        "farmland_id": farmland.farmland_id,
+                        "sample_id": farmland.sample_id,
+                        "longitude": farmland.lon,
+                        "latitude": farmland.lan,
+                        "flux_value": total_cd_flux,  # 所有点使用相同的农业投入Cd通量值
+                        "area": area
+                    }
+                    coordinates_results.append(coord_result)
+                
+                self.logger.info(f"✓ 成功获取 {len(coordinates_results)} 个农田样点的坐标信息,农业投入Cd通量: {total_cd_flux}")
+                return coordinates_results
+                
+        except Exception as e:
+            self.logger.error(f"获取坐标信息失败: {str(e)}")
+            raise
+    
+    def export_results_to_csv(self, results_data: Dict[str, Any], output_dir: str = "app/static/agricultural_input") -> str:
+        """
+        将农业投入计算结果导出为CSV文件
+        
+        @param results_data: 计算结果数据
+        @param output_dir: 输出目录
+        @returns: CSV文件路径
+        """
+        try:
+            # 确保输出目录存在
+            os.makedirs(output_dir, exist_ok=True)
+            
+            # 生成时间戳
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            
+            # 生成文件名
+            area = results_data.get("area", "unknown")
+            filename = f"agricultural_input_{area}_{timestamp}.csv"
+            csv_path = os.path.join(output_dir, filename)
+            
+            # 准备导出数据
+            export_data = {
+                "area": results_data.get("area"),
+                "total_cd_flux": results_data.get("total_cd_flux"),
+                "unit": results_data.get("unit"),
+                "nitrogen_fertilizer": results_data["details"]["nitrogen_fertilizer"],
+                "phosphorus_fertilizer": results_data["details"]["phosphorus_fertilizer"],
+                "potassium_fertilizer": results_data["details"]["potassium_fertilizer"],
+                "compound_fertilizer": results_data["details"]["compound_fertilizer"],
+                "organic_fertilizer": results_data["details"]["organic_fertilizer"],
+                "pesticide": results_data["details"]["pesticide"],
+                "farmyard_manure": results_data["details"]["farmyard_manure"],
+                "agricultural_film": results_data["details"]["agricultural_film"]
+            }
+            
+            # 转换为DataFrame
+            df = pd.DataFrame([export_data])
+            
+            # 保存CSV文件
+            df.to_csv(csv_path, index=False, encoding='utf-8-sig')
+            
+            self.logger.info(f"✓ 成功导出农业投入结果到: {csv_path}")
+            return csv_path
+            
+        except Exception as e:
+            self.logger.error(f"导出CSV文件失败: {str(e)}")
+            raise
+    
+    def create_agricultural_input_visualization(self, area: str, calculation_type: str,
+                                              results_with_coords: List[Dict[str, Any]],
+                                              level: str = None,
+                                              output_dir: str = "app/static/agricultural_input",
+                                              template_raster: str = "app/static/cd_flux/meanTemp.tif",
+                                              boundary_shp: str = None,
+                                              colormap: str = "green_yellow_red_purple",
+                                              resolution_factor: float = 4.0,
+                                              enable_interpolation: bool = False,
+                                              cleanup_intermediate: bool = True) -> Dict[str, str]:
+        """
+        创建农业投入Cd通量可视化图表
+        
+        @param area: 地区名称
+        @param calculation_type: 计算类型(agricultural_input)
+        @param results_with_coords: 包含坐标的结果数据
+        @param level: 行政层级
+        @param output_dir: 输出目录
+        @param template_raster: 模板栅格文件路径
+        @param boundary_shp: 边界shapefile路径
+        @param colormap: 色彩方案
+        @param resolution_factor: 分辨率因子
+        @param enable_interpolation: 是否启用空间插值
+        @param cleanup_intermediate: 是否清理中间文件
+        @returns: 生成的图片文件路径字典
+        """
+        try:
+            if not results_with_coords:
+                raise ValueError("没有包含坐标的结果数据")
+            
+            # 确保输出目录存在
+            os.makedirs(output_dir, exist_ok=True)
+            generated_files: Dict[str, str] = {}
+            
+            # 生成时间戳
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            
+            # 创建CSV文件用于绘图
+            csv_filename = f"agricultural_input_{area}_temp_{timestamp}.csv"
+            csv_path = os.path.join(output_dir, csv_filename)
+            
+            # 准备绘图数据
+            plot_data = []
+            for result in results_with_coords:
+                plot_data.append({
+                    "longitude": result["longitude"],
+                    "latitude": result["latitude"],
+                    "flux_value": result["flux_value"]
+                })
+            
+            # 保存为CSV
+            df = pd.DataFrame(plot_data)
+            df.to_csv(csv_path, index=False, encoding='utf-8-sig')
+            
+            # 初始化绘图工具
+            mapper = MappingUtils()
+            
+            # 生成输出文件路径
+            map_output = os.path.join(output_dir, f"agricultural_input_{area}_map_{timestamp}")
+            histogram_output = os.path.join(output_dir, f"agricultural_input_{area}_histogram_{timestamp}")
+            
+            # 检查模板文件是否存在
+            if not os.path.exists(template_raster):
+                self.logger.warning(f"模板栅格文件不存在: {template_raster}")
+                template_raster = None
+            
+            # 动态获取边界数据(严格使用指定层级)
+            if not level:
+                raise ValueError("必须提供行政层级 level:county | city | province")
+            
+            # 直接从数据库获取边界GeoDataFrame
+            boundary_gdf = self._get_boundary_gdf_from_database(area, level)
+            boundary_shp = None  # 不再需要临时边界文件
+            
+            if boundary_gdf is None:
+                self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
+            else:
+                # 在绘图前进行样点边界包含性统计
+                try:
+                    if boundary_gdf is not None and len(boundary_gdf) > 0:
+                        boundary_union = boundary_gdf.unary_union
+                        total_points = len(results_with_coords)
+                        inside_count = 0
+                        outside_points: List[Dict[str, Any]] = []
+                        for r in results_with_coords:
+                            pt = Point(float(r["longitude"]), float(r["latitude"]))
+                            if boundary_union.contains(pt) or boundary_union.touches(pt):
+                                inside_count += 1
+                            else:
+                                outside_points.append({
+                                    "farmland_id": r.get("farmland_id"),
+                                    "sample_id": r.get("sample_id"),
+                                    "longitude": r.get("longitude"),
+                                    "latitude": r.get("latitude"),
+                                    "flux_value": r.get("flux_value")
+                                })
+
+                        outside_count = total_points - inside_count
+                        inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
+
+                        self.logger.info(
+                            f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
+                        if outside_count > 0:
+                            sample_preview = outside_points[:10]
+                            self.logger.warning(
+                                f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
+
+                        # 在日志中打印边界检查统计结果
+                        self.logger.info(
+                            f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
+                            f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
+                            f"边界外: {outside_count}"
+                        )
+                        if outside_count > 0 and len(outside_points) > 0:
+                            sample_preview = outside_points[:5]  # 只显示前5个样本
+                            self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
+                    else:
+                        generated_files = {}
+                except Exception as check_err:
+                    self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
+            
+            # 创建shapefile
+            shapefile_path = csv_path.replace('.csv', '_points.shp')
+            mapper.csv_to_shapefile(csv_path, shapefile_path, 
+                                  lon_col='longitude', lat_col='latitude', value_col='flux_value')
+            
+            # 合并已生成文件映射
+            generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
+            
+            # 如果有模板栅格文件,创建栅格地图
+            if template_raster:
+                try:
+                    # 创建栅格
+                    raster_path = csv_path.replace('.csv', '_raster.tif')
+                    raster_path, stats = mapper.vector_to_raster(
+                        shapefile_path, template_raster, raster_path, 'flux_value',
+                        resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
+                        interpolation_method='nearest', enable_interpolation=enable_interpolation
+                    )
+                    generated_files["raster"] = raster_path
+                    
+                    # 创建栅格地图 - 使用英文标题避免中文乱码
+                    map_title = "Agricultural Input Cd Flux"
+                    
+                    map_file = mapper.create_raster_map(
+                        boundary_shp if boundary_shp else None,
+                        raster_path,
+                        map_output,
+                        colormap=colormap,
+                        title=map_title,
+                        output_size=12,
+                        dpi=300,
+                        resolution_factor=4.0,
+                        enable_interpolation=False,
+                        interpolation_method='nearest',
+                        boundary_gdf=boundary_gdf
+                    )
+                    generated_files["map"] = map_file
+                    
+                    # 创建直方图 - 使用英文标题避免中文乱码
+                    histogram_title = "Agricultural Input Cd Flux Distribution"
+                    
+                    histogram_file = mapper.create_histogram(
+                        raster_path,
+                        f"{histogram_output}.jpg",
+                        title=histogram_title,
+                        xlabel='Cd Flux (g/ha/a)',
+                        ylabel='Frequency Density'
+                    )
+                    generated_files["histogram"] = histogram_file
+                    
+                except Exception as viz_error:
+                    self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
+                    # 即使栅格可视化失败,也返回已生成的文件
+            
+            # 清理中间文件(默认开启,仅保留最终可视化)
+            if cleanup_intermediate:
+                try:
+                    self._cleanup_intermediate_files(generated_files, None)
+                except Exception as cleanup_err:
+                    self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
+
+            self.logger.info(f"✓ 成功创建农业投入Cd通量可视化,生成文件: {list(generated_files.keys())}")
+            return generated_files
+            
+        except Exception as e:
+            self.logger.error(f"创建农业投入可视化失败: {str(e)}")
+            raise
+
+    def _cleanup_intermediate_files(self, generated_files: Dict[str, str], boundary_shp: Optional[str]) -> None:
+        """
+        清理中间文件:CSV、Shapefile 及其配套文件、栅格TIFF;若边界为临时目录,则一并删除
+        """
+        import shutil
+        import tempfile
+
+        def _safe_remove(path: str) -> None:
+            try:
+                if path and os.path.exists(path) and os.path.isfile(path):
+                    os.remove(path)
+            except Exception:
+                pass
+
+        # 删除 CSV
+        _safe_remove(generated_files.get("csv"))
+
+        # 删除栅格
+        _safe_remove(generated_files.get("raster"))
+
+        # 删除 Shapefile 全家桶
+        shp_path = generated_files.get("shapefile")
+        if shp_path:
+            base, _ = os.path.splitext(shp_path)
+            for ext in (".shp", ".shx", ".dbf", ".prj", ".cpg"):
+                _safe_remove(base + ext)
+
+        # 如果边界文件来自系统临时目录,删除其所在目录
+        if boundary_shp:
+            temp_root = tempfile.gettempdir()
+            try:
+                if os.path.commonprefix([os.path.abspath(boundary_shp), temp_root]) == temp_root:
+                    temp_dir = os.path.dirname(os.path.abspath(boundary_shp))
+                    if os.path.isdir(temp_dir):
+                        shutil.rmtree(temp_dir, ignore_errors=True)
+            except Exception:
+                pass
+    
+    def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
+        """
+        直接从数据库获取边界数据作为GeoDataFrame
+        
+        @param area: 地区名称
+        @param level: 行政层级
+        @returns: 边界GeoDataFrame或None
+        """
+        try:
+            with SessionLocal() as db:
+                norm_area = area.strip()
+                boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
+                if boundary_gdf is not None:
+                    self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
+                return boundary_gdf
+                    
+        except Exception as e:
+            self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
+            
+        return None
+    
+ 

+ 53 - 34
app/services/cd_flux_removal_service.py

@@ -18,7 +18,7 @@ from ..models.parameters import Parameters
 from ..models.CropCd_output import CropCdOutputData
 from ..models.farmland import FarmlandData
 from ..utils.mapping_utils import MappingUtils
-from .admin_boundary_service import get_boundary_geojson_by_name
+from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
 import geopandas as gpd
 from shapely.geometry import shape, Point
 import tempfile
@@ -43,20 +43,20 @@ class CdFluxRemovalService:
         """
         根据地区计算籽粒移除Cd通量
         
-        @param area: 地区名称
+        @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
         @returns: 计算结果字典
         
         计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
         """
         try:
             with SessionLocal() as db:
-                # 查询指定地区的参数(严格等号匹配,不做任何映射)
-                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                # 参数固定使用"韶关",area参数仅用于地图边界
+                parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
                 
                 if not parameter:
                     return {
                         "success": False,
-                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "message": f"未找到韶关地区的参数数据",
                         "data": None
                     }
                 
@@ -119,20 +119,20 @@ class CdFluxRemovalService:
         """
         根据地区计算秸秆移除Cd通量
         
-        @param area: 地区名称
+        @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
         @returns: 计算结果字典
         
         计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
         """
         try:
             with SessionLocal() as db:
-                # 查询指定地区的参数(严格等号匹配,不做任何映射)
-                parameter = db.query(Parameters).filter(Parameters.area == area).first()
+                # 参数固定使用"韶关",area参数仅用于地图边界
+                parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
                 
                 if not parameter:
                     return {
                         "success": False,
-                        "message": f"未找到地区 '{area}' 的参数数据",
+                        "message": f"未找到韶关地区的参数数据",
                         "data": None
                     }
                 
@@ -370,16 +370,20 @@ class CdFluxRemovalService:
                 self.logger.warning(f"模板栅格文件不存在: {template_raster}")
                 template_raster = None
             
-            # 动态获取边界文件(严格使用指定层级)
+            # 动态获取边界数据(严格使用指定层级)
             if not level:
                 raise ValueError("必须提供行政层级 level:county | city | province")
-            boundary_shp = self._get_boundary_file_for_area(area, level)
-            if not boundary_shp:
-                self.logger.warning(f"未找到地区 '{area}' 的边界文件,将不使用边界裁剪")
+            
+            # 优化:直接从数据库获取边界GeoDataFrame,避免创建临时shapefile文件
+            # 这样可以减少磁盘I/O操作和临时文件管理的开销
+            boundary_gdf = self._get_boundary_gdf_from_database(area, level)
+            boundary_shp = None  # 不再需要临时边界文件
+            
+            if boundary_gdf is None:
+                self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
             else:
                 # 在绘图前进行样点边界包含性统计
                 try:
-                    boundary_gdf = gpd.read_file(boundary_shp)
                     if boundary_gdf is not None and len(boundary_gdf) > 0:
                         boundary_union = boundary_gdf.unary_union
                         total_points = len(results_with_coords)
@@ -408,24 +412,15 @@ class CdFluxRemovalService:
                             self.logger.warning(
                                 f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
 
-                        report = {
-                            "area": area,
-                            "level": level,
-                            "calculation_type": calculation_type,
-                            "total_points": total_points,
-                            "inside_points": inside_count,
-                            "outside_points": outside_count,
-                            "inside_percentage": round(inside_pct, 2),
-                            "outside_samples": outside_points
-                        }
-                        os.makedirs(output_dir, exist_ok=True)
-                        report_path = os.path.join(
-                            output_dir,
-                            f"{calculation_type}_{area}_points_boundary_check_{timestamp}.json"
+                        # 在日志中打印边界检查统计结果
+                        self.logger.info(
+                            f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
+                            f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
+                            f"边界外: {outside_count}"
                         )
-                        with open(report_path, "w", encoding="utf-8") as f:
-                            json.dump(report, f, ensure_ascii=False, indent=2)
-                        generated_files["point_boundary_report"] = report_path
+                        if outside_count > 0 and len(outside_points) > 0:
+                            sample_preview = outside_points[:5]  # 只显示前5个样本
+                            self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
                     else:
                         generated_files = {}
                 except Exception as check_err:
@@ -447,7 +442,7 @@ class CdFluxRemovalService:
                     raster_path = csv_path.replace('.csv', '_raster.tif')
                     raster_path, stats = mapper.vector_to_raster(
                         shapefile_path, template_raster, raster_path, 'flux_value',
-                        resolution_factor=resolution_factor, boundary_shp=boundary_shp,
+                        resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
                         interpolation_method='nearest', enable_interpolation=enable_interpolation
                     )
                     generated_files["raster"] = raster_path
@@ -469,7 +464,8 @@ class CdFluxRemovalService:
                         dpi=300,
                         resolution_factor=4.0,
                         enable_interpolation=False,
-                        interpolation_method='nearest'
+                        interpolation_method='nearest',
+                        boundary_gdf=boundary_gdf
                     )
                     generated_files["map"] = map_file
                     
@@ -496,7 +492,8 @@ class CdFluxRemovalService:
             # 清理中间文件(默认开启,仅保留最终可视化)
             if cleanup_intermediate:
                 try:
-                    self._cleanup_intermediate_files(generated_files, boundary_shp)
+                    # 由于不再创建临时边界文件,所以传递None
+                    self._cleanup_intermediate_files(generated_files, None)
                 except Exception as cleanup_err:
                     self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
 
@@ -566,12 +563,34 @@ class CdFluxRemovalService:
             self.logger.error(f"获取边界文件失败: {str(e)}")
             return None
     
+    def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
+        """
+        直接从数据库获取边界数据作为GeoDataFrame
+        
+        @param area: 地区名称
+        @param level: 行政层级
+        @returns: 边界GeoDataFrame或None
+        """
+        try:
+            with SessionLocal() as db:
+                norm_area = area.strip()
+                boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
+                if boundary_gdf is not None:
+                    self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
+                return boundary_gdf
+                    
+        except Exception as e:
+            self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
+            
+        return None
+
     def _create_boundary_from_database(self, area: str, level: str) -> Optional[str]:
         """
         从数据库获取边界数据并创建临时shapefile
         
         @param area: 地区名称
         @returns: 临时边界文件路径或None
+        @deprecated: 建议使用 _get_boundary_gdf_from_database 方法直接获取 GeoDataFrame
         """
         try:
             with SessionLocal() as db:

+ 10 - 6
app/services/cd_flux_service.py

@@ -14,13 +14,17 @@ from app.database import SessionLocal
 from app.models import FarmlandData, FluxCdOutputData, FluxCdInputData
 from app.utils.mapping_utils import MappingUtils, csv_to_raster_workflow
 
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 def get_base_dir():
     """获取基础目录路径(与土地数据处理函数一致)"""

+ 331 - 0
app/services/land_use_service.py

@@ -0,0 +1,331 @@
+"""
+四县土地利用类型数据服务
+@description: 提供基于four_county_land表的土地利用类型数据查询和处理功能
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import logging
+import pandas as pd
+import numpy as np
+from typing import Dict, Any, List, Optional, Tuple
+from sqlalchemy.orm import Session
+from sqlalchemy import text, and_, func
+from shapely.geometry import Point
+from shapely.wkt import loads
+from pyproj import Transformer
+import functools
+from datetime import datetime, timedelta
+
+from ..database import SessionLocal
+from ..models.land_use import FourCountyLandUse
+
+# 配置日志
+logger = logging.getLogger(__name__)
+
+
+class LandUseService:
+    """
+    四县土地利用类型数据服务类
+    
+    @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化
+    """
+    
+    def __init__(self):
+        """
+        初始化土地利用服务
+        """
+        self.logger = logging.getLogger(__name__)
+        self._cache = {}  # 简单的内存缓存
+        self._cache_timeout = timedelta(minutes=30)  # 缓存30分钟
+        
+    def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]:
+        """
+        根据土地利用类型查询数据
+        
+        @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地'
+        @returns: 土地利用数据列表,包含几何中心点坐标信息
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定类型的土地利用数据
+                query = db.query(FourCountyLandUse).filter(
+                    FourCountyLandUse.dlmc == land_type
+                )
+                
+                land_records = query.all()
+                
+                if not land_records:
+                    self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据")
+                    return None
+                
+                self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据")
+                
+                # 转换数据为列表
+                result = []
+                for record in land_records:
+                    # 从PostGIS几何数据中提取中心点坐标
+                    center_info = self._extract_geometry_center(db, record.gid)
+                    
+                    land_data = {
+                        'gid': record.gid,
+                        'objectid': record.objectid,
+                        'dlmc': record.dlmc,  # 地类名称
+                        'dlbm': record.dlbm,  # 地类编码
+                        'tbmj': float(record.tbmj) if record.tbmj else None,  # 图斑面积
+                        'center_lon': center_info['longitude'] if center_info else None,
+                        'center_lat': center_info['latitude'] if center_info else None,
+                        'shape_area': float(record.shape_area) if record.shape_area else None,
+                        # 其他可能需要的字段
+                        'qsdwmc': record.qsdwmc,  # 权属单位名称
+                        'zldwmc': record.zldwmc,  # 坐落单位名称
+                    }
+                    result.append(land_data)
+                
+                return result
+                
+        except Exception as e:
+            self.logger.error(f"查询土地利用数据失败: {str(e)}")
+            return None
+    
+    def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]:
+        """
+        提取几何图形的中心点坐标
+        
+        @param db: 数据库会话
+        @param gid: 图形记录ID
+        @returns: 包含经纬度的字典
+        """
+        try:
+            # 使用PostGIS函数计算几何中心并转换为WGS84坐标系
+            sql = text("""
+                SELECT 
+                    ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude,
+                    ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude
+                FROM four_county_land 
+                WHERE gid = :gid
+            """)
+            
+            result = db.execute(sql, {'gid': gid}).fetchone()
+            
+            if result:
+                return {
+                    'longitude': float(result.longitude),
+                    'latitude': float(result.latitude)
+                }
+            else:
+                self.logger.warning(f"无法获取GID {gid} 的几何中心点")
+                return None
+                
+        except Exception as e:
+            self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}")
+            return None
+    
+    def _is_cache_valid(self, cache_key: str) -> bool:
+        """检查缓存是否有效"""
+        if cache_key not in self._cache:
+            return False
+        cache_time = self._cache[cache_key].get('timestamp')
+        if not cache_time:
+            return False
+        return datetime.now() - cache_time < self._cache_timeout
+    
+    def _get_from_cache(self, cache_key: str):
+        """从缓存获取数据"""
+        if self._is_cache_valid(cache_key):
+            return self._cache[cache_key]['data']
+        return None
+    
+    def _set_cache(self, cache_key: str, data):
+        """设置缓存数据"""
+        self._cache[cache_key] = {
+            'data': data,
+            'timestamp': datetime.now()
+        }
+    
+    def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None, 
+                                  offset: Optional[int] = None) -> Optional[pd.DataFrame]:
+        """
+        高性能批量获取土地中心点数据
+        
+        @param land_type: 土地利用类型
+        @param limit: 限制返回数量,None表示返回所有
+        @param offset: 偏移量,用于分页
+        @returns: 包含中心点坐标的DataFrame
+        """
+        try:
+            # 构建缓存键
+            cache_key = f"centers_optimized_{land_type}_{limit}_{offset}"
+            
+            # 尝试从缓存获取
+            cached_data = self._get_from_cache(cache_key)
+            if cached_data is not None:
+                self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}")
+                return cached_data
+            
+            with SessionLocal() as db:
+                # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题
+                sql = text("""
+                    SELECT 
+                        gid,
+                        dlmc,
+                        shape_area,
+                        ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon,
+                        ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat
+                    FROM four_county_land 
+                    WHERE dlmc = :land_type
+                    AND geom IS NOT NULL
+                    ORDER BY gid
+                    %s
+                """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else ""))
+                
+                results = db.execute(sql, {'land_type': land_type}).fetchall()
+                
+                if not results:
+                    self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据")
+                    return None
+                
+                # 直接构建DataFrame,避免中间转换
+                data = {
+                    'gid': [r.gid for r in results],
+                    'center_lon': [float(r.center_lon) for r in results],
+                    'center_lat': [float(r.center_lat) for r in results],
+                    'dlmc': [r.dlmc for r in results],
+                    'shape_area': [float(r.shape_area) if r.shape_area else None for r in results]
+                }
+                
+                df = pd.DataFrame(data)
+                
+                # 缓存结果
+                self._set_cache(cache_key, df)
+                
+                self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点")
+                return df
+                
+        except Exception as e:
+            self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}")
+            return None
+    
+    def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]:
+        """
+        获取用于处理的土地中心点数据(使用优化版本)
+        
+        @param land_type: 土地利用类型
+        @returns: 包含中心点坐标的DataFrame
+        """
+        # 使用优化版本的方法
+        return self.get_land_centers_optimized(land_type)
+    
+    def get_available_land_types(self) -> List[str]:
+        """
+        获取数据库中可用的土地利用类型(带缓存)
+        
+        @returns: 土地利用类型列表
+        """
+        cache_key = "available_land_types"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 查询所有不同的土地利用类型
+                query = db.query(FourCountyLandUse.dlmc).distinct()
+                results = query.all()
+                
+                land_types = [result.dlmc for result in results if result.dlmc]
+                land_types.sort()  # 排序便于查看
+                
+                # 缓存结果
+                self._set_cache(cache_key, land_types)
+                
+                self.logger.info(f"可用的土地利用类型: {land_types}")
+                return land_types
+                
+        except Exception as e:
+            self.logger.error(f"获取土地利用类型失败: {str(e)}")
+            return []
+    
+    def get_land_statistics_summary(self) -> Dict[str, Any]:
+        """
+        获取土地利用数据统计摘要(带缓存)
+        
+        @returns: 统计摘要信息
+        """
+        cache_key = "land_statistics_summary"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 统计各类型土地的数量
+                sql = text("""
+                    SELECT 
+                        dlmc,
+                        COUNT(*) as count,
+                        SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area,
+                        AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area
+                    FROM four_county_land 
+                    WHERE dlmc IS NOT NULL
+                    GROUP BY dlmc
+                    ORDER BY count DESC
+                """)
+                
+                results = db.execute(sql).fetchall()
+                
+                summary = {
+                    'total_records': 0,
+                    'land_types': [],
+                    'statistics_by_type': {}
+                }
+                
+                for result in results:
+                    land_type = result.dlmc
+                    count = result.count
+                    total_area = float(result.total_area) if result.total_area else 0
+                    avg_area = float(result.avg_area) if result.avg_area else 0
+                    
+                    summary['total_records'] += count
+                    summary['land_types'].append(land_type)
+                    summary['statistics_by_type'][land_type] = {
+                        'count': count,
+                        'total_area': total_area,
+                        'average_area': avg_area
+                    }
+                
+                # 缓存结果
+                self._set_cache(cache_key, summary)
+                
+                self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}")
+                return summary
+                
+        except Exception as e:
+            self.logger.error(f"获取统计摘要失败: {str(e)}")
+            return {
+                'total_records': 0,
+                'land_types': [],
+                'statistics_by_type': {}
+            }
+    
+    def clear_cache(self):
+        """清空缓存"""
+        self._cache.clear()
+        self.logger.info("缓存已清空")
+    
+    def get_cache_info(self) -> Dict[str, Any]:
+        """获取缓存信息"""
+        cache_info = {
+            'cache_size': len(self._cache),
+            'cache_keys': list(self._cache.keys()),
+            'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60
+        }
+        return cache_info
+
+
+# 全局服务实例
+land_use_service = LandUseService()

+ 230 - 88
app/services/water_service.py

@@ -12,15 +12,17 @@ import shutil
 import sys
 
 # 导入MappingUtils
-from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow
+from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
+# 导入土地利用服务
+from .land_use_service import land_use_service
+# 导入边界服务
+from .admin_boundary_service import get_boundary_gdf_by_name
+from ..database import SessionLocal
+import geopandas as gpd
+import tempfile
 
 # 配置日志
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
 
 # 设置全局字体
 import matplotlib.pyplot as plt
@@ -61,30 +63,58 @@ def get_base_dir():
         return os.path.dirname(os.path.abspath(__file__))
 
 
+def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
+    """
+    直接从数据库获取边界数据作为GeoDataFrame
+    
+    @description: 与其他模块保持一致的边界获取方法
+    @param area: 地区名称
+    @param level: 行政层级(county | city | province)
+    @returns: 边界GeoDataFrame或None
+    """
+    try:
+        with SessionLocal() as db:
+            norm_area = area.strip()
+            boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
+            if boundary_gdf is not None:
+                logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
+            return boundary_gdf
+                
+    except Exception as e:
+        logger.warning(f"从数据库获取边界数据失败: {str(e)}")
+        
+    return None
+
+
+
+
+
 # 土地数据处理函数
-def process_land_data(land_type, coefficient_params=None):
+def process_land_data(land_type, coefficient_params=None, save_csv=True):
     """处理土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
-    shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
 
     logger.info(f"处理土地类型: {land_type}")
-    logger.info(f"SHP文件: {shp_file}")
     logger.info(f"Excel文件: {xls_file}")
 
-    # 读取和处理SHP数据
-    gdf_shp = gpd.read_file(shp_file)
-    gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
-
-    if gdf_shp.empty:
-        logger.warning(f"没有找到 '{land_type}' 类型的要素")
-        return None, None
+    # 从数据库获取土地利用数据,替代原来的SHP文件读取
+    logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
+    
+    if land_centers_df is None or land_centers_df.empty:
+        logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+        return None, None, None
 
-    # 坐标系转换器
-    transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+    logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
     # 读取Excel采样点数据
+    if not os.path.exists(xls_file):
+        logger.error(f"采样点Excel文件不存在: {xls_file}")
+        return None, None, None
+        
     df_xls = pd.read_excel(xls_file)
+    logger.info(f"读取到 {len(df_xls)} 个采样点数据")
 
     # 设置土地类型系数
     default_params = {
@@ -98,14 +128,16 @@ def process_land_data(land_type, coefficient_params=None):
     Num = param1 * param2
     logger.info(f"系数: {param1} * {param2} = {Num}")
 
-    # 处理每个面要素
+    # 处理每个面要素,使用数据库中的中心点坐标
     cd_values = []
     centers = []
-    for index, row in gdf_shp.iterrows():
-        center_original = row['geometry'].centroid
-        center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+    
+    for index, row in land_centers_df.iterrows():
+        center_lon = row['center_lon']
+        center_lat = row['center_lat']
         centers.append((center_lon, center_lat))
 
+        # 计算到所有采样点的距离
         distances = df_xls.apply(
             lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
             axis=1
@@ -113,15 +145,10 @@ def process_land_data(land_type, coefficient_params=None):
         min_idx = distances.idxmin()
         nearest = df_xls.loc[min_idx]
 
-        # 计算Cd含量值
+        # 计算Cd含量值
         cd_value = nearest['Cd (ug/L)'] * Num
         cd_values.append(cd_value)
 
-    # 创建输出目录
-    data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
-    os.makedirs(data_dir, exist_ok=True)
-    logger.info(f"数据目录: {data_dir}")
-
     # 创建简化数据DataFrame
     simplified_data = pd.DataFrame({
         'lon': [c[0] for c in centers],
@@ -152,17 +179,37 @@ def process_land_data(land_type, coefficient_params=None):
         ]
     logger.info(f"清洗后数据点数: {len(cleaned_data)}")
 
-    # 保存清洗后的简化数据CSV
-    cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
-    cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
-    logger.info(f"保存CSV: {cleaned_csv}")
+    # 可选:保存CSV文件用于兼容性和调试
+    cleaned_csv_path = None
+    if save_csv:
+        # 创建输出目录
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        os.makedirs(data_dir, exist_ok=True)
+        logger.info(f"数据目录: {data_dir}")
+        
+        cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+        cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
+        logger.info(f"保存CSV: {cleaned_csv_path}")
+    else:
+        logger.info("跳过CSV文件生成(内存处理优化)")
 
-    return cleaned_csv, Num
+    return cleaned_data, Num, cleaned_csv_path  # 返回DataFrame, 系数, 和CSV路径(如果生成了)
 
 
-# 可视化函数(使用MappingUtils)
-# 可视化函数(完全使用统一的MappingUtils接口)
 def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
-    """生成栅格地图可视化"""
+    """
+    生成栅格地图可视化
+    
+    @description: 使用统一的MappingUtils接口生成栅格地图
+    @param shp_path: 边界shapefile路径
+    @param tif_path: 栅格数据文件路径  
+    @param color_map_name: 色彩方案名称(中文)
+    @param title_name: 地图标题
+    @param output_path: 输出图片路径
+    @param output_size: 图片尺寸
+    @returns: 输出图片路径或None(失败时)
+    """
     try:
         logger.info(f"生成栅格地图: {title_name}")
 
@@ -175,11 +222,11 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
         # 转换颜色方案名称
         colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
 
-        # 调用MappingUtils的高级绘图功能
-        mapper.create_raster_map(
+        # 直接调用统一的绘图接口,无需额外包装
+        generated_path = mapper.create_raster_map(
             shp_path=shp_path,
             tif_path=tif_path,
-            output_path=os.path.splitext(output_path)[0],  # 去掉扩展名
+            output_path=os.path.splitext(output_path)[0],  # 去掉扩展名,MappingUtils会自动添加.jpg
             colormap=colormap_key,
             title=title_name,
             output_size=output_size,
@@ -189,13 +236,13 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
             interpolation_method='linear'
         )
 
-        # MappingUtils会自动添加.jpg扩展名,重命名文件以匹配原始输出路径
-        generated_path = os.path.splitext(output_path)[0] + ".jpg"
+        # 如果生成的路径与期望路径不同,则重命名
         if generated_path != output_path and os.path.exists(generated_path):
             shutil.move(generated_path, output_path)
             logger.info(f"重命名图片: {generated_path} -> {output_path}")
-
-        return output_path
+            return output_path
+        
+        return generated_path
 
     except Exception as e:
         logger.error(f"栅格地图生成失败: {str(e)}")
@@ -205,15 +252,26 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
 def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
                        xlabel='Cd(ug/L)', ylabel='frequency density',
                        title='Irrigation water input flux'):
-    """生成TIFF文件的直方图"""
+    """
+    生成TIFF文件的直方图
+    
+    @description: 使用统一的MappingUtils接口生成直方图
+    @param file_path: TIFF文件路径
+    @param output_path: 输出图片路径
+    @param figsize: 图片尺寸元组
+    @param xlabel: X轴标签
+    @param ylabel: Y轴标签  
+    @param title: 图表标题
+    @returns: 输出图片路径或None(失败时)
+    """
     try:
         logger.info(f"生成直方图: {file_path}")
 
         # 创建MappingUtils实例
         mapper = MappingUtils()
 
-        # 调用MappingUtils的高级直方图功能
-        mapper.create_histogram(
+        # 直接调用统一的绘图接口,无需额外包装
+        generated_path = mapper.create_histogram(
             file_path=file_path,
             save_path=output_path,
             figsize=figsize,
@@ -224,81 +282,165 @@ def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
             dpi=300
         )
 
-        return output_path
+        return generated_path
 
     except Exception as e:
         logger.error(f"直方图生成失败: {str(e)}")
         return None
 
 
-# 完整的处理流程
 def process_land_to_visualization(land_type, coefficient_params=None,
                                   color_map_name="绿-黄-红-紫",
-                                  output_size=8):
+                                  output_size=8,
+                                  area: Optional[str] = None,
+                                  level: Optional[str] = None,
+                                  enable_interpolation: Optional[bool] = True,
+                                  interpolation_method: Optional[str] = "linear",
+                                  resolution_factor: Optional[float] = 4.0,
+                                  save_csv: Optional[bool] = True):
     """
-    完整的土地数据处理可视化流程:
-    1. 生成清洗后CSV
-    2. 使用csv_to_raster_workflow转换为GeoTIFF
-    3. 生成栅格地图
-    4. 生成直方图
-
-    返回所有生成文件的路径
+    完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
+    
+    @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
+                  支持通过area和level参数动态控制地图边界,支持插值控制参数
+    
+    工作流程:
+    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
+    2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
+    3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
+    4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
+    5. 生成直方图 - 直接调用MappingUtils.create_histogram()
+
+    @param land_type: 土地类型(水田、旱地、水浇地)
+    @param coefficient_params: 可选的系数参数字典
+    @param color_map_name: 色彩方案名称(中文)
+    @param output_size: 输出图片尺寸
+    @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
+    @param level: 可选的行政层级,county | city | province
+    @param enable_interpolation: 是否启用空间插值,默认True
+    @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
+    @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
+    @param save_csv: 是否生成CSV文件,默认True
+    @returns: 包含所有生成文件路径的元组
     """
     base_dir = get_base_dir()
     logger.info(f"开始处理: {land_type}")
 
-    # 1. 生成清洗后的CSV
-    cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
-    if not cleaned_csv:
+    # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
+    cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
+        land_type, 
+        coefficient_params, 
+        save_csv=save_csv  # 根据参数决定是否生成CSV文件
+    )
+    if cleaned_data is None:
         logger.error(f"处理土地数据失败: {land_type}")
         return None, None, None, None, None, None
 
-    # 2. 使用csv_to_raster_workflow转换为GeoTIFF
+    # 2. 获取动态边界数据(提前获取用于栅格转换)
+    boundary_gdf = None
+    if area and level:
+        logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
+        boundary_gdf = get_boundary_gdf_from_database(area, level)
+        if boundary_gdf is not None:
+            logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
+            logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
+        else:
+            logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
+            logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
+    else:
+        logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
+        boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
+        if boundary_gdf is not None:
+            logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
+        else:
+            logger.info("未能获取默认边界,栅格转换将使用数据点范围")
+            
+    # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
     raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
     template_tif = os.path.join(raster_dir, "meanTemp.tif")
-    output_dir = os.path.dirname(cleaned_csv)  # 使用CSV所在目录作为输出目录
+    output_dir = raster_dir  # 直接使用raster目录作为输出目录
 
-    # 调用csv_to_raster_workflow
-    workflow_result = csv_to_raster_workflow(
-        csv_file=cleaned_csv,
+    # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
+    workflow_result = dataframe_to_raster_workflow(
+        df=cleaned_data,  # 直接使用DataFrame
         template_tif=template_tif,
         output_dir=output_dir,
-        boundary_shp=None,
-        resolution_factor=4.0,
-        interpolation_method='linear',
+        boundary_gdf=boundary_gdf,  # 直接使用GeoDataFrame
+        resolution_factor=resolution_factor,  # 可配置的分辨率因子
+        interpolation_method=interpolation_method,  # 可配置的插值方法
         field_name='Prediction',
-        lon_col=0,  # CSV中经度列索引
-        lat_col=1,  # CSV中纬度列索引
-        value_col=2,  # CSV中数值列索引
-        enable_interpolation=True
+        lon_col=0,  # DataFrame中经度列索引
+        lat_col=1,  # DataFrame中纬度列索引
+        value_col=2,  # DataFrame中数值列索引
+        enable_interpolation=enable_interpolation  # 可配置的插值开关
     )
 
     # 获取输出的栅格文件路径
     output_tif = workflow_result['raster']
     logger.info(f"生成栅格文件: {output_tif}")
 
-    # 3. 生成栅格地图
+    # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
     map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
-    county_shp = os.path.join(raster_dir, "Lechang.shp")  # 县界SHP
-
-    mapping_raster(
-        shp_path=county_shp,
-        tif_path=output_tif,
-        color_map_name=color_map_name,
-        title_name=f"Irrigation water input flux",
-        output_path=map_output,
-        output_size=output_size
-
-    )
+    
+    try:
+        # 创建MappingUtils实例并直接调用
+        mapper = MappingUtils()
+        
+        # 转换颜色方案名称
+        colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
+        
+        # 统一使用boundary_gdf,不再依赖Shapefile
+        map_result = mapper.create_raster_map(
+            shp_path=None,  # 不使用Shapefile
+            tif_path=output_tif,
+            output_path=os.path.splitext(map_output)[0],  # 去掉扩展名
+            colormap=colormap_key,
+            title="Irrigation water input flux",
+            output_size=output_size,
+            dpi=300,
+            enable_interpolation=enable_interpolation,  # 可配置的插值开关
+            interpolation_method=interpolation_method,  # 可配置的插值方法
+            boundary_gdf=boundary_gdf  # 使用从数据库获取的边界数据,如果为None则使用数据点范围
+        )
+        
+        # 如果生成的路径与期望不同,则重命名
+        if map_result != map_output and os.path.exists(map_result):
+            shutil.move(map_result, map_output)
+            logger.info(f"栅格地图已生成: {map_output}")
+            
+    except Exception as e:
+        logger.error(f"栅格地图生成失败: {str(e)}")
 
-    # 4. 生成直方图
+    # 5. 生成直方图(直接使用统一的MappingUtils接口)
     hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
-    plot_tif_histogram(
-        file_path=output_tif,
-        output_path=hist_output,
-        title=f"Irrigation water input flux"
-    )
+    
+    try:
+        # 直接调用统一绘图接口生成直方图
+        hist_result = mapper.create_histogram(
+            file_path=output_tif,
+            save_path=hist_output,
+            figsize=(8, 8),
+            xlabel='Cd(ug/L)',
+            ylabel='frequency density',
+            title='Irrigation water input flux',
+            bins=100,
+            dpi=300
+        )
+        
+        if hist_result:
+            logger.info(f"直方图已生成: {hist_output}")
+            
+    except Exception as e:
+        logger.error(f"直方图生成失败: {str(e)}")
 
+    # 使用实际的CSV路径(如果生成了)或生成兼容性路径
+    if cleaned_csv_path:
+        cleaned_csv = cleaned_csv_path
+    else:
+        # 为了兼容性,生成路径(即使文件不存在)
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+    
     return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
 
 

+ 0 - 1
app/static/water/Raster/lechang.cpg

@@ -1 +0,0 @@
-UTF-8

BIN
app/static/water/Raster/lechang.dbf


+ 0 - 1
app/static/water/Raster/lechang.prj

@@ -1 +0,0 @@
-GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]

BIN
app/static/water/Raster/lechang.shp


BIN
app/static/water/Raster/lechang.shx


BIN
app/static/water/Raster/lechang.zip


+ 103 - 0
app/utils/logger_config.py

@@ -0,0 +1,103 @@
+"""
+统一的日志配置工具
+用于在整个项目中统一管理日志配置,避免重复日志输出
+"""
+
+import logging
+from typing import Optional
+
+
+def setup_logger(
+    name: str,
+    level: int = logging.INFO,
+    propagate: bool = False,
+    formatter_string: Optional[str] = None
+) -> logging.Logger:
+    """
+    设置并配置logger,避免重复日志输出
+    
+    @param name: logger名称,通常使用 __name__
+    @param level: 日志级别,默认为 INFO
+    @param propagate: 是否向父级logger传播,默认为False,避免重复输出
+    @param formatter_string: 自定义格式字符串,如果不提供则使用默认格式
+    @return: 配置好的logger对象
+    """
+    logger = logging.getLogger(name)
+    
+    # 避免重复添加处理器导致重复日志
+    if not logger.handlers:
+        logger.setLevel(level)
+        
+        # 创建控制台处理器
+        handler = logging.StreamHandler()
+        
+        # 设置日志格式
+        if formatter_string is None:
+            formatter_string = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        formatter = logging.Formatter(formatter_string)
+        handler.setFormatter(formatter)
+        
+        # 添加处理器
+        logger.addHandler(handler)
+        
+        # 控制日志传播
+        logger.propagate = propagate
+    
+    return logger
+
+
+def get_api_logger(module_name: str) -> logging.Logger:
+    """
+    为API模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # API层不传播到父级,避免与uvicorn日志冲突
+    )
+
+
+def get_service_logger(module_name: str) -> logging.Logger:
+    """
+    为Service模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # Service层不传播到父级,保持独立
+    )
+
+
+def get_utils_logger(module_name: str) -> logging.Logger:
+    """
+    为工具模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # 工具层不传播到父级,保持独立
+    )
+
+
+# 主应用日志配置函数
+def configure_root_logging(level: int = logging.INFO) -> None:
+    """
+    配置根日志系统,只在应用启动时调用一次
+    
+    @param level: 根日志级别
+    """
+    # 检查是否已经配置过,避免重复配置
+    if not logging.getLogger().handlers:
+        logging.basicConfig(
+            level=level,
+            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        )

+ 355 - 46
app/utils/mapping_utils.py

@@ -65,11 +65,14 @@ class MappingUtils:
         self.logger = logging.getLogger(self.__class__.__name__)
         self.logger.setLevel(log_level)
         
+        # 避免重复添加处理器,并防止日志传播到父级处理器导致重复输出
         if not self.logger.handlers:
             handler = logging.StreamHandler()
             formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
             handler.setFormatter(formatter)
             self.logger.addHandler(handler)
+            # 关闭日志传播,避免与全局basicConfig冲突
+            self.logger.propagate = False
     
     def csv_to_shapefile(self, csv_file, shapefile_output, lon_col=0, lat_col=1, value_col=2):
         """
@@ -121,6 +124,50 @@ class MappingUtils:
             self.logger.error(f"CSV转Shapefile失败: {str(e)}")
             raise
     
+    def dataframe_to_geodataframe(self, df, lon_col=0, lat_col=1, value_col=2, field_name='Prediction'):
+        """
+        将DataFrame直接转换为GeoDataFrame(内存处理)
+        
+        @param df: pandas DataFrame
+        @param lon_col: 经度列索引或列名,默认第0列
+        @param lat_col: 纬度列索引或列名,默认第1列  
+        @param value_col: 数值列索引或列名,默认第2列
+        @param field_name: 值字段名称
+        @return: GeoDataFrame
+        """
+        try:
+            self.logger.info("开始将DataFrame转换为GeoDataFrame(内存处理)")
+            
+            # 支持列索引或列名
+            if isinstance(lon_col, int):
+                lon = df.iloc[:, lon_col]
+            else:
+                lon = df[lon_col]
+                
+            if isinstance(lat_col, int):
+                lat = df.iloc[:, lat_col]
+            else:
+                lat = df[lat_col]
+                
+            if isinstance(value_col, int):
+                val = df.iloc[:, value_col]
+            else:
+                val = df[value_col]
+            
+            # 创建几何对象
+            geometry = [Point(xy) for xy in zip(lon, lat)]
+            
+            # 创建新的DataFrame,只包含必要的列
+            data = {field_name: val}
+            gdf = gpd.GeoDataFrame(data, geometry=geometry, crs="EPSG:4326")
+            
+            self.logger.info(f"✓ 成功转换DataFrame到GeoDataFrame: {len(gdf)} 个点")
+            return gdf
+            
+        except Exception as e:
+            self.logger.error(f"DataFrame转GeoDataFrame失败: {str(e)}")
+            raise
+    
     def create_boundary_mask(self, raster, transform, gdf):
         """
         创建边界掩膜,只保留边界内的区域
@@ -199,8 +246,150 @@ class MappingUtils:
             self.logger.error(f"插值失败: {str(e)}")
             return raster
     
+    def geodataframe_to_raster(self, gdf, template_tif, output_tif, field, 
+                              resolution_factor=16.0, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
+        """
+        将GeoDataFrame直接转换为栅格数据(内存处理版本)
+        
+        @param gdf: 输入的GeoDataFrame
+        @param template_tif: 用作模板的GeoTIFF文件路径
+        @param output_tif: 输出栅格化后的GeoTIFF文件路径
+        @param field: 用于栅格化的属性字段名
+        @param resolution_factor: 分辨率倍数因子
+        @param boundary_gdf: 边界GeoDataFrame,用于创建掩膜
+        @param interpolation_method: 插值方法 ('nearest', 'linear', 'cubic')
+        @param enable_interpolation: 是否启用空间插值,默认True
+        @return: 输出的GeoTIFF文件路径和统计信息
+        """
+        try:
+            self.logger.info(f"开始处理GeoDataFrame到栅格(内存处理)")
+            interpolation_status = "启用" if enable_interpolation else "禁用"
+            self.logger.info(f"分辨率因子: {resolution_factor}, 插值设置: {interpolation_status} (方法: {interpolation_method})")
+            
+            # 读取模板栅格
+            with rasterio.open(template_tif) as src:
+                template_meta = src.meta.copy()
+                
+                # 根据分辨率因子计算新的尺寸和变换参数
+                if resolution_factor != 1.0:
+                    width = int(src.width * resolution_factor)
+                    height = int(src.height * resolution_factor)
+                    
+                    transform = rasterio.Affine(
+                        src.transform.a / resolution_factor,
+                        src.transform.b,
+                        src.transform.c,
+                        src.transform.d,
+                        src.transform.e / resolution_factor,
+                        src.transform.f
+                    )
+                    self.logger.info(f"分辨率调整: {src.width}x{src.height} -> {width}x{height}")
+                else:
+                    width = src.width
+                    height = src.height
+                    transform = src.transform
+                    self.logger.info(f"保持原始分辨率: {width}x{height}")
+                
+                crs = src.crs
+            
+            # 投影矢量数据
+            if gdf.crs != crs:
+                gdf = gdf.to_crs(crs)
+            
+            # 栅格化
+            shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field]))
+            raster = rasterize(
+                shapes=shapes,
+                out_shape=(height, width),
+                transform=transform,
+                fill=np.nan,
+                dtype='float32'
+            )
+            
+            # 预备掩膜:优先使用行政区边界;若未提供边界,则使用点集凸包限制绘制范围
+            boundary_mask = None
+            if boundary_gdf is not None:
+                self.logger.info("应用边界掩膜: 使用直接提供的GeoDataFrame")
+                # 确保边界GeoDataFrame的CRS与栅格一致
+                if boundary_gdf.crs != crs:
+                    boundary_gdf = boundary_gdf.to_crs(crs)
+                boundary_mask = self.create_boundary_mask(raster, transform, boundary_gdf)
+                raster[~boundary_mask] = np.nan
+            else:
+                try:
+                    # 使用点集凸包作为默认掩膜,避免边界外着色
+                    hull = gdf.unary_union.convex_hull
+                    hull_gdf = gpd.GeoDataFrame(geometry=[hull], crs=crs)
+                    boundary_mask = self.create_boundary_mask(raster, transform, hull_gdf)
+                    raster[~boundary_mask] = np.nan
+                    self.logger.info("已使用点集凸包限制绘制范围")
+                except Exception as hull_err:
+                    self.logger.warning(f"生成点集凸包掩膜失败,可能会出现边界外着色: {str(hull_err)}")
+            
+            # 检查栅格数据状态并决定是否插值
+            nan_count = np.isnan(raster).sum()
+            total_pixels = raster.size
+            self.logger.info(f"栅格数据状态: 总像素数 {total_pixels}, NaN像素数 {nan_count} ({nan_count/total_pixels*100:.1f}%)")
+            
+            # 使用插值方法填充NaN值(如果启用)
+            if enable_interpolation and nan_count > 0:
+                self.logger.info(f"✓ 启用插值: 使用 {interpolation_method} 方法填充 {nan_count} 个NaN像素...")
+                raster = self.interpolate_nan_values(raster, method=interpolation_method)
+                # 关键修正:插值后再次应用掩膜,确保边界外不被填充
+                if boundary_mask is not None:
+                    raster[~boundary_mask] = np.nan
+                final_nan_count = np.isnan(raster).sum()
+                self.logger.info(f"插值完成: 剩余NaN像素数 {final_nan_count}")
+            elif enable_interpolation and nan_count == 0:
+                self.logger.info("✓ 插值已启用,但栅格数据无NaN值,无需插值")
+            elif not enable_interpolation and nan_count > 0:
+                self.logger.info(f"✗ 插值已禁用,保留 {nan_count} 个NaN像素")
+            else:
+                self.logger.info("✓ 栅格数据完整,无需插值")
+            
+            # 创建输出目录
+            os.makedirs(os.path.dirname(output_tif), exist_ok=True)
+            
+            # 更新元数据
+            template_meta.update({
+                "count": 1,
+                "dtype": 'float32',
+                "nodata": np.nan,
+                "width": width,
+                "height": height,
+                "transform": transform
+            })
+            
+            # 保存栅格文件
+            with rasterio.open(output_tif, 'w', **template_meta) as dst:
+                dst.write(raster, 1)
+            
+            # 计算统计信息
+            valid_data = raster[~np.isnan(raster)]
+            stats = None
+            if len(valid_data) > 0:
+                stats = {
+                    'min': float(np.min(valid_data)),
+                    'max': float(np.max(valid_data)),
+                    'mean': float(np.mean(valid_data)),
+                    'std': float(np.std(valid_data)),
+                    'valid_pixels': int(len(valid_data)),
+                    'total_pixels': int(raster.size)
+                }
+                self.logger.info(f"统计信息: 有效像素 {stats['valid_pixels']}/{stats['total_pixels']}")
+                self.logger.info(f"数值范围: {stats['min']:.4f} - {stats['max']:.4f}")
+            else:
+                self.logger.warning("没有有效数据")
+            
+            self.logger.info(f"✓ 成功保存: {output_tif}")
+            return output_tif, stats
+            
+        except Exception as e:
+            self.logger.error(f"GeoDataFrame转栅格失败: {str(e)}")
+            raise
+    
     def vector_to_raster(self, input_shapefile, template_tif, output_tif, field, 
-                        resolution_factor=16.0, boundary_shp=None, interpolation_method='nearest', enable_interpolation=True):
+                        resolution_factor=16.0, boundary_shp=None, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
         """
         将点矢量数据转换为栅格数据
         
@@ -209,14 +398,16 @@ class MappingUtils:
         @param output_tif: 输出栅格化后的GeoTIFF文件路径
         @param field: 用于栅格化的属性字段名
         @param resolution_factor: 分辨率倍数因子
-        @param boundary_shp: 边界Shapefile文件路径,用于创建掩膜
+        @param boundary_shp: 边界Shapefile文件路径,用于创建掩膜(兼容性保留)
+        @param boundary_gdf: 边界GeoDataFrame,优先使用此参数而非boundary_shp
         @param interpolation_method: 插值方法 ('nearest', 'linear', 'cubic')
         @param enable_interpolation: 是否启用空间插值,默认True
         @return: 输出的GeoTIFF文件路径和统计信息
         """
         try:
             self.logger.info(f"开始处理: {input_shapefile}")
-            self.logger.info(f"分辨率因子: {resolution_factor}, 插值方法: {interpolation_method}")
+            interpolation_status = "启用" if enable_interpolation else "禁用"
+            self.logger.info(f"分辨率因子: {resolution_factor}, 插值设置: {interpolation_status} (方法: {interpolation_method})")
             
             # 读取矢量数据
             gdf = gpd.read_file(input_shapefile)
@@ -263,13 +454,20 @@ class MappingUtils:
             
             # 预备掩膜:优先使用行政区边界;若未提供边界,则使用点集凸包限制绘制范围
             boundary_mask = None
-            if boundary_shp and os.path.exists(boundary_shp):
-                self.logger.info(f"应用边界掩膜: {boundary_shp}")
-                boundary_gdf = gpd.read_file(boundary_shp)
+            if boundary_gdf is not None:
+                self.logger.info("应用边界掩膜: 使用直接提供的GeoDataFrame")
+                # 确保边界GeoDataFrame的CRS与栅格一致
                 if boundary_gdf.crs != crs:
                     boundary_gdf = boundary_gdf.to_crs(crs)
                 boundary_mask = self.create_boundary_mask(raster, transform, boundary_gdf)
                 raster[~boundary_mask] = np.nan
+            elif boundary_shp and os.path.exists(boundary_shp):
+                self.logger.info(f"应用边界掩膜: {boundary_shp}")
+                boundary_gdf_from_file = gpd.read_file(boundary_shp)
+                if boundary_gdf_from_file.crs != crs:
+                    boundary_gdf_from_file = boundary_gdf_from_file.to_crs(crs)
+                boundary_mask = self.create_boundary_mask(raster, transform, boundary_gdf_from_file)
+                raster[~boundary_mask] = np.nan
             else:
                 try:
                     # 使用点集凸包作为默认掩膜,避免边界外着色
@@ -281,15 +479,26 @@ class MappingUtils:
                 except Exception as hull_err:
                     self.logger.warning(f"生成点集凸包掩膜失败,可能会出现边界外着色: {str(hull_err)}")
             
+            # 检查栅格数据状态并决定是否插值
+            nan_count = np.isnan(raster).sum()
+            total_pixels = raster.size
+            self.logger.info(f"栅格数据状态: 总像素数 {total_pixels}, NaN像素数 {nan_count} ({nan_count/total_pixels*100:.1f}%)")
+            
             # 使用插值方法填充NaN值(如果启用)
-            if enable_interpolation and np.isnan(raster).any():
-                self.logger.info(f"使用 {interpolation_method} 方法进行插值...")
+            if enable_interpolation and nan_count > 0:
+                self.logger.info(f"✓ 启用插值: 使用 {interpolation_method} 方法填充 {nan_count} 个NaN像素...")
                 raster = self.interpolate_nan_values(raster, method=interpolation_method)
                 # 关键修正:插值后再次应用掩膜,确保边界外不被填充
                 if boundary_mask is not None:
                     raster[~boundary_mask] = np.nan
-            elif not enable_interpolation and np.isnan(raster).any():
-                self.logger.info("插值已禁用,保留原始栅格数据(包含NaN值)")
+                final_nan_count = np.isnan(raster).sum()
+                self.logger.info(f"插值完成: 剩余NaN像素数 {final_nan_count}")
+            elif enable_interpolation and nan_count == 0:
+                self.logger.info("✓ 插值已启用,但栅格数据无NaN值,无需插值")
+            elif not enable_interpolation and nan_count > 0:
+                self.logger.info(f"✗ 插值已禁用,保留 {nan_count} 个NaN像素")
+            else:
+                self.logger.info("✓ 栅格数据完整,无需插值")
             
             # 创建输出目录
             os.makedirs(os.path.dirname(output_tif), exist_ok=True)
@@ -336,11 +545,11 @@ class MappingUtils:
                          colormap='green_yellow_red_purple', title="Prediction Map",
                          output_size=12, figsize=None, dpi=300,
                          resolution_factor=1.0, enable_interpolation=True,
-                         interpolation_method='nearest'):
+                         interpolation_method='nearest', boundary_gdf=None):
         """
         创建栅格地图
         
-        @param shp_path: 输入的矢量数据路径
+        @param shp_path: 输入的矢量数据路径(兼容性保留)
         @param tif_path: 输入的栅格数据路径
         @param output_path: 输出图片路径(不包含扩展名)
         @param colormap: 色彩方案名称或颜色列表
@@ -351,14 +560,23 @@ class MappingUtils:
         @param resolution_factor: 分辨率因子,>1提高分辨率,<1降低分辨率
         @param enable_interpolation: 是否启用空间插值,用于处理NaN值或提高分辨率,默认True
         @param interpolation_method: 插值方法 ('nearest', 'linear', 'cubic')
-                @return: 输出图片文件路径
+        @param boundary_gdf: 边界GeoDataFrame(可选,优先使用)
+        @return: 输出图片文件路径
         """
         try:
             self.logger.info(f"开始创建栅格地图: {tif_path}")
             self.logger.info(f"分辨率因子: {resolution_factor}, 启用插值: {enable_interpolation}")
 
-            # 读取矢量边界
-            gdf = gpd.read_file(shp_path) if shp_path else None
+            # 读取矢量边界:优先使用boundary_gdf,否则从shp_path读取
+            if boundary_gdf is not None:
+                gdf = boundary_gdf
+                self.logger.info("使用直接提供的边界GeoDataFrame")
+            elif shp_path:
+                gdf = gpd.read_file(shp_path)
+                self.logger.info(f"从文件读取边界数据: {shp_path}")
+            else:
+                gdf = None
+                self.logger.info("未提供边界数据,将使用整个栅格范围")
 
             # 读取并裁剪栅格数据
             with rasterio.open(tif_path) as src:
@@ -406,21 +624,38 @@ class MappingUtils:
             if np.all(np.isnan(raster)):
                 raise ValueError("栅格数据中没有有效值")
 
-            # 根据分位数分为6个等级
-            bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
-            norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
-
-            # 获取色彩方案
-            if isinstance(colormap, str):
-                if colormap in COLORMAPS:
-                    color_list = COLORMAPS[colormap]
+            # 检查数据是否为相同值
+            valid_data = raster[~np.isnan(raster)]
+            data_min = np.min(valid_data)
+            data_max = np.max(valid_data)
+            
+            if data_min == data_max:
+                # 所有值相同的情况:创建简单的单色映射
+                self.logger.info(f"检测到所有值相同 ({data_min:.6f}),使用单色映射")
+                bounds = [data_min - 0.001, data_min + 0.001]  # 创建微小的范围
+                norm = BoundaryNorm(bounds, ncolors=1)
+                # 使用绿色系的第一个颜色作为单色
+                if isinstance(colormap, str) and colormap in COLORMAPS:
+                    single_color = COLORMAPS[colormap][0]  # 使用色彩方案的第一个颜色
                 else:
-                    self.logger.warning(f"未知色彩方案: {colormap},使用默认方案")
-                    color_list = COLORMAPS['green_yellow_red_purple']
+                    single_color = '#89AC46'  # 默认绿色
+                cmap = ListedColormap([single_color])
             else:
-                color_list = colormap
-
-            cmap = ListedColormap(color_list)
+                # 正常情况:根据分位数分为6个等级
+                bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
+                norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
+                
+                # 获取色彩方案
+                if isinstance(colormap, str):
+                    if colormap in COLORMAPS:
+                        color_list = COLORMAPS[colormap]
+                    else:
+                        self.logger.warning(f"未知色彩方案: {colormap},使用默认方案")
+                        color_list = COLORMAPS['green_yellow_red_purple']
+                else:
+                    color_list = colormap
+                
+                cmap = ListedColormap(color_list)
 
             # 设置图片尺寸
             if figsize is not None:
@@ -461,16 +696,29 @@ class MappingUtils:
             ax.tick_params(axis='y', labelrotation=90)
 
             # 添加色带
-            tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
-            cbar = plt.colorbar(
-                plt.cm.ScalarMappable(norm=norm, cmap=cmap),
-                ax=ax,
-                ticks=[(bounds[i] + bounds[i+1]) / 2 for i in range(len(bounds) - 1)],
-                shrink=0.6,
-                aspect=15
-            )
-            cbar.ax.set_yticklabels(tick_labels)
-            cbar.set_label("Values")
+            if data_min == data_max:
+                # 单值情况:简化的色带
+                cbar = plt.colorbar(
+                    plt.cm.ScalarMappable(norm=norm, cmap=cmap),
+                    ax=ax,
+                    ticks=[data_min],
+                    shrink=0.6,
+                    aspect=15
+                )
+                cbar.ax.set_yticklabels([f"{data_min:.6f}"])
+                cbar.set_label("Fixed Value")
+            else:
+                # 正常情况:分级色带
+                tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
+                cbar = plt.colorbar(
+                    plt.cm.ScalarMappable(norm=norm, cmap=cmap),
+                    ax=ax,
+                    ticks=[(bounds[i] + bounds[i+1]) / 2 for i in range(len(bounds) - 1)],
+                    shrink=0.6,
+                    aspect=15
+                )
+                cbar.ax.set_yticklabels(tick_labels)
+                cbar.set_label("Values")
 
             plt.tight_layout()
 
@@ -585,13 +833,26 @@ class MappingUtils:
             if len(band_flat) == 0:
                 raise ValueError("栅格数据中没有有效值")
             
+            # 检查是否所有值相同
+            data_min = np.min(band_flat)
+            data_max = np.max(band_flat)
+            
             # 创建图形
             plt.figure(figsize=figsize)
             
-            # 绘制直方图和密度曲线
-            sns.histplot(band_flat, bins=bins, color='steelblue', alpha=0.7, 
-                        edgecolor='black', stat='density')
-            sns.kdeplot(band_flat, color='red', linewidth=2)
+            if data_min == data_max:
+                # 所有值相同:创建特殊的单值直方图
+                self.logger.info(f"检测到所有值相同 ({data_min:.6f}),创建单值直方图")
+                plt.bar([data_min], [len(band_flat)], width=0.1*abs(data_min) if data_min != 0 else 0.1, 
+                       color='steelblue', alpha=0.7, edgecolor='black')
+                plt.axvline(x=data_min, color='red', linewidth=2, linestyle='--', 
+                           label=f'Fixed Value: {data_min:.6f}')
+                plt.legend()
+            else:
+                # 正常情况:绘制直方图和密度曲线
+                sns.histplot(band_flat, bins=bins, color='steelblue', alpha=0.7, 
+                            edgecolor='black', stat='density')
+                sns.kdeplot(band_flat, color='red', linewidth=2)
             
             # 设置标签和标题
             plt.xlabel(xlabel, fontsize=14)
@@ -627,17 +888,65 @@ def get_available_colormaps():
     return COLORMAPS.copy()
 
 
+def dataframe_to_raster_workflow(df, template_tif, output_dir, 
+                                boundary_gdf=None, resolution_factor=16.0,
+                                interpolation_method='nearest', field_name='Prediction',
+                                lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
+    """
+    DataFrame到栅格转换工作流(内存处理优化版本)
+    
+    @param df: pandas DataFrame
+    @param template_tif: 模板GeoTIFF文件路径
+    @param output_dir: 输出目录
+    @param boundary_gdf: 边界GeoDataFrame(可选)
+    @param resolution_factor: 分辨率因子
+    @param interpolation_method: 插值方法
+    @param field_name: 字段名称
+    @param lon_col: 经度列
+    @param lat_col: 纬度列
+    @param value_col: 数值列
+    @param enable_interpolation: 是否启用空间插值,默认False
+    @return: 输出文件路径字典
+    """
+    mapper = MappingUtils()
+    
+    # 确保输出目录存在
+    os.makedirs(output_dir, exist_ok=True)
+    
+    # 生成文件名(基于时间戳,避免冲突)
+    import time
+    timestamp = str(int(time.time()))
+    raster_path = os.path.join(output_dir, f"memory_raster_{timestamp}.tif")
+    
+    # 1. DataFrame直接转GeoDataFrame(内存处理)
+    gdf = mapper.dataframe_to_geodataframe(df, lon_col, lat_col, value_col, field_name)
+    
+    # 2. GeoDataFrame直接转栅格(内存处理)
+    raster_path, stats = mapper.geodataframe_to_raster(
+        gdf, template_tif, raster_path, field_name,
+        resolution_factor, boundary_gdf, interpolation_method, enable_interpolation
+    )
+    
+    return {
+        'shapefile': None,  # 内存处理不生成shapefile
+        'raster': raster_path,
+        'statistics': stats,
+        'geodataframe': gdf  # 返回GeoDataFrame供调试使用
+    }
+
+
 def csv_to_raster_workflow(csv_file, template_tif, output_dir, 
-                          boundary_shp=None, resolution_factor=16.0,
+                          boundary_shp=None, boundary_gdf=None, resolution_factor=16.0,
                           interpolation_method='nearest', field_name='Prediction',
                           lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
     """
-    完整的CSV到栅格转换工作流
+    完整的CSV到栅格转换工作流(原版本,保持兼容性)
     
     @param csv_file: CSV文件路径
     @param template_tif: 模板GeoTIFF文件路径
     @param output_dir: 输出目录
-    @param boundary_shp: 边界Shapefile文件路径(可选)
+    @param boundary_shp: 边界Shapefile文件路径(可选,兼容性保留)
+    @param boundary_gdf: 边界GeoDataFrame(可选,优先使用)
     @param resolution_factor: 分辨率因子
     @param interpolation_method: 插值方法
     @param field_name: 字段名称
@@ -663,7 +972,7 @@ def csv_to_raster_workflow(csv_file, template_tif, output_dir,
     # 2. Shapefile转栅格
     raster_path, stats = mapper.vector_to_raster(
         shapefile_path, template_tif, raster_path, field_name,
-        resolution_factor, boundary_shp, interpolation_method, enable_interpolation
+        resolution_factor, boundary_shp, boundary_gdf, interpolation_method, enable_interpolation
     )
     
     return {

+ 101 - 0
docs/logging_best_practices.md

@@ -0,0 +1,101 @@
+# 日志配置最佳实践
+
+## 问题背景
+
+在项目中出现了重复日志的问题,主要原因是:
+
+1. **多重日志处理器配置**:多个模块都手动配置了 `StreamHandler`
+2. **日志传播机制**:子 logger 的日志会传播到父 logger,如果都有处理器就会重复输出
+3. **缺乏统一配置**:各个模块独立配置日志,没有统一的配置模式
+
+## 解决方案
+
+### 1. 统一日志配置工具
+
+创建了 `app/utils/logger_config.py` 提供统一的日志配置功能:
+
+```python
+from app.utils.logger_config import get_api_logger
+
+# 在API模块中使用
+logger = get_api_logger(__name__)
+logger.info("这是API日志")
+```
+
+### 2. 避免重复配置的关键点
+
+```python
+# ✅ 正确的配置方式
+logger = logging.getLogger(__name__)
+if not logger.handlers:  # 检查是否已配置
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.propagate = False  # 关闭传播,避免重复输出
+
+# ❌ 错误的配置方式(会导致重复日志)
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+handler = logging.StreamHandler()  # 每次都添加新的处理器
+logger.addHandler(handler)  # 重复添加处理器
+```
+
+### 3. 不同模块的使用方式
+
+#### API模块
+```python
+from app.utils.logger_config import get_api_logger
+
+logger = get_api_logger(__name__)
+logger.info("API操作日志")
+```
+
+#### Service模块
+```python
+from app.utils.logger_config import get_service_logger
+
+logger = get_service_logger(__name__)
+logger.info("业务逻辑日志")
+```
+
+#### 工具模块
+```python
+from app.utils.logger_config import get_utils_logger
+
+logger = get_utils_logger(__name__)
+logger.info("工具函数日志")
+```
+
+## 修复的文件
+
+已修复以下文件中的重复日志配置:
+
+1. `app/api/water.py` - 添加了处理器检查和传播控制
+2. `app/api/cd_flux.py` - 添加了处理器检查和传播控制  
+3. `app/services/cd_flux_service.py` - 添加了处理器检查和传播控制
+4. `app/utils/mapping_utils.py` - 补充了日志传播控制(`propagate = False`),避免与全局配置重复输出
+
+## 测试方法
+
+运行演示脚本来验证修复效果:
+
+```bash
+conda activate GeoSys
+python scripts/demos/logging_demo.py
+```
+
+## 最佳实践总结
+
+1. **使用统一配置工具**:优先使用 `app/utils/logger_config.py` 中的函数
+2. **检查处理器**:配置前检查 `if not logger.handlers:`
+3. **控制传播**:设置 `logger.propagate = False` 避免重复输出
+4. **避免重复配置**:不要在多个地方配置同一个logger
+5. **根日志配置**:只在应用启动时配置一次根日志系统
+
+## 注意事项
+
+- 在类中使用日志时,建议在 `__init__` 方法中配置,避免每次调用都重新配置
+- 对于 FastAPI 应用,uvicorn 已经有自己的日志配置,避免与其冲突
+- 测试环境和生产环境可能需要不同的日志级别和输出方式

+ 74 - 0
migrations/versions/29a56094509c_add_four_county_land_use_table.py

@@ -0,0 +1,74 @@
+"""add_four_county_land_use_table
+
+Revision ID: 29a56094509c
+Revises: d89bb52ae481
+Create Date: 2025-08-17 10:01:27.121198
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import geoalchemy2
+
+
+# revision identifiers, used by Alembic.
+revision = '29a56094509c'
+down_revision = 'd89bb52ae481'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """升级数据库到当前版本 - 创建四县三种用电土地利用类型表"""
+    # 创建四县三种用电土地利用类型表
+    op.create_table(
+        'four_county_land',
+        sa.Column('gid', sa.Integer(), autoincrement=True, nullable=False, comment='主键ID'),
+        sa.Column('objectid', sa.Integer(), nullable=True, comment='对象标识符'),
+        sa.Column('bsm', sa.String(length=18), nullable=True, comment='标识码'),
+        sa.Column('ysdm', sa.String(length=10), nullable=True, comment='要素代码'),
+        sa.Column('tbybh', sa.String(length=18), nullable=True, comment='图斑预编号'),
+        sa.Column('tbbh', sa.String(length=8), nullable=True, comment='图斑编号'),
+        sa.Column('dlbm', sa.String(length=5), nullable=True, comment='地类编码'),
+        sa.Column('dlmc', sa.String(length=60), nullable=True, comment='地类名称'),
+        sa.Column('qsxz', sa.String(length=2), nullable=True, comment='权属性质'),
+        sa.Column('qsdwdm', sa.String(length=19), nullable=True, comment='权属单位代码'),
+        sa.Column('qsdwmc', sa.String(length=60), nullable=True, comment='权属单位名称'),
+        sa.Column('zldwdm', sa.String(length=19), nullable=True, comment='坐落单位代码'),
+        sa.Column('zldwmc', sa.String(length=60), nullable=True, comment='坐落单位名称'),
+        sa.Column('tbmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='图斑面积'),
+        sa.Column('kcdlbm', sa.String(length=5), nullable=True, comment='扣除地类编码'),
+        sa.Column('kcxs', sa.Numeric(precision=19, scale=11), nullable=True, comment='扣除系数'),
+        sa.Column('kcmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='扣除面积'),
+        sa.Column('tbdlmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='图斑地类面积'),
+        sa.Column('gdlx', sa.String(length=2), nullable=True, comment='耕地类型'),
+        sa.Column('gdpdjb', sa.String(length=2), nullable=True, comment='耕地坡度级别'),
+        sa.Column('xzdwkd', sa.Numeric(precision=19, scale=11), nullable=True, comment='线状地物宽度'),
+        sa.Column('tbxhdm', sa.String(length=6), nullable=True, comment='图斑细化代码'),
+        sa.Column('tbxhmc', sa.String(length=20), nullable=True, comment='图斑细化名称'),
+        sa.Column('zzsxdm', sa.String(length=6), nullable=True, comment='种植属性代码'),
+        sa.Column('zzsxmc', sa.String(length=20), nullable=True, comment='种植属性名称'),
+        sa.Column('gddb', sa.Integer(), nullable=True, comment='耕地等别'),
+        sa.Column('frdbs', sa.String(length=1), nullable=True, comment='非入等标识'),
+        sa.Column('czcsxm', sa.String(length=4), nullable=True, comment='城镇村属性码'),
+        sa.Column('sjnf', sa.Integer(), nullable=True, comment='数据年份'),
+        sa.Column('mssm', sa.String(length=2), nullable=True, comment='模式识别码'),
+        sa.Column('hdmc', sa.String(length=100), nullable=True, comment='河道名称'),
+        sa.Column('bz', sa.String(length=254), nullable=True, comment='备注'),
+        sa.Column('shape_leng', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状长度'),
+        sa.Column('shape_le_1', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状长度1'),
+        sa.Column('shape_area', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状面积'),
+        sa.Column('geom', geoalchemy2.types.Geometry(geometry_type='POLYGON', srid=4526, from_text='ST_GeomFromEWKT', name='geometry'), nullable=True, comment='几何位置信息-多边形'),
+        sa.PrimaryKeyConstraint('gid'),
+        comment='四县三种用电土地利用类型数据'
+    )
+    
+    # 创建空间索引
+    op.execute("CREATE INDEX IF NOT EXISTS idx_four_county_land_geom ON four_county_land USING gist (geom)")
+
+
+def downgrade():
+    """将数据库降级到上一版本 - 删除四县三种用电土地利用类型表"""
+    # 删除空间索引
+    op.drop_index('idx_four_county_land_geom', table_name='four_county_land')
+    # 删除表
+    op.drop_table('four_county_land')