2
0

6 コミット 5627bdbc59 ... 1d5f612c36

作者 SHA1 メッセージ 日付
  drggboy 1d5f612c36 优化土地数据处理接口,新增CSV文件生成选项,支持内存处理和直接从DataFrame转换为GeoDataFrame,提升数据处理效率和灵活性。同时更新日志记录,确保关键文件缺失时的错误处理更加完善。 4 日 前
  drggboy c27c0e0410 优化土地数据计算接口,支持动态边界和插值控制,更新日志记录信息,确保参数验证和错误处理更加完善。重构数据处理流程,直接从数据库获取边界数据 4 日 前
  drggboy 1ccde4b3d8 优化日志配置,添加统一的日志配置工具以避免重复日志输出,更新相关模块的日志设置,确保日志处理器不重复添加并控制日志传播。 4 日 前
  drggboy 6e8633a049 边界从数据库读取,删除lechang.shp文件 4 日 前
  drggboy 90b6150272 重构土地利用数据处理逻辑,替换原有SHP文件读取为数据库查询,优化数据获取和处理流程,更新相关文件路径和输出格式,提升代码可维护性和性能。 4 日 前
  drggboy c8617c1d13 添加四县三种用电土地利用类型模型及其数据库迁移脚本,更新模型导入路径。 4 日 前

+ 31 - 33
Water/Python_codes/Calculate.py

@@ -1,33 +1,35 @@
 import os
-import geopandas as gpd
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取 Shp 文件
-shp_file = r"..\Raster\四县三种用电.shp"
-gdf_shp = gpd.read_file(shp_file)
-
+# 设置要处理的土地类型
 str = "旱地"
-# 筛选出字段 DLMC 为指定类型的面要素
-gdf_shp = gdf_shp[gdf_shp['DLMC'] == str]
 
