123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- """
- 四县土地利用类型数据服务
- @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能
- @author: AcidMap Team
- @version: 1.0.0
- """
- import logging
- import pandas as pd
- import numpy as np
- from typing import Dict, Any, List, Optional, Tuple
- from sqlalchemy.orm import Session
- from sqlalchemy import text, and_, func
- from shapely.geometry import Point
- from shapely.wkt import loads
- from pyproj import Transformer
- import functools
- from datetime import datetime, timedelta
- from ..database import SessionLocal
- from ..models.land_use import FourCountyLandUse
- # 配置日志
- logger = logging.getLogger(__name__)
- class LandUseService:
- """
- 四县土地利用类型数据服务类
-
- @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化
- """
-
- def __init__(self):
- """
- 初始化土地利用服务
- """
- self.logger = logging.getLogger(__name__)
- self._cache = {} # 简单的内存缓存
- self._cache_timeout = timedelta(minutes=30) # 缓存30分钟
-
- def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]:
- """
- 根据土地利用类型查询数据
-
- @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地'
- @returns: 土地利用数据列表,包含几何中心点坐标信息
- """
- try:
- with SessionLocal() as db:
- # 查询指定类型的土地利用数据
- query = db.query(FourCountyLandUse).filter(
- FourCountyLandUse.dlmc == land_type
- )
-
- land_records = query.all()
-
- if not land_records:
- self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据")
- return None
-
- self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据")
-
- # 转换数据为列表
- result = []
- for record in land_records:
- # 从PostGIS几何数据中提取中心点坐标
- center_info = self._extract_geometry_center(db, record.gid)
-
- land_data = {
- 'gid': record.gid,
- 'objectid': record.objectid,
- 'dlmc': record.dlmc, # 地类名称
- 'dlbm': record.dlbm, # 地类编码
- 'tbmj': float(record.tbmj) if record.tbmj else None, # 图斑面积
- 'center_lon': center_info['longitude'] if center_info else None,
- 'center_lat': center_info['latitude'] if center_info else None,
- 'shape_area': float(record.shape_area) if record.shape_area else None,
- # 其他可能需要的字段
- 'qsdwmc': record.qsdwmc, # 权属单位名称
- 'zldwmc': record.zldwmc, # 坐落单位名称
- }
- result.append(land_data)
-
- return result
-
- except Exception as e:
- self.logger.error(f"查询土地利用数据失败: {str(e)}")
- return None
-
- def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]:
- """
- 提取几何图形的中心点坐标
-
- @param db: 数据库会话
- @param gid: 图形记录ID
- @returns: 包含经纬度的字典
- """
- try:
- # 使用PostGIS函数计算几何中心并转换为WGS84坐标系
- sql = text("""
- SELECT
- ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude,
- ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude
- FROM four_county_land
- WHERE gid = :gid
- """)
-
- result = db.execute(sql, {'gid': gid}).fetchone()
-
- if result:
- return {
- 'longitude': float(result.longitude),
- 'latitude': float(result.latitude)
- }
- else:
- self.logger.warning(f"无法获取GID {gid} 的几何中心点")
- return None
-
- except Exception as e:
- self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}")
- return None
-
- def _is_cache_valid(self, cache_key: str) -> bool:
- """检查缓存是否有效"""
- if cache_key not in self._cache:
- return False
- cache_time = self._cache[cache_key].get('timestamp')
- if not cache_time:
- return False
- return datetime.now() - cache_time < self._cache_timeout
-
- def _get_from_cache(self, cache_key: str):
- """从缓存获取数据"""
- if self._is_cache_valid(cache_key):
- return self._cache[cache_key]['data']
- return None
-
- def _set_cache(self, cache_key: str, data):
- """设置缓存数据"""
- self._cache[cache_key] = {
- 'data': data,
- 'timestamp': datetime.now()
- }
-
- def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None,
- offset: Optional[int] = None) -> Optional[pd.DataFrame]:
- """
- 高性能批量获取土地中心点数据
-
- @param land_type: 土地利用类型
- @param limit: 限制返回数量,None表示返回所有
- @param offset: 偏移量,用于分页
- @returns: 包含中心点坐标的DataFrame
- """
- try:
- # 构建缓存键
- cache_key = f"centers_optimized_{land_type}_{limit}_{offset}"
-
- # 尝试从缓存获取
- cached_data = self._get_from_cache(cache_key)
- if cached_data is not None:
- self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}")
- return cached_data
-
- with SessionLocal() as db:
- # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题
- sql = text("""
- SELECT
- gid,
- dlmc,
- shape_area,
- ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon,
- ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat
- FROM four_county_land
- WHERE dlmc = :land_type
- AND geom IS NOT NULL
- ORDER BY gid
- %s
- """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else ""))
-
- results = db.execute(sql, {'land_type': land_type}).fetchall()
-
- if not results:
- self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据")
- return None
-
- # 直接构建DataFrame,避免中间转换
- data = {
- 'gid': [r.gid for r in results],
- 'center_lon': [float(r.center_lon) for r in results],
- 'center_lat': [float(r.center_lat) for r in results],
- 'dlmc': [r.dlmc for r in results],
- 'shape_area': [float(r.shape_area) if r.shape_area else None for r in results]
- }
-
- df = pd.DataFrame(data)
-
- # 缓存结果
- self._set_cache(cache_key, df)
-
- self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点")
- return df
-
- except Exception as e:
- self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}")
- return None
-
- def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]:
- """
- 获取用于处理的土地中心点数据(使用优化版本)
-
- @param land_type: 土地利用类型
- @returns: 包含中心点坐标的DataFrame
- """
- # 使用优化版本的方法
- return self.get_land_centers_optimized(land_type)
-
- def get_available_land_types(self) -> List[str]:
- """
- 获取数据库中可用的土地利用类型(带缓存)
-
- @returns: 土地利用类型列表
- """
- cache_key = "available_land_types"
-
- # 尝试从缓存获取
- cached_data = self._get_from_cache(cache_key)
- if cached_data is not None:
- return cached_data
-
- try:
- with SessionLocal() as db:
- # 查询所有不同的土地利用类型
- query = db.query(FourCountyLandUse.dlmc).distinct()
- results = query.all()
-
- land_types = [result.dlmc for result in results if result.dlmc]
- land_types.sort() # 排序便于查看
-
- # 缓存结果
- self._set_cache(cache_key, land_types)
-
- self.logger.info(f"可用的土地利用类型: {land_types}")
- return land_types
-
- except Exception as e:
- self.logger.error(f"获取土地利用类型失败: {str(e)}")
- return []
-
- def get_land_statistics_summary(self) -> Dict[str, Any]:
- """
- 获取土地利用数据统计摘要(带缓存)
-
- @returns: 统计摘要信息
- """
- cache_key = "land_statistics_summary"
-
- # 尝试从缓存获取
- cached_data = self._get_from_cache(cache_key)
- if cached_data is not None:
- return cached_data
-
- try:
- with SessionLocal() as db:
- # 统计各类型土地的数量
- sql = text("""
- SELECT
- dlmc,
- COUNT(*) as count,
- SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area,
- AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area
- FROM four_county_land
- WHERE dlmc IS NOT NULL
- GROUP BY dlmc
- ORDER BY count DESC
- """)
-
- results = db.execute(sql).fetchall()
-
- summary = {
- 'total_records': 0,
- 'land_types': [],
- 'statistics_by_type': {}
- }
-
- for result in results:
- land_type = result.dlmc
- count = result.count
- total_area = float(result.total_area) if result.total_area else 0
- avg_area = float(result.avg_area) if result.avg_area else 0
-
- summary['total_records'] += count
- summary['land_types'].append(land_type)
- summary['statistics_by_type'][land_type] = {
- 'count': count,
- 'total_area': total_area,
- 'average_area': avg_area
- }
-
- # 缓存结果
- self._set_cache(cache_key, summary)
-
- self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}")
- return summary
-
- except Exception as e:
- self.logger.error(f"获取统计摘要失败: {str(e)}")
- return {
- 'total_records': 0,
- 'land_types': [],
- 'statistics_by_type': {}
- }
-
- def clear_cache(self):
- """清空缓存"""
- self._cache.clear()
- self.logger.info("缓存已清空")
-
- def get_cache_info(self) -> Dict[str, Any]:
- """获取缓存信息"""
- cache_info = {
- 'cache_size': len(self._cache),
- 'cache_keys': list(self._cache.keys()),
- 'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60
- }
- return cache_info
- # 全局服务实例
- land_use_service = LandUseService()
|