Quellcode durchsuchen

重构土地利用数据处理逻辑,替换原有SHP文件读取为数据库查询,优化数据获取和处理流程,更新相关文件路径和输出格式,提升代码可维护性和性能。

drggboy vor 5 Tagen
Ursprung
Commit
90b6150272

+ 31 - 33
Water/Python_codes/Calculate.py

@@ -1,33 +1,35 @@
 import os
-import geopandas as gpd
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取 Shp 文件
-shp_file = r"..\Raster\四县三种用电.shp"
-gdf_shp = gpd.read_file(shp_file)
-
+# 设置要处理的土地类型
 str = "旱地"
-# 筛选出字段 DLMC 为指定类型的面要素
-gdf_shp = gdf_shp[gdf_shp['DLMC'] == str]
 
-# 定义坐标系转换对象(从原始坐标系转换为WGS84)
-transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+# 从数据库获取土地利用数据,替代原来的SHP文件读取
+print(f"从数据库获取 '{str}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(str)
+
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{str}' 类型的土地利用数据")
+    exit(1)
+
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{str}' 类型的土地数据")
 
 # 读取 xls 文件
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
 df_xls = pd.read_excel(xls_file)
 
-# 新建重金属含量字段,指定数据类型为双精度浮点数
-gdf_shp['Cd'] = pd.Series(dtype=np.float64)
-gdf_shp['Hg'] = pd.Series(dtype=np.float64)
-gdf_shp['Cr'] = pd.Series(dtype=np.float64)
-gdf_shp['Pb'] = pd.Series(dtype=np.float64)
-gdf_shp['As'] = pd.Series(dtype=np.float64)
+# 不再需要为GeoDataFrame添加字段,直接处理数据即可
 
 # 根据面要素类型设置系数
 A = 711 * 0.524
@@ -45,11 +47,11 @@ else:
 # 创建一个空列表来存储数据行
 data_rows = []
 
-# 遍历面要素
-for index, row in gdf_shp.iterrows():
-    # 获取面要素的中心并转换坐标
-    center_original = row['geometry'].centroid
-    center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+# 遍历面要素,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    center_lon = row['center_lon']
+    center_lat = row['center_lat']
     center_transformed = Point(center_lon, center_lat)
     
     # 计算面要素中心与采样点的距离
@@ -59,24 +61,17 @@ for index, row in gdf_shp.iterrows():
     min_distance_index = distances.idxmin()
     nearest_sample = df_xls.loc[min_distance_index]
     
-    # 将最近采样点的值赋给面要素,并乘以系数Num
+    # 计算最近采样点的重金属含量,并乘以系数Num
     cd_value = nearest_sample['Cd (ug/L)'] * Num
     hg_value = nearest_sample['Hg (ug/L)'] * Num
     cr_value = nearest_sample['Cr (ug/L)'] * Num
     pb_value = nearest_sample['Pb (ug/L)'] * Num
     as_value = nearest_sample['As (ug/L)'] * Num
     
-    # 更新面要素的重金属含量
-    gdf_shp.at[index, 'Cd'] = cd_value
-    gdf_shp.at[index, 'Hg'] = hg_value
-    gdf_shp.at[index, 'Cr'] = cr_value
-    gdf_shp.at[index, 'Pb'] = pb_value
-    gdf_shp.at[index, 'As'] = as_value
-    
     # 将数据行添加到列表中
     data_rows.append({
-        '面要素ID': index,
-        'DLMC': row['DLMC'],
+        '面要素ID': row['gid'],  # 使用数据库中的gid
+        'DLMC': row['dlmc'],    # 使用数据库中的dlmc字段
         '中心点经度': center_lon,
         '中心点纬度': center_lat,
         '最近采样点ID': nearest_sample.name,
@@ -92,8 +87,11 @@ for index, row in gdf_shp.iterrows():
 # 从列表创建DataFrame
 csv_data = pd.DataFrame(data_rows)
 
-# 保存结果为CSV文件 - 修复路径中的转义字符问题
-csv_file_path = os.path.join('D:\土壤-大创\Water\Data\四县三用地&旱地.csv')
+# 保存结果为CSV文件
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"四县三用地&{str}_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
 print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {str} 面要素")

+ 27 - 27
Water/Python_codes/Counter.py

@@ -1,28 +1,29 @@
 import os
+import sys
 import pandas as pd
-from pyproj import Transformer
 from shapely.geometry import Point
 import numpy as np
+from pathlib import Path
 
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.append(str(project_root))
 
+# 导入数据库服务
+from app.services.land_use_service import land_use_service
 
-# 读取耕地中心点CSV文件
-csv_cultivated_land = r"..\Data\四县三用地&水田.csv"
-df_land = pd.read_csv(csv_cultivated_land)
+# 设置要处理的土地类型
+land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
 
-# 确保CSV文件包含必要的字段
-required_fields = ['中心点经度', '中心点纬度', 'DLMC']
-for field in required_fields:
-    if field not in df_land.columns:
-        raise ValueError(f"耕地CSV文件缺少必要的字段: {field}")
+# 从数据库获取土地利用数据,替代原来的CSV文件读取
+print(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
 
-# 筛选出旱地
-land_type = "旱地"  # 避免使用str作为变量名,因为它是Python内置函数
-df_land = df_land[df_land['DLMC'] == land_type]
+if land_centers_df is None or land_centers_df.empty:
+    print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+    exit(1)
 
-# 定义坐标系转换(假设输入已是WGS84,根据实际情况修改)
-source_crs = "EPSG:4326"
-transformer = Transformer.from_crs(source_crs, "EPSG:4326", always_xy=True)
+print(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
 # 读取采样点数据
 xls_file = r"..\Data\Irrigation_water_SamplingPoint.xlsx"
@@ -45,15 +46,11 @@ else:
 # 存储结果的列表
 data_rows = []
 
-# 遍历耕地中心点
-for index, row in df_land.iterrows():
-    # 获取正确的坐标字段
-    lon = row['中心点经度']
-    lat = row['中心点纬度']
-    
-    # 坐标转换(如果需要)
-    if source_crs != "EPSG:4326":
-        lon, lat = transformer.transform(lon, lat)
+# 遍历耕地中心点,使用数据库中的中心点坐标
+for index, row in land_centers_df.iterrows():
+    # 直接使用数据库中已经转换好的WGS84坐标
+    lon = row['center_lon']
+    lat = row['center_lat']
     
     # 创建中心点几何对象
     center_point = Point(lon, lat)
@@ -73,7 +70,7 @@ for index, row in df_land.iterrows():
     
     # 添加到结果列表
     data_rows.append({
-        'DLMC': row['DLMC'],
+        'DLMC': row['dlmc'],  # 使用数据库中的字段名
         '中心点经度': lon,
         '中心点纬度': lat,
         'Cd (ug/L)': cd_value
@@ -83,7 +80,10 @@ for index, row in df_land.iterrows():
 csv_data = pd.DataFrame(data_rows)
 
 # 保存结果
-csv_file_path = r"D:\土壤-大创\Water\Data\旱地_Cd.csv"
+output_dir = Path(__file__).parent.parent / "Data"
+output_dir.mkdir(exist_ok=True)
+csv_file_path = output_dir / f"{land_type}_Cd_DB.csv"
 csv_data.to_csv(csv_file_path, index=False, encoding='utf-8-sig')
 
-print(f"CSV文件已保存至: {csv_file_path}")
+print(f"CSV文件已保存至: {csv_file_path}")
+print(f"总共处理了 {len(csv_data)} 个 {land_type} 面要素")

+ 41 - 10
Water/Python_codes/point_match.py

@@ -118,18 +118,48 @@ def write_csv(data, output_file):
         print(f"错误: 写入文件 {output_file} 时发生错误: {e}")
 
 def main():
-    # 配置文件路径
-    farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv'  # 耕地中心点数据文件
-    sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv'    # 采样点数据文件
-    output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv'  # 输出文件
+    """主函数 - 使用数据库数据进行点匹配"""
+    import sys
+    from pathlib import Path
+    
+    # 添加项目根目录到Python路径
+    project_root = Path(__file__).parent.parent.parent
+    sys.path.append(str(project_root))
+    
+    # 导入数据库服务
+    from app.services.land_use_service import land_use_service
+    
+    # 设置要处理的土地类型
+    land_type = "水浇地"
+    
+    # 从数据库获取土地利用数据
+    print(f"正在从数据库读取 {land_type} 数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
     
-    # 读取数据
-    print("正在读取耕地数据...")
-    farmland_data = read_csv(farmland_file)
-    print(f"已读取 {len(farmland_data)} 条耕地记录")
+    if land_centers_df is None or land_centers_df.empty:
+        print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
+        return
+    
+    print(f"已从数据库读取 {len(land_centers_df)} 条 {land_type} 记录")
+    
+    # 转换为字典格式以兼容现有函数
+    farmland_data = []
+    for _, row in land_centers_df.iterrows():
+        farmland_data.append({
+            'id': row['gid'],
+            '中心点经度': row['center_lon'],
+            '中心点纬度': row['center_lat'],
+            'DLMC': row['dlmc']
+        })
+    
+    # 配置文件路径
+    sample_file = Path(__file__).parent.parent / "Data" / "SamplingPoint.csv"
+    output_dir = Path(__file__).parent.parent / "Data"
+    output_dir.mkdir(exist_ok=True)
+    output_file = output_dir / f"matched_data_{land_type}_DB.csv"
     
     print("正在读取采样点数据...")
-    sample_data = read_csv(sample_file)
+    sample_data = read_csv(str(sample_file))
     print(f"已读取 {len(sample_data)} 条采样点记录")
     
     if not farmland_data or not sample_data:
@@ -142,7 +172,8 @@ def main():
     
     # 写入结果
     print("正在写入结果...")
-    write_csv(merged_data, output_file)
+    write_csv(merged_data, str(output_file))
+    print(f"总共处理了 {len(merged_data)} 个 {land_type} 面要素")
 
 if __name__ == "__main__":
     main()

+ 331 - 0
app/services/land_use_service.py

@@ -0,0 +1,331 @@
+"""
+四县土地利用类型数据服务
+@description: 提供基于four_county_land表的土地利用类型数据查询和处理功能
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import logging
+import pandas as pd
+import numpy as np
+from typing import Dict, Any, List, Optional, Tuple
+from sqlalchemy.orm import Session
+from sqlalchemy import text, and_, func
+from shapely.geometry import Point
+from shapely.wkt import loads
+from pyproj import Transformer
+import functools
+from datetime import datetime, timedelta
+
+from ..database import SessionLocal
+from ..models.land_use import FourCountyLandUse
+
+# 配置日志
+logger = logging.getLogger(__name__)
+
+
+class LandUseService:
+    """
+    四县土地利用类型数据服务类
+    
+    @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化
+    """
+    
+    def __init__(self):
+        """
+        初始化土地利用服务
+        """
+        self.logger = logging.getLogger(__name__)
+        self._cache = {}  # 简单的内存缓存
+        self._cache_timeout = timedelta(minutes=30)  # 缓存30分钟
+        
+    def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]:
+        """
+        根据土地利用类型查询数据
+        
+        @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地'
+        @returns: 土地利用数据列表,包含几何中心点坐标信息
+        """
+        try:
+            with SessionLocal() as db:
+                # 查询指定类型的土地利用数据
+                query = db.query(FourCountyLandUse).filter(
+                    FourCountyLandUse.dlmc == land_type
+                )
+                
+                land_records = query.all()
+                
+                if not land_records:
+                    self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据")
+                    return None
+                
+                self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据")
+                
+                # 转换数据为列表
+                result = []
+                for record in land_records:
+                    # 从PostGIS几何数据中提取中心点坐标
+                    center_info = self._extract_geometry_center(db, record.gid)
+                    
+                    land_data = {
+                        'gid': record.gid,
+                        'objectid': record.objectid,
+                        'dlmc': record.dlmc,  # 地类名称
+                        'dlbm': record.dlbm,  # 地类编码
+                        'tbmj': float(record.tbmj) if record.tbmj else None,  # 图斑面积
+                        'center_lon': center_info['longitude'] if center_info else None,
+                        'center_lat': center_info['latitude'] if center_info else None,
+                        'shape_area': float(record.shape_area) if record.shape_area else None,
+                        # 其他可能需要的字段
+                        'qsdwmc': record.qsdwmc,  # 权属单位名称
+                        'zldwmc': record.zldwmc,  # 坐落单位名称
+                    }
+                    result.append(land_data)
+                
+                return result
+                
+        except Exception as e:
+            self.logger.error(f"查询土地利用数据失败: {str(e)}")
+            return None
+    
+    def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]:
+        """
+        提取几何图形的中心点坐标
+        
+        @param db: 数据库会话
+        @param gid: 图形记录ID
+        @returns: 包含经纬度的字典
+        """
+        try:
+            # 使用PostGIS函数计算几何中心并转换为WGS84坐标系
+            sql = text("""
+                SELECT 
+                    ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude,
+                    ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude
+                FROM four_county_land 
+                WHERE gid = :gid
+            """)
+            
+            result = db.execute(sql, {'gid': gid}).fetchone()
+            
+            if result:
+                return {
+                    'longitude': float(result.longitude),
+                    'latitude': float(result.latitude)
+                }
+            else:
+                self.logger.warning(f"无法获取GID {gid} 的几何中心点")
+                return None
+                
+        except Exception as e:
+            self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}")
+            return None
+    
+    def _is_cache_valid(self, cache_key: str) -> bool:
+        """检查缓存是否有效"""
+        if cache_key not in self._cache:
+            return False
+        cache_time = self._cache[cache_key].get('timestamp')
+        if not cache_time:
+            return False
+        return datetime.now() - cache_time < self._cache_timeout
+    
+    def _get_from_cache(self, cache_key: str):
+        """从缓存获取数据"""
+        if self._is_cache_valid(cache_key):
+            return self._cache[cache_key]['data']
+        return None
+    
+    def _set_cache(self, cache_key: str, data):
+        """设置缓存数据"""
+        self._cache[cache_key] = {
+            'data': data,
+            'timestamp': datetime.now()
+        }
+    
+    def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None, 
+                                  offset: Optional[int] = None) -> Optional[pd.DataFrame]:
+        """
+        高性能批量获取土地中心点数据
+        
+        @param land_type: 土地利用类型
+        @param limit: 限制返回数量,None表示返回所有
+        @param offset: 偏移量,用于分页
+        @returns: 包含中心点坐标的DataFrame
+        """
+        try:
+            # 构建缓存键
+            cache_key = f"centers_optimized_{land_type}_{limit}_{offset}"
+            
+            # 尝试从缓存获取
+            cached_data = self._get_from_cache(cache_key)
+            if cached_data is not None:
+                self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}")
+                return cached_data
+            
+            with SessionLocal() as db:
+                # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题
+                sql = text("""
+                    SELECT 
+                        gid,
+                        dlmc,
+                        shape_area,
+                        ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon,
+                        ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat
+                    FROM four_county_land 
+                    WHERE dlmc = :land_type
+                    AND geom IS NOT NULL
+                    ORDER BY gid
+                    %s
+                """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else ""))
+                
+                results = db.execute(sql, {'land_type': land_type}).fetchall()
+                
+                if not results:
+                    self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据")
+                    return None
+                
+                # 直接构建DataFrame,避免中间转换
+                data = {
+                    'gid': [r.gid for r in results],
+                    'center_lon': [float(r.center_lon) for r in results],
+                    'center_lat': [float(r.center_lat) for r in results],
+                    'dlmc': [r.dlmc for r in results],
+                    'shape_area': [float(r.shape_area) if r.shape_area else None for r in results]
+                }
+                
+                df = pd.DataFrame(data)
+                
+                # 缓存结果
+                self._set_cache(cache_key, df)
+                
+                self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点")
+                return df
+                
+        except Exception as e:
+            self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}")
+            return None
+    
+    def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]:
+        """
+        获取用于处理的土地中心点数据(使用优化版本)
+        
+        @param land_type: 土地利用类型
+        @returns: 包含中心点坐标的DataFrame
+        """
+        # 使用优化版本的方法
+        return self.get_land_centers_optimized(land_type)
+    
+    def get_available_land_types(self) -> List[str]:
+        """
+        获取数据库中可用的土地利用类型(带缓存)
+        
+        @returns: 土地利用类型列表
+        """
+        cache_key = "available_land_types"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 查询所有不同的土地利用类型
+                query = db.query(FourCountyLandUse.dlmc).distinct()
+                results = query.all()
+                
+                land_types = [result.dlmc for result in results if result.dlmc]
+                land_types.sort()  # 排序便于查看
+                
+                # 缓存结果
+                self._set_cache(cache_key, land_types)
+                
+                self.logger.info(f"可用的土地利用类型: {land_types}")
+                return land_types
+                
+        except Exception as e:
+            self.logger.error(f"获取土地利用类型失败: {str(e)}")
+            return []
+    
+    def get_land_statistics_summary(self) -> Dict[str, Any]:
+        """
+        获取土地利用数据统计摘要(带缓存)
+        
+        @returns: 统计摘要信息
+        """
+        cache_key = "land_statistics_summary"
+        
+        # 尝试从缓存获取
+        cached_data = self._get_from_cache(cache_key)
+        if cached_data is not None:
+            return cached_data
+        
+        try:
+            with SessionLocal() as db:
+                # 统计各类型土地的数量
+                sql = text("""
+                    SELECT 
+                        dlmc,
+                        COUNT(*) as count,
+                        SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area,
+                        AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area
+                    FROM four_county_land 
+                    WHERE dlmc IS NOT NULL
+                    GROUP BY dlmc
+                    ORDER BY count DESC
+                """)
+                
+                results = db.execute(sql).fetchall()
+                
+                summary = {
+                    'total_records': 0,
+                    'land_types': [],
+                    'statistics_by_type': {}
+                }
+                
+                for result in results:
+                    land_type = result.dlmc
+                    count = result.count
+                    total_area = float(result.total_area) if result.total_area else 0
+                    avg_area = float(result.avg_area) if result.avg_area else 0
+                    
+                    summary['total_records'] += count
+                    summary['land_types'].append(land_type)
+                    summary['statistics_by_type'][land_type] = {
+                        'count': count,
+                        'total_area': total_area,
+                        'average_area': avg_area
+                    }
+                
+                # 缓存结果
+                self._set_cache(cache_key, summary)
+                
+                self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}")
+                return summary
+                
+        except Exception as e:
+            self.logger.error(f"获取统计摘要失败: {str(e)}")
+            return {
+                'total_records': 0,
+                'land_types': [],
+                'statistics_by_type': {}
+            }
+    
+    def clear_cache(self):
+        """清空缓存"""
+        self._cache.clear()
+        self.logger.info("缓存已清空")
+    
+    def get_cache_info(self) -> Dict[str, Any]:
+        """获取缓存信息"""
+        cache_info = {
+            'cache_size': len(self._cache),
+            'cache_keys': list(self._cache.keys()),
+            'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60
+        }
+        return cache_info
+
+
+# 全局服务实例
+land_use_service = LandUseService()

+ 21 - 20
app/services/water_service.py

@@ -13,14 +13,11 @@ import sys
 
 # 导入MappingUtils
 from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow
+# 导入土地利用服务
+from .land_use_service import land_use_service
 
 # 配置日志
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler()
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
 
 # 设置全局字体
 import matplotlib.pyplot as plt
@@ -65,26 +62,28 @@ def get_base_dir():
 def process_land_data(land_type, coefficient_params=None):
     """处理土地类型数据并生成清洗后的简化数据"""
     base_dir = get_base_dir()
-    shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
     xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
 
     logger.info(f"处理土地类型: {land_type}")
-    logger.info(f"SHP文件: {shp_file}")
     logger.info(f"Excel文件: {xls_file}")
 
-    # 读取和处理SHP数据
-    gdf_shp = gpd.read_file(shp_file)
-    gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
-
-    if gdf_shp.empty:
-        logger.warning(f"没有找到 '{land_type}' 类型的要素")
+    # 从数据库获取土地利用数据,替代原来的SHP文件读取
+    logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
+    land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
+    
+    if land_centers_df is None or land_centers_df.empty:
+        logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
         return None, None
 
-    # 坐标系转换器
-    transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
+    logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
 
     # 读取Excel采样点数据
+    if not os.path.exists(xls_file):
+        logger.error(f"采样点Excel文件不存在: {xls_file}")
+        return None, None
+        
     df_xls = pd.read_excel(xls_file)
+    logger.info(f"读取到 {len(df_xls)} 个采样点数据")
 
     # 设置土地类型系数
     default_params = {
@@ -98,14 +97,16 @@ def process_land_data(land_type, coefficient_params=None):
     Num = param1 * param2
     logger.info(f"系数: {param1} * {param2} = {Num}")
 
-    # 处理每个面要素
+    # 处理每个面要素,使用数据库中的中心点坐标
     cd_values = []
     centers = []
-    for index, row in gdf_shp.iterrows():
-        center_original = row['geometry'].centroid
-        center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
+    
+    for index, row in land_centers_df.iterrows():
+        center_lon = row['center_lon']
+        center_lat = row['center_lat']
         centers.append((center_lon, center_lat))
 
+        # 计算到所有采样点的距离
         distances = df_xls.apply(
             lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
             axis=1
@@ -113,7 +114,7 @@ def process_land_data(land_type, coefficient_params=None):
         min_idx = distances.idxmin()
         nearest = df_xls.loc[min_idx]
 
-        # 计算Cd含量值
+        # 计算Cd含量值
         cd_value = nearest['Cd (ug/L)'] * Num
         cd_values.append(cd_value)