-# 定义坐标系转换对象(从原始坐标系转换为WGS84)
-transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+# 从数据库获取土地利用数据,替代原来的SHP文件读取
+print(f"从数据库获取 '{str}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(str)
+
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{str}' 类型的土地利用数据")
+    exit(1)
+
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{str}' 类型的土地数据")
 
 # 读取 xls 文件
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
 df_xls = pd.read_excel(xls_file)
 
-# 新建重金属含量字段,指定数据类型为双精度浮点数
-gdf_shp['Cd'] = pd.Series(dtype=np.float64)
-gdf_shp['Hg'] = pd.Series(dtype=np.float64)
-gdf_shp['Cr'] = pd.Series(dtype=np.float64)
-gdf_shp['Pb'] = pd.Series(dtype=np.float64)
-gdf_shp['As'] = pd.Series(dtype=np.float64)
+# 不再需要为GeoDataFrame添加字段,直接处理数据即可
 
 # 根据面要素类型设置系数
 A = 711 * 0.524
@@ -45,11 +47,11 @@ else:
 # 创建一个空列表来存储数据行
 data_rows = []
 
-# 遍历面要素
-for index, row in gdf_shp.iterrows():
-    # 获取面要素的中心并转换坐标
-    center_original = row['geometry'].centroid
-    center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+# 遍历面要素,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    center_lon = row['center_lon']
+    center_lat = row['center_lat']
     center_transformed = Point(center_lon, center_lat)
     
     # 计算面要素中心与采样点的距离
@@ -59,24 +61,17 @@ for index, row in gdf_shp.iterrows():
     min_distance_index = distances.idxmin()
     nearest_sample = df_xls.loc[min_distance_index]
     
-    # 将最近采样点的值赋给面要素,并乘以系数Num
+    # 计算最近采样点的重金属含量,并乘以系数Num
     cd_value = nearest_sample['Cd (ug/L)'] * Num
     hg_value = nearest_sample['Hg (ug/L)'] * Num
     cr_value = nearest_sample['Cr (ug/L)'] * Num
     pb_value = nearest_sample['Pb (ug/L)'] * Num
     as_value = nearest_sample['As (ug/L)'] * Num
     
-    # 更新面要素的重金属含量
-    gdf_shp.at[index, 'Cd'] = cd_value
-    gdf_shp.at[index, 'Hg'] = hg_value
-    gdf_shp.at[index, 'Cr'] = cr_value
-    gdf_shp.at[index, 'Pb'] = pb_value
-    gdf_shp.at[index, 'As'] = as_value
-    
     # 将数据行添加到列表中
     data_rows.append({
-        '面要素ID': index,
-        'DLMC': row['DLMC'],
+        '面要素ID': row['gid'],  # 使用数据库中的gid
+        'DLMC': row['dlmc'],    # 使用数据库中的dlmc字段
         '中心点经度': center_lon,
         '中心点纬度': center_lat,
         '最近采样点ID': nearest_sample.name,
@@ -92,8 +87,11 @@ for index, row in gdf_shp.iterrows():
 # 从列表创建DataFrame
 csv_data = pd.DataFrame(data_rows)
 
-# 保存结果为CSV文件 - 修复路径中的转义字符问题
-csv_file_path = os.path.join('D:\土壤-大创\Water\Data\四县三用地&旱地.csv')
+# 保存结果为CSV文件
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"四县三用地&{str}_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
 print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {str} 面要素")

+ 27 - 27
Water/Python_codes/Counter.py

@@ -1,28 +1,29 @@
 import os
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取耕地中心点CSV文件
-csv_cultivated_land = r"..\Data\四县三用地&水田.csv"
-df_land = pd.read_csv(csv_cultivated_land)
+# 设置要处理的土地类型
+land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
 
-# 确保CSV文件包含必要的字段
-required_fields = ['中心点经度', '中心点纬度', 'DLMC']
-for field in required_fields:
-    if field not in df_land.columns:
-        raise ValueError(f"耕地CSV文件缺少必要的字段: {field}")
+# 从数据库获取土地利用数据,替代原来的CSV文件读取
+print(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
 
-# 筛选出旱地
-land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
-df_land = df_land[df_land['DLMC'] == land_type]
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+    exit(1)
 
-# 定义坐标系转换(假设输入已是WGS84,根据实际情况修改)
-source_crs = "EPSG:4326"
-transformer = Transformer.from_crs(source_crs, "EPSG:4326", always_xy=True)
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
 # 读取采样点数据
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
@@ -45,15 +46,11 @@ else:
 # 存储结果的列表
 data_rows = []
 
-# 遍历耕地中心点
-for index, row in df_land.iterrows():
-    # 获取正确的坐标字段
-    lon = row['中心点经度']
-    lat = row['中心点纬度']
-    
-    # 坐标转换(如果需要)
-    if source_crs != "EPSG:4326":
-        lon, lat = transformer.transform(lon, lat)
+# 遍历耕地中心点,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    lon = row['center_lon']
+    lat = row['center_lat']
     
     # 创建中心点几何对象
     center_point = Point(lon, lat)
@@ -73,7 +70,7 @@ for index, row in df_land.iterrows():
     
     # 添加到结果列表
     data_rows.append({
-        'DLMC': row['DLMC'],
+        'DLMC': row['dlmc'],  # 使用数据库中的字段名
         '中心点经度': lon,
         '中心点纬度': lat,
         'Cd (ug/L)': cd_value
@@ -83,7 +80,10 @@ for index, row in df_land.iterrows():
 csv_data = pd.DataFrame(data_rows)
 
 # 保存结果
-csv_file_path = r"D:\土壤-大创\Water\Data\旱地_Cd.csv"
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"{land_type}_Cd_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
-print(f"CSV文件已保存至: {csv_file_path}")
+print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {land_type} 面要素")

+ 41 - 10
Water/Python_codes/point_match.py

@@ -118,18 +118,48 @@ def write_csv(data, output_file):
         print(f"错误: 写入文件 {output_file} 时发生错误: {e}")
 
 def main():
-    # 配置文件路径
-    farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv'  # 耕地中心点数据文件
-    sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv'    # 采样点数据文件
-    output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv'  # 输出文件
+    """主函数 - 使用数据库数据进行点匹配"""
+    import sys
+    from pathlib import Path
+    
+    # 添加项目根目录到Python路径
+    project_root = Path(__file__).parent.parent.parent
+    sys.path.append(str(project_root))
+    
+    # 导入数据库服务
+    from app.services.land_use_service import land_use_service
+    
+    # 设置要处理的土地类型
+    land_type = "水浇地"
+    
+    # 从数据库获取土地利用数据
+    print(f"正在从数据库读取 {land_type} 数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
     
-    # 读取数据
-    print("正在读取耕地数据...")
-    farmland_data = read_csv(farmland_file)
-    print(f"已读取 {len(farmland_data)} 条耕地记录")
+    if land_centers_df is None or land_centers_df.empty:
+        print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+        return
+    
+    print(f"已从数据库读取 {len(land_centers_df)} 条 {land_type} 记录")
+    
+    # 转换为字典格式以兼容现有函数
+    farmland_data = []
+    for _, row in land_centers_df.iterrows():
+        farmland_data.append({
+            'id': row['gid'],
+            '中心点经度': row['center_lon'],
+            '中心点纬度': row['center_lat'],
+            'DLMC': row['dlmc']
+        })
+    
+    # 配置文件路径
+    sample_file = Path(__file__).parent.parent / "Data" / "SamplingPoint.csv"
+    output_dir = Path(__file__).parent.parent / "Data"
+    output_dir.mkdir(exist_ok=True)
+    output_file = output_dir / f"matched_data_{land_type}_DB.csv"
     
     print("正在读取采样点数据...")
-    sample_data = read_csv(sample_file)
+    sample_data = read_csv(str(sample_file))
     print(f"已读取 {len(sample_data)} 条采样点记录")
     
     if not farmland_data or not sample_data:
@@ -142,7 +172,8 @@ def main():
     
     # 写入结果
     print("正在写入结果...")
-    write_csv(merged_data, output_file)
+    write_csv(merged_data, str(output_file))
+    print(f"总共处理了 {len(merged_data)} 个 {land_type} 面要素")
 
 if __name__ == "__main__":
     main()

+ 10 - 6
app/api/cd_flux.py

@@ -19,13 +19,17 @@ DEFAULT_MAP_FILENAME = "fluxcd_input_map.jpg"
 DEFAULT_HIST_FILENAME = "fluxcd_input_histogram.jpg"
 DEFAULT_CSV_FILENAME = "fluxcd_input.csv"
 LATEST_RESULT_FILENAME = "latest_result.json"
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 router = APIRouter()
 

+ 42 - 12
app/api/water.py

@@ -16,13 +16,17 @@ from ..services.water_service import (
     get_base_dir
 )
 
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 router = APIRouter()
 
@@ -110,18 +114,29 @@ async def get_default_histogram(
 
 @router.post("/calculate",
              summary="重新计算土地数据",
-             description="根据输入的土地类型和系数重新计算土地数据")
+             description="根据输入的土地类型和系数重新计算土地数据,支持通过area和level参数控制地图边界,支持插值控制")
 async def recalculate_land_data(
         background_tasks: BackgroundTasks,
         land_type: str = Form(..., description="土地类型,如:'水田'、'旱地'或'水浇地'"),
         param1: float = Form(711, description="土地类型系数的第一个参数"),
         param2: float = Form(0.524, description="土地类型系数的第二个参数"),
         color_map_name: str = Form("绿-黄-红-紫", description="使用的色彩方案"),
-        output_size: int = Form(8, description="输出图片的尺寸")
+        output_size: int = Form(8, description="输出图片的尺寸"),
+        area: Optional[str] = Form(None, description="可选的地区名称,如:'乐昌市',用于动态控制地图边界"),
+        level: Optional[str] = Form(None, description="可选的行政层级,必须为: county | city | province"),
+        enable_interpolation: Optional[bool] = Form(False, description="是否启用空间插值,默认启用"),
+        interpolation_method: Optional[str] = Form("linear", description="插值方法: nearest | linear | cubic"),
+        resolution_factor: Optional[float] = Form(4.0, description="分辨率因子,默认4.0,越大分辨率越高"),
+        save_csv: Optional[bool] = Form(True, description="是否生成CSV文件,默认生成")
 ) -> Dict[str, Any]:
-    """重新计算土地数据并返回结果路径"""
+    """重新计算土地数据并返回结果路径,支持动态边界控制和插值控制"""
     try:
         logger.info(f"重新计算土地数据: {land_type}")
+        if area and level:
+            logger.info(f"使用动态边界: {area} ({level})")
+        else:
+            logger.info("使用默认边界")
+        logger.info(f"插值设置: 启用={enable_interpolation}, 方法={interpolation_method}, 分辨率因子={resolution_factor}")
 
         # 获取默认目录
         base_dir = get_base_dir()
@@ -135,15 +150,21 @@ async def recalculate_land_data(
             land_type: (param1, param2)
         }
 
-        # 调用服务层函数处理土地数据
+        # 调用统一的处理函数,CSV生成作为可选参数
         results = process_land_to_visualization(
             land_type=land_type,
             coefficient_params=coefficient_params,
             color_map_name=color_map_name,
-            output_size=output_size
+            output_size=output_size,
+            area=area,
+            level=level,
+            enable_interpolation=enable_interpolation,
+            interpolation_method=interpolation_method,
+            resolution_factor=resolution_factor,
+            save_csv=save_csv  # 将CSV生成选项传递给处理函数
         )
 
-        if not results or any(r is None for r in results):
+        if not results:
             logger.error(f"重新计算土地数据失败: {land_type}")
             raise HTTPException(
                 status_code=500,
@@ -151,6 +172,15 @@ async def recalculate_land_data(
             )
 
         cleaned_csv, shapefile, tif_file, map_output, hist_output, used_coeff = results
+        
+        # 检查关键文件是否存在(shapefile可以为None,因为使用的是内存处理)
+        if not tif_file or not map_output or not hist_output:
+            logger.error(f"重新计算土地数据失败,关键文件缺失: {land_type}")
+            logger.error(f"GeoTIFF: {tif_file}, 地图: {map_output}, 直方图: {hist_output}")
+            raise HTTPException(
+                status_code=500,
+                detail=f"重新计算土地数据失败: {land_type}"
+            )
 
         # 定义默认路径
         default_map_path = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")

+ 1 - 0
app/models/__init__.py

@@ -21,3 +21,4 @@ from app.models.agricultural import *
 from app.models.cross_section import *
 from app.models.province import *
 from app.models.city import *
+from app.models.land_use import *  # 导入四县三种用电土地利用类型模型

+ 90 - 0
app/models/land_use.py

@@ -0,0 +1,90 @@
+# coding: utf-8
+"""
+四县三种用电土地利用类型模型
+Four County Three Type Land Use Model
+"""
+
+from sqlalchemy import Column, Integer, String, Float, Numeric
+from geoalchemy2.types import Geometry
+from app.models.base import Base
+
+
+class FourCountyLandUse(Base):
+    """
+    四县三种用电土地利用类型表
+    Four County Three Type Land Use Table
+    """
+    __tablename__ = 'four_county_land'
+    __table_args__ = {'comment': '四县三种用电土地利用类型数据'}
+
+    # 主键
+    gid = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
+    
+    # 对象标识
+    objectid = Column(Integer, name='objectid', comment='对象标识符')
+    
+    # 标识码
+    bsm = Column(String(18), name='bsm', comment='标识码')
+    
+    # 要素代码
+    ysdm = Column(String(10), name='ysdm', comment='要素代码')
+    
+    # 图斑编号
+    tbybh = Column(String(18), name='tbybh', comment='图斑预编号')
+    tbbh = Column(String(8), name='tbbh', comment='图斑编号')
+    
+    # 地类信息
+    dlbm = Column(String(5), name='dlbm', comment='地类编码')
+    dlmc = Column(String(60), name='dlmc', comment='地类名称')
+    
+    # 权属信息
+    qsxz = Column(String(2), name='qsxz', comment='权属性质')
+    qsdwdm = Column(String(19), name='qsdwdm', comment='权属单位代码')
+    qsdwmc = Column(String(60), name='qsdwmc', comment='权属单位名称')
+    
+    # 坐落单位
+    zldwdm = Column(String(19), name='zldwdm', comment='坐落单位代码')
+    zldwmc = Column(String(60), name='zldwmc', comment='坐落单位名称')
+    
+    # 面积信息
+    tbmj = Column(Numeric(19, 11), name='tbmj', comment='图斑面积')
+    kcdlbm = Column(String(5), name='kcdlbm', comment='扣除地类编码')
+    kcxs = Column(Numeric(19, 11), name='kcxs', comment='扣除系数')
+    kcmj = Column(Numeric(19, 11), name='kcmj', comment='扣除面积')
+    tbdlmj = Column(Numeric(19, 11), name='tbdlmj', comment='图斑地类面积')
+    
+    # 耕地信息
+    gdlx = Column(String(2), name='gdlx', comment='耕地类型')
+    gdpdjb = Column(String(2), name='gdpdjb', comment='耕地坡度级别')
+    xzdwkd = Column(Numeric(19, 11), name='xzdwkd', comment='线状地物宽度')
+    
+    # 图斑细化
+    tbxhdm = Column(String(6), name='tbxhdm', comment='图斑细化代码')
+    tbxhmc = Column(String(20), name='tbxhmc', comment='图斑细化名称')
+    
+    # 种植属性
+    zzsxdm = Column(String(6), name='zzsxdm', comment='种植属性代码')
+    zzsxmc = Column(String(20), name='zzsxmc', comment='种植属性名称')
+    
+    # 耕地等别
+    gddb = Column(Integer, name='gddb', comment='耕地等别')
+    
+    # 其他属性
+    frdbs = Column(String(1), name='frdbs', comment='非入等标识')
+    czcsxm = Column(String(4), name='czcsxm', comment='城镇村属性码')
+    sjnf = Column(Integer, name='sjnf', comment='数据年份')
+    mssm = Column(String(2), name='mssm', comment='模式识别码')
+    hdmc = Column(String(100), name='hdmc', comment='河道名称')
+    bz = Column(String(254), name='bz', comment='备注')
+    
+    # 形状信息
+    shape_leng = Column(Numeric(19, 11), name='shape_leng', comment='形状长度')
+    shape_le_1 = Column(Numeric(19, 11), name='shape_le_1', comment='形状长度1')
+    shape_area = Column(Numeric(19, 11), name='shape_area', comment='形状面积')
+    
+    # 几何信息 - 使用EPSG:4526坐标系
+    geom = Column(
+        Geometry('POLYGON', 4526, from_text='ST_GeomFromEWKT', name='geometry'), 
+        index=True, 
+        comment='几何位置信息-多边形'
+    )

+ 10 - 6
app/services/cd_flux_service.py

@@ -14,13 +14,17 @@ from app.database import SessionLocal
 from app.models import FarmlandData, FluxCdOutputData, FluxCdInputData
 from app.utils.mapping_utils import MappingUtils, csv_to_raster_workflow
 
-# 配置日志
+# 配置日志 - 避免重复日志输出
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+# 避免重复添加处理器导致重复日志
+if not logger.handlers:
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # 关闭日志传播,避免与父级日志处理器重复输出
+    logger.propagate = False
 
 def get_base_dir():
     """获取基础目录路径(与土地数据处理函数一致)"""

+ 331 - 0
app/services/land_use_service.py

@@ -0,0 +1,331 @@
+"""
+四县土地利用类型数据服务
+@description: 提供基于four_county_land表的土地利用类型数据查询和处理功能
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import logging
+import pandas as pd
+import numpy as np
+from typing import Dict, Any, List, Optional, Tuple
+from sqlalchemy.orm import Session
+from sqlalchemy import text, and_, func
+from shapely.geometry import Point
+from shapely.wkt import loads
+from pyproj import Transformer
+import functools
+from datetime import datetime, timedelta
+
+from ..database import SessionLocal
+from ..models.land_use import FourCountyLandUse
+
+# 配置日志
+logger = logging.getLogger(__name__)
+
+
+class LandUseService:
+    """
+    四县土地利用类型数据服务类
+    
+    @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化
+    """
+    
+    def __init__(self):
+        """
+        初始化土地利用服务
+        """
+        self.logger = logging.getLogger(__name__)
+        self._cache = {}  # 简单的内存缓存
+        self._cache_timeout = timedelta(minutes=30)  # 缓存30分钟
+        
+    def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]:
+        """
+        根据土地利用类型查询数据
+        
+        @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地'
+        @returns: 土地利用数据列表,包含几何中心点坐标信息
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定类型的土地利用数据
+                query = db.query(FourCountyLandUse).filter(
+                    FourCountyLandUse.dlmc == land_type
+                )
+                
+                land_records = query.all()
+                
+                if not land_records:
+                    self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据")
+                    return None
+                
+                self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据")
+                
+                # 转换数据为列表
+                result = []
+                for record in land_records:
+                    # 从PostGIS几何数据中提取中心点坐标
+                    center_info = self._extract_geometry_center(db, record.gid)
+                    
+                    land_data = {
+                        'gid': record.gid,
+                        'objectid': record.objectid,
+                        'dlmc': record.dlmc,  # 地类名称
+                        'dlbm': record.dlbm,  # 地类编码
+                        'tbmj': float(record.tbmj) if record.tbmj else None,  # 图斑面积
+                        'center_lon': center_info['longitude'] if center_info else None,
+                        'center_lat': center_info['latitude'] if center_info else None,
+                        'shape_area': float(record.shape_area) if record.shape_area else None,
+                        # 其他可能需要的字段
+                        'qsdwmc': record.qsdwmc,  # 权属单位名称
+                        'zldwmc': record.zldwmc,  # 坐落单位名称
+                    }
+                    result.append(land_data)
+                
+                return result
+                
+        except Exception as e:
+            self.logger.error(f"查询土地利用数据失败: {str(e)}")
+            return None
+    
+    def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]:
+        """
+        提取几何图形的中心点坐标
+        
+        @param db: 数据库会话
+        @param gid: 图形记录ID
+        @returns: 包含经纬度的字典
+        """
+        try:
+            # 使用PostGIS函数计算几何中心并转换为WGS84坐标系
+            sql = text("""
+                SELECT 
+                    ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude,
+                    ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude
+                FROM four_county_land 
+                WHERE gid = :gid
+            """)
+            
+            result = db.execute(sql, {'gid': gid}).fetchone()
+            
+            if result:
+                return {
+                    'longitude': float(result.longitude),
+                    'latitude': float(result.latitude)
+                }
+            else:
+                self.logger.warning(f"无法获取GID {gid} 的几何中心点")
+                return None
+                
+        except Exception as e:
+            self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}")
+            return None
+    
+    def _is_cache_valid(self, cache_key: str) -> bool:
+        """检查缓存是否有效"""
+        if cache_key not in self._cache:
+            return False
+        cache_time = self._cache[cache_key].get('timestamp')
+        if not cache_time:
+            return False
+        return datetime.now() - cache_time < self._cache_timeout
+    
+    def _get_from_cache(self, cache_key: str):
+        """从缓存获取数据"""
+        if self._is_cache_valid(cache_key):
+            return self._cache[cache_key]['data']
+        return None
+    
+    def _set_cache(self, cache_key: str, data):
+        """设置缓存数据"""
+        self._cache[cache_key] = {
+            'data': data,
+            'timestamp': datetime.now()
+        }
+    
+    def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None, 
+                                  offset: Optional[int] = None) -> Optional[pd.DataFrame]:
+        """
+        高性能批量获取土地中心点数据
+        
+        @param land_type: 土地利用类型
+        @param limit: 限制返回数量,None表示返回所有
+        @param offset: 偏移量,用于分页
+        @returns: 包含中心点坐标的DataFrame
+        """
+        try:
+            # 构建缓存键
+            cache_key = f"centers_optimized_{land_type}_{limit}_{offset}"
+            
+            # 尝试从缓存获取
+            cached_data = self._get_from_cache(cache_key)
+            if cached_data is not None:
+                self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}")
+                return cached_data
+            
+            with SessionLocal() as db:
+                # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题
+                sql = text("""
+                    SELECT 
+                        gid,
+                        dlmc,
+                        shape_area,
+                        ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon,
+                        ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat
+                    FROM four_county_land 
+                    WHERE dlmc = :land_type
+                    AND geom IS NOT NULL
+                    ORDER BY gid
+                    %s
+                """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else ""))
+                
+                results = db.execute(sql, {'land_type': land_type}).fetchall()
+                
+                if not results:
+                    self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据")
+                    return None
+                
+                # 直接构建DataFrame,避免中间转换
+                data = {
+                    'gid': [r.gid for r in results],
+                    'center_lon': [float(r.center_lon) for r in results],
+                    'center_lat': [float(r.center_lat) for r in results],
+                    'dlmc': [r.dlmc for r in results],
+                    'shape_area': [float(r.shape_area) if r.shape_area else None for r in results]
+                }
+                
+                df = pd.DataFrame(data)
+                
+                # 缓存结果
+                self._set_cache(cache_key, df)
+                
+                self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点")
+                return df
+                
+        except Exception as e:
+            self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}")
+            return None
+    
+    def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]:
+        """
+        获取用于处理的土地中心点数据(使用优化版本)
+        
+        @param land_type: 土地利用类型
+        @returns: 包含中心点坐标的DataFrame
+        """
+        # 使用优化版本的方法
+        return self.get_land_centers_optimized(land_type)
+    
+    def get_available_land_types(self) -> List[str]:
+        """
+        获取数据库中可用的土地利用类型(带缓存)
+        
+        @returns: 土地利用类型列表
+        """
+        cache_key = "available_land_types"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 查询所有不同的土地利用类型
+                query = db.query(FourCountyLandUse.dlmc).distinct()
+                results = query.all()
+                
+                land_types = [result.dlmc for result in results if result.dlmc]
+                land_types.sort()  # 排序便于查看
+                
+                # 缓存结果
+                self._set_cache(cache_key, land_types)
+                
+                self.logger.info(f"可用的土地利用类型: {land_types}")
+                return land_types
+                
+        except Exception as e:
+            self.logger.error(f"获取土地利用类型失败: {str(e)}")
+            return []
+    
+    def get_land_statistics_summary(self) -> Dict[str, Any]:
+        """
+        获取土地利用数据统计摘要(带缓存)
+        
+        @returns: 统计摘要信息
+        """
+        cache_key = "land_statistics_summary"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 统计各类型土地的数量
+                sql = text("""
+                    SELECT 
+                        dlmc,
+                        COUNT(*) as count,
+                        SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area,
+                        AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area
+                    FROM four_county_land 
+                    WHERE dlmc IS NOT NULL
+                    GROUP BY dlmc
+                    ORDER BY count DESC
+                """)
+                
+                results = db.execute(sql).fetchall()
+                
+                summary = {
+                    'total_records': 0,
+                    'land_types': [],
+                    'statistics_by_type': {}
+                }
+                
+                for result in results:
+                    land_type = result.dlmc
+                    count = result.count
+                    total_area = float(result.total_area) if result.total_area else 0
+                    avg_area = float(result.avg_area) if result.avg_area else 0
+                    
+                    summary['total_records'] += count
+                    summary['land_types'].append(land_type)
+                    summary['statistics_by_type'][land_type] = {
+                        'count': count,
+                        'total_area': total_area,
+                        'average_area': avg_area
+                    }
+                
+                # 缓存结果
+                self._set_cache(cache_key, summary)
+                
+                self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}")
+                return summary
+                
+        except Exception as e:
+            self.logger.error(f"获取统计摘要失败: {str(e)}")
+            return {
+                'total_records': 0,
+                'land_types': [],
+                'statistics_by_type': {}
+            }
+    
+    def clear_cache(self):
+        """清空缓存"""
+        self._cache.clear()
+        self.logger.info("缓存已清空")
+    
+    def get_cache_info(self) -> Dict[str, Any]:
+        """获取缓存信息"""
+        cache_info = {
+            'cache_size': len(self._cache),
+            'cache_keys': list(self._cache.keys()),
+            'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60
+        }
+        return cache_info
+
+
+# 全局服务实例
+land_use_service = LandUseService()

