""" 四县土地利用类型数据服务 @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能 @author: AcidMap Team @version: 1.0.0 """ import logging import pandas as pd import numpy as np from typing import Dict, Any, List, Optional, Tuple from sqlalchemy.orm import Session from sqlalchemy import text, and_, func from shapely.geometry import Point from shapely.wkt import loads from pyproj import Transformer import functools from datetime import datetime, timedelta from ..database import SessionLocal from ..models.land_use import FourCountyLandUse # 配置日志 logger = logging.getLogger(__name__) class LandUseService: """ 四县土地利用类型数据服务类 @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化 """ def __init__(self): """ 初始化土地利用服务 """ self.logger = logging.getLogger(__name__) self._cache = {} # 简单的内存缓存 self._cache_timeout = timedelta(minutes=30) # 缓存30分钟 def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]: """ 根据土地利用类型查询数据 @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地' @returns: 土地利用数据列表,包含几何中心点坐标信息 """ try: with SessionLocal() as db: # 查询指定类型的土地利用数据 query = db.query(FourCountyLandUse).filter( FourCountyLandUse.dlmc == land_type ) land_records = query.all() if not land_records: self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据") return None self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据") # 转换数据为列表 result = [] for record in land_records: # 从PostGIS几何数据中提取中心点坐标 center_info = self._extract_geometry_center(db, record.gid) land_data = { 'gid': record.gid, 'objectid': record.objectid, 'dlmc': record.dlmc, # 地类名称 'dlbm': record.dlbm, # 地类编码 'tbmj': float(record.tbmj) if record.tbmj else None, # 图斑面积 'center_lon': center_info['longitude'] if center_info else None, 'center_lat': center_info['latitude'] if center_info else None, 'shape_area': float(record.shape_area) if record.shape_area else None, # 其他可能需要的字段 'qsdwmc': record.qsdwmc, # 权属单位名称 'zldwmc': record.zldwmc, # 坐落单位名称 } result.append(land_data) return result except Exception as e: self.logger.error(f"查询土地利用数据失败: {str(e)}") return None def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]: """ 提取几何图形的中心点坐标 @param db: 数据库会话 @param gid: 图形记录ID @returns: 包含经纬度的字典 """ try: # 使用PostGIS函数计算几何中心并转换为WGS84坐标系 sql = text(""" SELECT ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude, ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude FROM four_county_land WHERE gid = :gid """) result = db.execute(sql, {'gid': gid}).fetchone() if result: return { 'longitude': float(result.longitude), 'latitude': float(result.latitude) } else: self.logger.warning(f"无法获取GID {gid} 的几何中心点") return None except Exception as e: self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}") return None def _is_cache_valid(self, cache_key: str) -> bool: """检查缓存是否有效""" if cache_key not in self._cache: return False cache_time = self._cache[cache_key].get('timestamp') if not cache_time: return False return datetime.now() - cache_time < self._cache_timeout def _get_from_cache(self, cache_key: str): """从缓存获取数据""" if self._is_cache_valid(cache_key): return self._cache[cache_key]['data'] return None def _set_cache(self, cache_key: str, data): """设置缓存数据""" self._cache[cache_key] = { 'data': data, 'timestamp': datetime.now() } def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None, offset: Optional[int] = None) -> Optional[pd.DataFrame]: """ 高性能批量获取土地中心点数据 @param land_type: 土地利用类型 @param limit: 限制返回数量,None表示返回所有 @param offset: 偏移量,用于分页 @returns: 包含中心点坐标的DataFrame """ try: # 构建缓存键 cache_key = f"centers_optimized_{land_type}_{limit}_{offset}" # 尝试从缓存获取 cached_data = self._get_from_cache(cache_key) if cached_data is not None: self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}") return cached_data with SessionLocal() as db: # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题 sql = text(""" SELECT gid, dlmc, shape_area, ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon, ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat FROM four_county_land WHERE dlmc = :land_type AND geom IS NOT NULL ORDER BY gid %s """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else "")) results = db.execute(sql, {'land_type': land_type}).fetchall() if not results: self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据") return None # 直接构建DataFrame,避免中间转换 data = { 'gid': [r.gid for r in results], 'center_lon': [float(r.center_lon) for r in results], 'center_lat': [float(r.center_lat) for r in results], 'dlmc': [r.dlmc for r in results], 'shape_area': [float(r.shape_area) if r.shape_area else None for r in results] } df = pd.DataFrame(data) # 缓存结果 self._set_cache(cache_key, df) self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点") return df except Exception as e: self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}") return None def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]: """ 获取用于处理的土地中心点数据(使用优化版本) @param land_type: 土地利用类型 @returns: 包含中心点坐标的DataFrame """ # 使用优化版本的方法 return self.get_land_centers_optimized(land_type) def get_available_land_types(self) -> List[str]: """ 获取数据库中可用的土地利用类型(带缓存) @returns: 土地利用类型列表 """ cache_key = "available_land_types" # 尝试从缓存获取 cached_data = self._get_from_cache(cache_key) if cached_data is not None: return cached_data try: with SessionLocal() as db: # 查询所有不同的土地利用类型 query = db.query(FourCountyLandUse.dlmc).distinct() results = query.all() land_types = [result.dlmc for result in results if result.dlmc] land_types.sort() # 排序便于查看 # 缓存结果 self._set_cache(cache_key, land_types) self.logger.info(f"可用的土地利用类型: {land_types}") return land_types except Exception as e: self.logger.error(f"获取土地利用类型失败: {str(e)}") return [] def get_land_statistics_summary(self) -> Dict[str, Any]: """ 获取土地利用数据统计摘要(带缓存) @returns: 统计摘要信息 """ cache_key = "land_statistics_summary" # 尝试从缓存获取 cached_data = self._get_from_cache(cache_key) if cached_data is not None: return cached_data try: with SessionLocal() as db: # 统计各类型土地的数量 sql = text(""" SELECT dlmc, COUNT(*) as count, SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area, AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area FROM four_county_land WHERE dlmc IS NOT NULL GROUP BY dlmc ORDER BY count DESC """) results = db.execute(sql).fetchall() summary = { 'total_records': 0, 'land_types': [], 'statistics_by_type': {} } for result in results: land_type = result.dlmc count = result.count total_area = float(result.total_area) if result.total_area else 0 avg_area = float(result.avg_area) if result.avg_area else 0 summary['total_records'] += count summary['land_types'].append(land_type) summary['statistics_by_type'][land_type] = { 'count': count, 'total_area': total_area, 'average_area': avg_area } # 缓存结果 self._set_cache(cache_key, summary) self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}") return summary except Exception as e: self.logger.error(f"获取统计摘要失败: {str(e)}") return { 'total_records': 0, 'land_types': [], 'statistics_by_type': {} } def clear_cache(self): """清空缓存""" self._cache.clear() self.logger.info("缓存已清空") def get_cache_info(self) -> Dict[str, Any]: """获取缓存信息""" cache_info = { 'cache_size': len(self._cache), 'cache_keys': list(self._cache.keys()), 'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60 } return cache_info # 全局服务实例 land_use_service = LandUseService()