+ 230 - 88
app/services/water_service.py

@@ -12,15 +12,17 @@ import shutil
 import sys
 
 # 导入MappingUtils
-from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow
+from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
+# 导入土地利用服务
+from .land_use_service import land_use_service
+# 导入边界服务
+from .admin_boundary_service import get_boundary_gdf_by_name
+from ..database import SessionLocal
+import geopandas as gpd
+import tempfile
 
 # 配置日志
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
 
 # 设置全局字体
 import matplotlib.pyplot as plt
@@ -61,30 +63,58 @@ def get_base_dir():
         return os.path.dirname(os.path.abspath(__file__))
 
 
+def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
+    """
+    直接从数据库获取边界数据作为GeoDataFrame
+    
+    @description: 与其他模块保持一致的边界获取方法
+    @param area: 地区名称
+    @param level: 行政层级(county | city | province)
+    @returns: 边界GeoDataFrame或None
+    """
+    try:
+        with SessionLocal() as db:
+            norm_area = area.strip()
+            boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
+            if boundary_gdf is not None:
+                logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
+            return boundary_gdf
+                
+    except Exception as e:
+        logger.warning(f"从数据库获取边界数据失败: {str(e)}")
+        
+    return None
+
+
+
+
+
 # 土地数据处理函数
-def process_land_data(land_type, coefficient_params=None):
+def process_land_data(land_type, coefficient_params=None, save_csv=True):
     """处理土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
-    shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
 
     logger.info(f"处理土地类型: {land_type}")
-    logger.info(f"SHP文件: {shp_file}")
     logger.info(f"Excel文件: {xls_file}")
 
-    # 读取和处理SHP数据
-    gdf_shp = gpd.read_file(shp_file)
-    gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
-
-    if gdf_shp.empty:
-        logger.warning(f"没有找到 '{land_type}' 类型的要素")
-        return None, None
+    # 从数据库获取土地利用数据,替代原来的SHP文件读取
+    logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
+    
+    if land_centers_df is None or land_centers_df.empty:
+        logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+        return None, None, None
 
-    # 坐标系转换器
-    transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+    logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
     # 读取Excel采样点数据
+    if not os.path.exists(xls_file):
+        logger.error(f"采样点Excel文件不存在: {xls_file}")
+        return None, None, None
+        
     df_xls = pd.read_excel(xls_file)
+    logger.info(f"读取到 {len(df_xls)} 个采样点数据")
 
     # 设置土地类型系数
     default_params = {
@@ -98,14 +128,16 @@ def process_land_data(land_type, coefficient_params=None):
     Num = param1 * param2
     logger.info(f"系数: {param1} * {param2} = {Num}")
 
-    # 处理每个面要素
+    # 处理每个面要素,使用数据库中的中心点坐标
     cd_values = []
     centers = []
-    for index, row in gdf_shp.iterrows():
-        center_original = row['geometry'].centroid
-        center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+    
+    for index, row in land_centers_df.iterrows():
+        center_lon = row['center_lon']
+        center_lat = row['center_lat']
         centers.append((center_lon, center_lat))
 
+        # 计算到所有采样点的距离
         distances = df_xls.apply(
             lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
             axis=1
@@ -113,15 +145,10 @@ def process_land_data(land_type, coefficient_params=None):
         min_idx = distances.idxmin()
         nearest = df_xls.loc[min_idx]
 
-        # 计算Cd含量值
+        # 计算Cd含量值
         cd_value = nearest['Cd (ug/L)'] * Num
         cd_values.append(cd_value)
 
-    # 创建输出目录
-    data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
-    os.makedirs(data_dir, exist_ok=True)
-    logger.info(f"数据目录: {data_dir}")
-
     # 创建简化数据DataFrame
     simplified_data = pd.DataFrame({
         'lon': [c[0] for c in centers],
@@ -152,17 +179,37 @@ def process_land_data(land_type, coefficient_params=None):
         ]
     logger.info(f"清洗后数据点数: {len(cleaned_data)}")
 
-    # 保存清洗后的简化数据CSV
-    cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
-    cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
-    logger.info(f"保存CSV: {cleaned_csv}")
+    # 可选:保存CSV文件用于兼容性和调试
+    cleaned_csv_path = None
+    if save_csv:
+        # 创建输出目录
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        os.makedirs(data_dir, exist_ok=True)
+        logger.info(f"数据目录: {data_dir}")
+        
+        cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+        cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
+        logger.info(f"保存CSV: {cleaned_csv_path}")
+    else:
+        logger.info("跳过CSV文件生成(内存处理优化)")
 
-    return cleaned_csv, Num
+    return cleaned_data, Num, cleaned_csv_path  # 返回DataFrame, 系数, 和CSV路径(如果生成了)
 
 
-# 可视化函数(使用MappingUtils)
+# 可视化函数(完全使用统一的MappingUtils接口
 def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
-    """生成栅格地图可视化"""
+    """
+    生成栅格地图可视化
+    
+    @description: 使用统一的MappingUtils接口生成栅格地图
+    @param shp_path: 边界shapefile路径
+    @param tif_path: 栅格数据文件路径  
+    @param color_map_name: 色彩方案名称(中文)
+    @param title_name: 地图标题
+    @param output_path: 输出图片路径
+    @param output_size: 图片尺寸
+    @returns: 输出图片路径或None(失败时)
+    """
     try:
         logger.info(f"生成栅格地图: {title_name}")
 
@@ -175,11 +222,11 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
         # 转换颜色方案名称
         colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
 
-        # 调用MappingUtils的高级绘图功能
-        mapper.create_raster_map(
+        # 直接调用统一的绘图接口,无需额外包装
+        generated_path = mapper.create_raster_map(
             shp_path=shp_path,
             tif_path=tif_path,
-            output_path=os.path.splitext(output_path)[0],  # 去掉扩展名
+            output_path=os.path.splitext(output_path)[0],  # 去掉扩展名,MappingUtils会自动添加.jpg
             colormap=colormap_key,
             title=title_name,
             output_size=output_size,
@@ -189,13 +236,13 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
             interpolation_method='linear'
         )
 
-        # MappingUtils会自动添加.jpg扩展名,重命名文件以匹配原始输出路径
-        generated_path = os.path.splitext(output_path)[0] + ".jpg"
+        # 如果生成的路径与期望路径不同,则重命名
         if generated_path != output_path and os.path.exists(generated_path):
             shutil.move(generated_path, output_path)
             logger.info(f"重命名图片: {generated_path} -> {output_path}")
-
-        return output_path
+            return output_path
+        
+        return generated_path
 
     except Exception as e:
         logger.error(f"栅格地图生成失败: {str(e)}")
@@ -205,15 +252,26 @@ def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path,
 def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
                        xlabel='Cd(ug/L)', ylabel='frequency density',
                        title='Irrigation water input flux'):
-    """生成TIFF文件的直方图"""
+    """
+    生成TIFF文件的直方图
+    
+    @description: 使用统一的MappingUtils接口生成直方图
+    @param file_path: TIFF文件路径
+    @param output_path: 输出图片路径
+    @param figsize: 图片尺寸元组
+    @param xlabel: X轴标签
+    @param ylabel: Y轴标签  
+    @param title: 图表标题
+    @returns: 输出图片路径或None(失败时)
+    """
     try:
         logger.info(f"生成直方图: {file_path}")
 
         # 创建MappingUtils实例
         mapper = MappingUtils()
 
-        # 调用MappingUtils的高级直方图功能
-        mapper.create_histogram(
+        # 直接调用统一的绘图接口,无需额外包装
+        generated_path = mapper.create_histogram(
             file_path=file_path,
             save_path=output_path,
             figsize=figsize,
@@ -224,81 +282,165 @@ def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
             dpi=300
         )
 
-        return output_path
+        return generated_path
 
     except Exception as e:
         logger.error(f"直方图生成失败: {str(e)}")
         return None
 
 
-# 完整的处理流程
 def process_land_to_visualization(land_type, coefficient_params=None,
                                   color_map_name="绿-黄-红-紫",
-                                  output_size=8):
+                                  output_size=8,
+                                  area: Optional[str] = None,
+                                  level: Optional[str] = None,
+                                  enable_interpolation: Optional[bool] = True,
+                                  interpolation_method: Optional[str] = "linear",
+                                  resolution_factor: Optional[float] = 4.0,
+                                  save_csv: Optional[bool] = True):
     """
-    完整的土地数据处理可视化流程:
-    1. 生成清洗后CSV
-    2. 使用csv_to_raster_workflow转换为GeoTIFF
-    3. 生成栅格地图
-    4. 生成直方图
-
-    返回所有生成文件的路径
+    完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
+    
+    @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
+                  支持通过area和level参数动态控制地图边界,支持插值控制参数
+    
+    工作流程:
+    1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
+    2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
+    3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
+    4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
+    5. 生成直方图 - 直接调用MappingUtils.create_histogram()
+
+    @param land_type: 土地类型(水田、旱地、水浇地)
+    @param coefficient_params: 可选的系数参数字典
+    @param color_map_name: 色彩方案名称(中文)
+    @param output_size: 输出图片尺寸
+    @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
+    @param level: 可选的行政层级,county | city | province
+    @param enable_interpolation: 是否启用空间插值,默认True
+    @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
+    @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
+    @param save_csv: 是否生成CSV文件,默认True
+    @returns: 包含所有生成文件路径的元组
     """
     base_dir = get_base_dir()
     logger.info(f"开始处理: {land_type}")
 
-    # 1. 生成清洗后的CSV
-    cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
-    if not cleaned_csv:
+    # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
+    cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
+        land_type, 
+        coefficient_params, 
+        save_csv=save_csv  # 根据参数决定是否生成CSV文件
+    )
+    if cleaned_data is None:
         logger.error(f"处理土地数据失败: {land_type}")
         return None, None, None, None, None, None
 
-    # 2. 使用csv_to_raster_workflow转换为GeoTIFF
+    # 2. 获取动态边界数据(提前获取用于栅格转换)
+    boundary_gdf = None
+    if area and level:
+        logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
+        boundary_gdf = get_boundary_gdf_from_database(area, level)
+        if boundary_gdf is not None:
+            logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
+            logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
+        else:
+            logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
+            logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
+    else:
+        logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
+        boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
+        if boundary_gdf is not None:
+            logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
+        else:
+            logger.info("未能获取默认边界,栅格转换将使用数据点范围")
+            
+    # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
     raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
     template_tif = os.path.join(raster_dir, "meanTemp.tif")
-    output_dir = os.path.dirname(cleaned_csv)  # 使用CSV所在目录作为输出目录
+    output_dir = raster_dir  # 直接使用raster目录作为输出目录
 
-    # 调用csv_to_raster_workflow
-    workflow_result = csv_to_raster_workflow(
-        csv_file=cleaned_csv,
+    # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
+    workflow_result = dataframe_to_raster_workflow(
+        df=cleaned_data,  # 直接使用DataFrame
         template_tif=template_tif,
         output_dir=output_dir,
-        boundary_shp=None,
-        resolution_factor=4.0,
-        interpolation_method='linear',
+        boundary_gdf=boundary_gdf,  # 直接使用GeoDataFrame
+        resolution_factor=resolution_factor,  # 可配置的分辨率因子
+        interpolation_method=interpolation_method,  # 可配置的插值方法
         field_name='Prediction',
-        lon_col=0,  # CSV中经度列索引
-        lat_col=1,  # CSV中纬度列索引
-        value_col=2,  # CSV中数值列索引
-        enable_interpolation=True
+        lon_col=0,  # DataFrame中经度列索引
+        lat_col=1,  # DataFrame中纬度列索引
+        value_col=2,  # DataFrame中数值列索引
+        enable_interpolation=enable_interpolation  # 可配置的插值开关
     )
 
     # 获取输出的栅格文件路径
     output_tif = workflow_result['raster']
     logger.info(f"生成栅格文件: {output_tif}")
 
-    # 3. 生成栅格地图
+    # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
     map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
-    county_shp = os.path.join(raster_dir, "Lechang.shp")  # 县界SHP
-
-    mapping_raster(
-        shp_path=county_shp,
-        tif_path=output_tif,
-        color_map_name=color_map_name,
-        title_name=f"Irrigation water input flux",
-        output_path=map_output,
-        output_size=output_size
-
-    )
+    
+    try:
+        # 创建MappingUtils实例并直接调用
+        mapper = MappingUtils()
+        
+        # 转换颜色方案名称
+        colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
+        
+        # 统一使用boundary_gdf,不再依赖Shapefile
+        map_result = mapper.create_raster_map(
+            shp_path=None,  # 不使用Shapefile
+            tif_path=output_tif,
+            output_path=os.path.splitext(map_output)[0],  # 去掉扩展名
+            colormap=colormap_key,
+            title="Irrigation water input flux",
+            output_size=output_size,
+            dpi=300,
+            enable_interpolation=enable_interpolation,  # 可配置的插值开关
+            interpolation_method=interpolation_method,  # 可配置的插值方法
+            boundary_gdf=boundary_gdf  # 使用从数据库获取的边界数据,如果为None则使用数据点范围
+        )
+        
+        # 如果生成的路径与期望不同,则重命名
+        if map_result != map_output and os.path.exists(map_result):
+            shutil.move(map_result, map_output)
+            logger.info(f"栅格地图已生成: {map_output}")
+            
+    except Exception as e:
+        logger.error(f"栅格地图生成失败: {str(e)}")
 
-    # 4. 生成直方图
+    # 5. 生成直方图(直接使用统一的MappingUtils接口)
     hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
-    plot_tif_histogram(
-        file_path=output_tif,
-        output_path=hist_output,
-        title=f"Irrigation water input flux"
-    )
+    
+    try:
+        # 直接调用统一绘图接口生成直方图
+        hist_result = mapper.create_histogram(
+            file_path=output_tif,
+            save_path=hist_output,
+            figsize=(8, 8),
+            xlabel='Cd(ug/L)',
+            ylabel='frequency density',
+            title='Irrigation water input flux',
+            bins=100,
+            dpi=300
+        )
+        
+        if hist_result:
+            logger.info(f"直方图已生成: {hist_output}")
+            
+    except Exception as e:
+        logger.error(f"直方图生成失败: {str(e)}")
 
+    # 使用实际的CSV路径(如果生成了)或生成兼容性路径
+    if cleaned_csv_path:
+        cleaned_csv = cleaned_csv_path
+    else:
+        # 为了兼容性,生成路径(即使文件不存在)
+        data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
+        cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
+    
     return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
 
 

+ 0 - 1
app/static/water/Raster/lechang.cpg

@@ -1 +0,0 @@
-UTF-8

BIN
app/static/water/Raster/lechang.dbf


+ 0 - 1
app/static/water/Raster/lechang.prj

@@ -1 +0,0 @@
-GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]

BIN
app/static/water/Raster/lechang.shp


BIN
app/static/water/Raster/lechang.shx


BIN
app/static/water/Raster/lechang.zip


+ 103 - 0
app/utils/logger_config.py

@@ -0,0 +1,103 @@
+"""
+统一的日志配置工具
+用于在整个项目中统一管理日志配置,避免重复日志输出
+"""
+
+import logging
+from typing import Optional
+
+
+def setup_logger(
+    name: str,
+    level: int = logging.INFO,
+    propagate: bool = False,
+    formatter_string: Optional[str] = None
+) -> logging.Logger:
+    """
+    设置并配置logger,避免重复日志输出
+    
+    @param name: logger名称,通常使用 __name__
+    @param level: 日志级别,默认为 INFO
+    @param propagate: 是否向父级logger传播,默认为False,避免重复输出
+    @param formatter_string: 自定义格式字符串,如果不提供则使用默认格式
+    @return: 配置好的logger对象
+    """
+    logger = logging.getLogger(name)
+    
+    # 避免重复添加处理器导致重复日志
+    if not logger.handlers:
+        logger.setLevel(level)
+        
+        # 创建控制台处理器
+        handler = logging.StreamHandler()
+        
+        # 设置日志格式
+        if formatter_string is None:
+            formatter_string = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        formatter = logging.Formatter(formatter_string)
+        handler.setFormatter(formatter)
+        
+        # 添加处理器
+        logger.addHandler(handler)
+        
+        # 控制日志传播
+        logger.propagate = propagate
+    
+    return logger
+
+
+def get_api_logger(module_name: str) -> logging.Logger:
+    """
+    为API模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # API层不传播到父级,避免与uvicorn日志冲突
+    )
+
+
+def get_service_logger(module_name: str) -> logging.Logger:
+    """
+    为Service模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # Service层不传播到父级,保持独立
+    )
+
+
+def get_utils_logger(module_name: str) -> logging.Logger:
+    """
+    为工具模块获取标准logger配置
+    
+    @param module_name: 模块名称,通常使用 __name__
+    @return: 配置好的logger对象
+    """
+    return setup_logger(
+        name=module_name,
+        level=logging.INFO,
+        propagate=False  # 工具层不传播到父级,保持独立
+    )
+
+
+# 主应用日志配置函数
+def configure_root_logging(level: int = logging.INFO) -> None:
+    """
+    配置根日志系统,只在应用启动时调用一次
+    
+    @param level: 根日志级别
+    """
+    # 检查是否已经配置过,避免重复配置
+    if not logging.getLogger().handlers:
+        logging.basicConfig(
+            level=level,
+            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        )

+ 234 - 1
app/utils/mapping_utils.py

@@ -124,6 +124,50 @@ class MappingUtils:
             self.logger.error(f"CSV转Shapefile失败: {str(e)}")
             raise
     
+    def dataframe_to_geodataframe(self, df, lon_col=0, lat_col=1, value_col=2, field_name='Prediction'):
+        """
+        将DataFrame直接转换为GeoDataFrame(内存处理)
+        
+        @param df: pandas DataFrame
+        @param lon_col: 经度列索引或列名,默认第0列
+        @param lat_col: 纬度列索引或列名,默认第1列  
+        @param value_col: 数值列索引或列名,默认第2列
+        @param field_name: 值字段名称
+        @return: GeoDataFrame
+        """
+        try:
+            self.logger.info("开始将DataFrame转换为GeoDataFrame(内存处理)")
+            
+            # 支持列索引或列名
+            if isinstance(lon_col, int):
+                lon = df.iloc[:, lon_col]
+            else:
+                lon = df[lon_col]
+                
+            if isinstance(lat_col, int):
+                lat = df.iloc[:, lat_col]
+            else:
+                lat = df[lat_col]
+                
+            if isinstance(value_col, int):
+                val = df.iloc[:, value_col]
+            else:
+                val = df[value_col]
+            
+            # 创建几何对象
+            geometry = [Point(xy) for xy in zip(lon, lat)]
+            
+            # 创建新的DataFrame,只包含必要的列
+            data = {field_name: val}
+            gdf = gpd.GeoDataFrame(data, geometry=geometry, crs="EPSG:4326")
+            
+            self.logger.info(f"✓ 成功转换DataFrame到GeoDataFrame: {len(gdf)} 个点")
+            return gdf
+            
+        except Exception as e:
+            self.logger.error(f"DataFrame转GeoDataFrame失败: {str(e)}")
+            raise
+    
     def create_boundary_mask(self, raster, transform, gdf):
         """
         创建边界掩膜,只保留边界内的区域
@@ -202,6 +246,148 @@ class MappingUtils:
             self.logger.error(f"插值失败: {str(e)}")
             return raster
     
+    def geodataframe_to_raster(self, gdf, template_tif, output_tif, field, 
+                              resolution_factor=16.0, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
+        """
+        将GeoDataFrame直接转换为栅格数据(内存处理版本)
+        
+        @param gdf: 输入的GeoDataFrame
+        @param template_tif: 用作模板的GeoTIFF文件路径
+        @param output_tif: 输出栅格化后的GeoTIFF文件路径
+        @param field: 用于栅格化的属性字段名
+        @param resolution_factor: 分辨率倍数因子
+        @param boundary_gdf: 边界GeoDataFrame,用于创建掩膜
+        @param interpolation_method: 插值方法 ('nearest', 'linear', 'cubic')
+        @param enable_interpolation: 是否启用空间插值,默认True
+        @return: 输出的GeoTIFF文件路径和统计信息
+        """
+        try:
+            self.logger.info(f"开始处理GeoDataFrame到栅格(内存处理)")
+            interpolation_status = "启用" if enable_interpolation else "禁用"
+            self.logger.info(f"分辨率因子: {resolution_factor}, 插值设置: {interpolation_status} (方法: {interpolation_method})")
+            
+            # 读取模板栅格
+            with rasterio.open(template_tif) as src:
+                template_meta = src.meta.copy()
+                
+                # 根据分辨率因子计算新的尺寸和变换参数
+                if resolution_factor != 1.0:
+                    width = int(src.width * resolution_factor)
+                    height = int(src.height * resolution_factor)
+                    
+                    transform = rasterio.Affine(
+                        src.transform.a / resolution_factor,
+                        src.transform.b,
+                        src.transform.c,
+                        src.transform.d,
+                        src.transform.e / resolution_factor,
+                        src.transform.f
+                    )
+                    self.logger.info(f"分辨率调整: {src.width}x{src.height} -> {width}x{height}")
+                else:
+                    width = src.width
+                    height = src.height
+                    transform = src.transform
+                    self.logger.info(f"保持原始分辨率: {width}x{height}")
+                
+                crs = src.crs
+            
+            # 投影矢量数据
+            if gdf.crs != crs:
+                gdf = gdf.to_crs(crs)
+            
+            # 栅格化
+            shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field]))
+            raster = rasterize(
+                shapes=shapes,
+                out_shape=(height, width),
+                transform=transform,
+                fill=np.nan,
+                dtype='float32'
+            )
+            
+            # 预备掩膜:优先使用行政区边界;若未提供边界,则使用点集凸包限制绘制范围
+            boundary_mask = None
+            if boundary_gdf is not None:
+                self.logger.info("应用边界掩膜: 使用直接提供的GeoDataFrame")
+                # 确保边界GeoDataFrame的CRS与栅格一致
+                if boundary_gdf.crs != crs:
+                    boundary_gdf = boundary_gdf.to_crs(crs)
+                boundary_mask = self.create_boundary_mask(raster, transform, boundary_gdf)
+                raster[~boundary_mask] = np.nan
+            else:
+                try:
+                    # 使用点集凸包作为默认掩膜,避免边界外着色
+                    hull = gdf.unary_union.convex_hull
+                    hull_gdf = gpd.GeoDataFrame(geometry=[hull], crs=crs)
+                    boundary_mask = self.create_boundary_mask(raster, transform, hull_gdf)
+                    raster[~boundary_mask] = np.nan
+                    self.logger.info("已使用点集凸包限制绘制范围")
+                except Exception as hull_err:
+                    self.logger.warning(f"生成点集凸包掩膜失败,可能会出现边界外着色: {str(hull_err)}")
+            
+            # 检查栅格数据状态并决定是否插值
+            nan_count = np.isnan(raster).sum()
+            total_pixels = raster.size
+            self.logger.info(f"栅格数据状态: 总像素数 {total_pixels}, NaN像素数 {nan_count} ({nan_count/total_pixels*100:.1f}%)")
+            
+            # 使用插值方法填充NaN值(如果启用)
+            if enable_interpolation and nan_count > 0:
+                self.logger.info(f"✓ 启用插值: 使用 {interpolation_method} 方法填充 {nan_count} 个NaN像素...")
+                raster = self.interpolate_nan_values(raster, method=interpolation_method)
+                # 关键修正:插值后再次应用掩膜,确保边界外不被填充
+                if boundary_mask is not None:
+                    raster[~boundary_mask] = np.nan
+                final_nan_count = np.isnan(raster).sum()
+                self.logger.info(f"插值完成: 剩余NaN像素数 {final_nan_count}")
+            elif enable_interpolation and nan_count == 0:
+                self.logger.info("✓ 插值已启用,但栅格数据无NaN值,无需插值")
+            elif not enable_interpolation and nan_count > 0:
+                self.logger.info(f"✗ 插值已禁用,保留 {nan_count} 个NaN像素")
+            else:
+                self.logger.info("✓ 栅格数据完整,无需插值")
+            
+            # 创建输出目录
+            os.makedirs(os.path.dirname(output_tif), exist_ok=True)
+            
+            # 更新元数据
+            template_meta.update({
+                "count": 1,
+                "dtype": 'float32',
+                "nodata": np.nan,
+                "width": width,
+                "height": height,
+                "transform": transform
+            })
+            
+            # 保存栅格文件
+            with rasterio.open(output_tif, 'w', **template_meta) as dst:
+                dst.write(raster, 1)
+            
+            # 计算统计信息
+            valid_data = raster[~np.isnan(raster)]
+            stats = None
+            if len(valid_data) > 0:
+                stats = {
+                    'min': float(np.min(valid_data)),
+                    'max': float(np.max(valid_data)),
+                    'mean': float(np.mean(valid_data)),
+                    'std': float(np.std(valid_data)),
+                    'valid_pixels': int(len(valid_data)),
+                    'total_pixels': int(raster.size)
+                }
+                self.logger.info(f"统计信息: 有效像素 {stats['valid_pixels']}/{stats['total_pixels']}")
+                self.logger.info(f"数值范围: {stats['min']:.4f} - {stats['max']:.4f}")
+            else:
+                self.logger.warning("没有有效数据")
+            
+            self.logger.info(f"✓ 成功保存: {output_tif}")
+            return output_tif, stats
+            
+        except Exception as e:
+            self.logger.error(f"GeoDataFrame转栅格失败: {str(e)}")
+            raise
+    
     def vector_to_raster(self, input_shapefile, template_tif, output_tif, field, 
                         resolution_factor=16.0, boundary_shp=None, boundary_gdf=None, interpolation_method='nearest', enable_interpolation=True):
         """
@@ -702,12 +888,59 @@ def get_available_colormaps():
     return COLORMAPS.copy()
 
 
+def dataframe_to_raster_workflow(df, template_tif, output_dir, 
+                                boundary_gdf=None, resolution_factor=16.0,
+                                interpolation_method='nearest', field_name='Prediction',
+                                lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
+    """
+    DataFrame到栅格转换工作流(内存处理优化版本)
+    
+    @param df: pandas DataFrame
+    @param template_tif: 模板GeoTIFF文件路径
+    @param output_dir: 输出目录
+    @param boundary_gdf: 边界GeoDataFrame(可选)
+    @param resolution_factor: 分辨率因子
+    @param interpolation_method: 插值方法
+    @param field_name: 字段名称
+    @param lon_col: 经度列
+    @param lat_col: 纬度列
+    @param value_col: 数值列
+    @param enable_interpolation: 是否启用空间插值,默认False
+    @return: 输出文件路径字典
+    """
+    mapper = MappingUtils()
+    
+    # 确保输出目录存在
+    os.makedirs(output_dir, exist_ok=True)
+    
+    # 生成文件名(基于时间戳,避免冲突)
+    import time
+    timestamp = str(int(time.time()))
+    raster_path = os.path.join(output_dir, f"memory_raster_{timestamp}.tif")
+    
+    # 1. DataFrame直接转GeoDataFrame(内存处理)
+    gdf = mapper.dataframe_to_geodataframe(df, lon_col, lat_col, value_col, field_name)
+    
+    # 2. GeoDataFrame直接转栅格(内存处理)
+    raster_path, stats = mapper.geodataframe_to_raster(
+        gdf, template_tif, raster_path, field_name,
+        resolution_factor, boundary_gdf, interpolation_method, enable_interpolation
+    )
+    
+    return {
+        'shapefile': None,  # 内存处理不生成shapefile
+        'raster': raster_path,
+        'statistics': stats,
+        'geodataframe': gdf  # 返回GeoDataFrame供调试使用
+    }
+
+
 def csv_to_raster_workflow(csv_file, template_tif, output_dir, 
                           boundary_shp=None, boundary_gdf=None, resolution_factor=16.0,
                           interpolation_method='nearest', field_name='Prediction',
                           lon_col=0, lat_col=1, value_col=2, enable_interpolation=False):
     """
-    完整的CSV到栅格转换工作流
+    完整的CSV到栅格转换工作流(原版本,保持兼容性)
     
     @param csv_file: CSV文件路径
     @param template_tif: 模板GeoTIFF文件路径

+ 101 - 0
docs/logging_best_practices.md

@@ -0,0 +1,101 @@
+# 日志配置最佳实践
+
+## 问题背景
+
+在项目中出现了重复日志的问题,主要原因是:
+
+1. **多重日志处理器配置**:多个模块都手动配置了 `StreamHandler`
+2. **日志传播机制**:子 logger 的日志会传播到父 logger,如果都有处理器就会重复输出
+3. **缺乏统一配置**:各个模块独立配置日志,没有统一的配置模式
+
+## 解决方案
+
+### 1. 统一日志配置工具
+
+创建了 `app/utils/logger_config.py` 提供统一的日志配置功能:
+
+```python
+from app.utils.logger_config import get_api_logger
+
+# 在API模块中使用
+logger = get_api_logger(__name__)
+logger.info("这是API日志")
+```
+
+### 2. 避免重复配置的关键点
+
+```python
+# ✅ 正确的配置方式
+logger = logging.getLogger(__name__)
+if not logger.handlers:  # 检查是否已配置
+    logger.setLevel(logging.INFO)
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.propagate = False  # 关闭传播,避免重复输出
+
+# ❌ 错误的配置方式(会导致重复日志)
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+handler = logging.StreamHandler()  # 每次都添加新的处理器
+logger.addHandler(handler)  # 重复添加处理器
+```
+
+### 3. 不同模块的使用方式
+
+#### API模块
+```python
+from app.utils.logger_config import get_api_logger
+
+logger = get_api_logger(__name__)
+logger.info("API操作日志")
+```
+
+#### Service模块
+```python
+from app.utils.logger_config import get_service_logger
+
+logger = get_service_logger(__name__)
+logger.info("业务逻辑日志")
+```
+
+#### 工具模块
+```python
+from app.utils.logger_config import get_utils_logger
+
+logger = get_utils_logger(__name__)
+logger.info("工具函数日志")
+```
+
+## 修复的文件
+
+已修复以下文件中的重复日志配置:
+
+1. `app/api/water.py` - 添加了处理器检查和传播控制
+2. `app/api/cd_flux.py` - 添加了处理器检查和传播控制  
+3. `app/services/cd_flux_service.py` - 添加了处理器检查和传播控制
+4. `app/utils/mapping_utils.py` - 已经有正确的配置
+
+## 测试方法
+
+运行演示脚本来验证修复效果:
+
+```bash
+conda activate GeoSys
+python scripts/demos/logging_demo.py
+```
+
+## 最佳实践总结
+
+1. **使用统一配置工具**:优先使用 `app/utils/logger_config.py` 中的函数
+2. **检查处理器**:配置前检查 `if not logger.handlers:`
+3. **控制传播**:设置 `logger.propagate = False` 避免重复输出
+4. **避免重复配置**:不要在多个地方配置同一个logger
+5. **根日志配置**:只在应用启动时配置一次根日志系统
+
+## 注意事项
+
+- 在类中使用日志时,建议在 `__init__` 方法中配置,避免每次调用都重新配置
+- 对于 FastAPI 应用,uvicorn 已经有自己的日志配置,避免与其冲突
+- 测试环境和生产环境可能需要不同的日志级别和输出方式

+ 74 - 0
migrations/versions/29a56094509c_add_four_county_land_use_table.py

@@ -0,0 +1,74 @@
+"""add_four_county_land_use_table
+
+Revision ID: 29a56094509c
+Revises: d89bb52ae481
+Create Date: 2025-08-17 10:01:27.121198
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import geoalchemy2
+
+
+# revision identifiers, used by Alembic.
+revision = '29a56094509c'
+down_revision = 'd89bb52ae481'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """升级数据库到当前版本 - 创建四县三种用电土地利用类型表"""
+    # 创建四县三种用电土地利用类型表
+    op.create_table(
+        'four_county_land',
+        sa.Column('gid', sa.Integer(), autoincrement=True, nullable=False, comment='主键ID'),
+        sa.Column('objectid', sa.Integer(), nullable=True, comment='对象标识符'),
+        sa.Column('bsm', sa.String(length=18), nullable=True, comment='标识码'),
+        sa.Column('ysdm', sa.String(length=10), nullable=True, comment='要素代码'),
+        sa.Column('tbybh', sa.String(length=18), nullable=True, comment='图斑预编号'),
+        sa.Column('tbbh', sa.String(length=8), nullable=True, comment='图斑编号'),
+        sa.Column('dlbm', sa.String(length=5), nullable=True, comment='地类编码'),
+        sa.Column('dlmc', sa.String(length=60), nullable=True, comment='地类名称'),
+        sa.Column('qsxz', sa.String(length=2), nullable=True, comment='权属性质'),
+        sa.Column('qsdwdm', sa.String(length=19), nullable=True, comment='权属单位代码'),
+        sa.Column('qsdwmc', sa.String(length=60), nullable=True, comment='权属单位名称'),
+        sa.Column('zldwdm', sa.String(length=19), nullable=True, comment='坐落单位代码'),
+        sa.Column('zldwmc', sa.String(length=60), nullable=True, comment='坐落单位名称'),
+        sa.Column('tbmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='图斑面积'),
+        sa.Column('kcdlbm', sa.String(length=5), nullable=True, comment='扣除地类编码'),
+        sa.Column('kcxs', sa.Numeric(precision=19, scale=11), nullable=True, comment='扣除系数'),
+        sa.Column('kcmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='扣除面积'),
+        sa.Column('tbdlmj', sa.Numeric(precision=19, scale=11), nullable=True, comment='图斑地类面积'),
+        sa.Column('gdlx', sa.String(length=2), nullable=True, comment='耕地类型'),
+        sa.Column('gdpdjb', sa.String(length=2), nullable=True, comment='耕地坡度级别'),
+        sa.Column('xzdwkd', sa.Numeric(precision=19, scale=11), nullable=True, comment='线状地物宽度'),
+        sa.Column('tbxhdm', sa.String(length=6), nullable=True, comment='图斑细化代码'),
+        sa.Column('tbxhmc', sa.String(length=20), nullable=True, comment='图斑细化名称'),
+        sa.Column('zzsxdm', sa.String(length=6), nullable=True, comment='种植属性代码'),
+        sa.Column('zzsxmc', sa.String(length=20), nullable=True, comment='种植属性名称'),
+        sa.Column('gddb', sa.Integer(), nullable=True, comment='耕地等别'),
+        sa.Column('frdbs', sa.String(length=1), nullable=True, comment='非入等标识'),
+        sa.Column('czcsxm', sa.String(length=4), nullable=True, comment='城镇村属性码'),
+        sa.Column('sjnf', sa.Integer(), nullable=True, comment='数据年份'),
+        sa.Column('mssm', sa.String(length=2), nullable=True, comment='模式识别码'),
+        sa.Column('hdmc', sa.String(length=100), nullable=True, comment='河道名称'),
+        sa.Column('bz', sa.String(length=254), nullable=True, comment='备注'),
+        sa.Column('shape_leng', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状长度'),
+        sa.Column('shape_le_1', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状长度1'),
+        sa.Column('shape_area', sa.Numeric(precision=19, scale=11), nullable=True, comment='形状面积'),
+        sa.Column('geom', geoalchemy2.types.Geometry(geometry_type='POLYGON', srid=4526, from_text='ST_GeomFromEWKT', name='geometry'), nullable=True, comment='几何位置信息-多边形'),
+        sa.PrimaryKeyConstraint('gid'),
+        comment='四县三种用电土地利用类型数据'
+    )
+    
+    # 创建空间索引
+    op.execute("CREATE INDEX IF NOT EXISTS idx_four_county_land_geom ON four_county_land USING gist (geom)")
+
+
+def downgrade():
+    """将数据库降级到上一版本 - 删除四县三种用电土地利用类型表"""
+    # 删除空间索引
+    op.drop_index('idx_four_county_land_geom', table_name='four_county_land')
+    # 删除表
+    op.drop_table('four_county_land')