land_use_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. """
  2. 四县土地利用类型数据服务
  3. @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import logging
  8. import pandas as pd
  9. import numpy as np
  10. from typing import Dict, Any, List, Optional, Tuple
  11. from sqlalchemy.orm import Session
  12. from sqlalchemy import text, and_, func
  13. from shapely.geometry import Point
  14. from shapely.wkt import loads
  15. from pyproj import Transformer
  16. import functools
  17. from datetime import datetime, timedelta
  18. from ..database import SessionLocal
  19. from ..models.land_use import FourCountyLandUse
  20. # 配置日志
  21. logger = logging.getLogger(__name__)
  22. class LandUseService:
  23. """
  24. 四县土地利用类型数据服务类
  25. @description: 提供基于four_county_land表的土地利用类型数据查询和处理功能,包含性能优化
  26. """
  27. def __init__(self):
  28. """
  29. 初始化土地利用服务
  30. """
  31. self.logger = logging.getLogger(__name__)
  32. self._cache = {} # 简单的内存缓存
  33. self._cache_timeout = timedelta(minutes=30) # 缓存30分钟
  34. def get_land_use_by_type(self, land_type: str) -> Optional[List[Dict[str, Any]]]:
  35. """
  36. 根据土地利用类型查询数据
  37. @param land_type: 土地利用类型,如:'水田'、'旱地'、'水浇地'
  38. @returns: 土地利用数据列表,包含几何中心点坐标信息
  39. """
  40. try:
  41. with SessionLocal() as db:
  42. # 查询指定类型的土地利用数据
  43. query = db.query(FourCountyLandUse).filter(
  44. FourCountyLandUse.dlmc == land_type
  45. )
  46. land_records = query.all()
  47. if not land_records:
  48. self.logger.warning(f"未找到类型为 '{land_type}' 的土地利用数据")
  49. return None
  50. self.logger.info(f"查询到 {len(land_records)} 条 '{land_type}' 类型的土地利用数据")
  51. # 转换数据为列表
  52. result = []
  53. for record in land_records:
  54. # 从PostGIS几何数据中提取中心点坐标
  55. center_info = self._extract_geometry_center(db, record.gid)
  56. land_data = {
  57. 'gid': record.gid,
  58. 'objectid': record.objectid,
  59. 'dlmc': record.dlmc, # 地类名称
  60. 'dlbm': record.dlbm, # 地类编码
  61. 'tbmj': float(record.tbmj) if record.tbmj else None, # 图斑面积
  62. 'center_lon': center_info['longitude'] if center_info else None,
  63. 'center_lat': center_info['latitude'] if center_info else None,
  64. 'shape_area': float(record.shape_area) if record.shape_area else None,
  65. # 其他可能需要的字段
  66. 'qsdwmc': record.qsdwmc, # 权属单位名称
  67. 'zldwmc': record.zldwmc, # 坐落单位名称
  68. }
  69. result.append(land_data)
  70. return result
  71. except Exception as e:
  72. self.logger.error(f"查询土地利用数据失败: {str(e)}")
  73. return None
  74. def _extract_geometry_center(self, db: Session, gid: int) -> Optional[Dict[str, float]]:
  75. """
  76. 提取几何图形的中心点坐标
  77. @param db: 数据库会话
  78. @param gid: 图形记录ID
  79. @returns: 包含经纬度的字典
  80. """
  81. try:
  82. # 使用PostGIS函数计算几何中心并转换为WGS84坐标系
  83. sql = text("""
  84. SELECT
  85. ST_X(ST_Transform(ST_Centroid(geom), 4326)) as longitude,
  86. ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as latitude
  87. FROM four_county_land
  88. WHERE gid = :gid
  89. """)
  90. result = db.execute(sql, {'gid': gid}).fetchone()
  91. if result:
  92. return {
  93. 'longitude': float(result.longitude),
  94. 'latitude': float(result.latitude)
  95. }
  96. else:
  97. self.logger.warning(f"无法获取GID {gid} 的几何中心点")
  98. return None
  99. except Exception as e:
  100. self.logger.error(f"提取几何中心点失败 (GID: {gid}): {str(e)}")
  101. return None
  102. def _is_cache_valid(self, cache_key: str) -> bool:
  103. """检查缓存是否有效"""
  104. if cache_key not in self._cache:
  105. return False
  106. cache_time = self._cache[cache_key].get('timestamp')
  107. if not cache_time:
  108. return False
  109. return datetime.now() - cache_time < self._cache_timeout
  110. def _get_from_cache(self, cache_key: str):
  111. """从缓存获取数据"""
  112. if self._is_cache_valid(cache_key):
  113. return self._cache[cache_key]['data']
  114. return None
  115. def _set_cache(self, cache_key: str, data):
  116. """设置缓存数据"""
  117. self._cache[cache_key] = {
  118. 'data': data,
  119. 'timestamp': datetime.now()
  120. }
  121. def get_land_centers_optimized(self, land_type: str, limit: Optional[int] = None,
  122. offset: Optional[int] = None) -> Optional[pd.DataFrame]:
  123. """
  124. 高性能批量获取土地中心点数据
  125. @param land_type: 土地利用类型
  126. @param limit: 限制返回数量,None表示返回所有
  127. @param offset: 偏移量,用于分页
  128. @returns: 包含中心点坐标的DataFrame
  129. """
  130. try:
  131. # 构建缓存键
  132. cache_key = f"centers_optimized_{land_type}_{limit}_{offset}"
  133. # 尝试从缓存获取
  134. cached_data = self._get_from_cache(cache_key)
  135. if cached_data is not None:
  136. self.logger.info(f"从缓存获取 {land_type} 中心点数据,数量: {len(cached_data)}")
  137. return cached_data
  138. with SessionLocal() as db:
  139. # 使用一次性SQL查询获取所有必要数据,避免N+1查询问题
  140. sql = text("""
  141. SELECT
  142. gid,
  143. dlmc,
  144. shape_area,
  145. ST_X(ST_Transform(ST_Centroid(geom), 4326)) as center_lon,
  146. ST_Y(ST_Transform(ST_Centroid(geom), 4326)) as center_lat
  147. FROM four_county_land
  148. WHERE dlmc = :land_type
  149. AND geom IS NOT NULL
  150. ORDER BY gid
  151. %s
  152. """ % (f"LIMIT {limit} OFFSET {offset or 0}" if limit else ""))
  153. results = db.execute(sql, {'land_type': land_type}).fetchall()
  154. if not results:
  155. self.logger.warning(f"未找到 '{land_type}' 类型的土地利用数据")
  156. return None
  157. # 直接构建DataFrame,避免中间转换
  158. data = {
  159. 'gid': [r.gid for r in results],
  160. 'center_lon': [float(r.center_lon) for r in results],
  161. 'center_lat': [float(r.center_lat) for r in results],
  162. 'dlmc': [r.dlmc for r in results],
  163. 'shape_area': [float(r.shape_area) if r.shape_area else None for r in results]
  164. }
  165. df = pd.DataFrame(data)
  166. # 缓存结果
  167. self._set_cache(cache_key, df)
  168. self.logger.info(f"高性能查询获取 {len(df)} 个 '{land_type}' 中心点")
  169. return df
  170. except Exception as e:
  171. self.logger.error(f"高性能获取土地中心点数据失败: {str(e)}")
  172. return None
  173. def get_land_centers_for_processing(self, land_type: str) -> Optional[pd.DataFrame]:
  174. """
  175. 获取用于处理的土地中心点数据(使用优化版本)
  176. @param land_type: 土地利用类型
  177. @returns: 包含中心点坐标的DataFrame
  178. """
  179. # 使用优化版本的方法
  180. return self.get_land_centers_optimized(land_type)
  181. def get_available_land_types(self) -> List[str]:
  182. """
  183. 获取数据库中可用的土地利用类型(带缓存)
  184. @returns: 土地利用类型列表
  185. """
  186. cache_key = "available_land_types"
  187. # 尝试从缓存获取
  188. cached_data = self._get_from_cache(cache_key)
  189. if cached_data is not None:
  190. return cached_data
  191. try:
  192. with SessionLocal() as db:
  193. # 查询所有不同的土地利用类型
  194. query = db.query(FourCountyLandUse.dlmc).distinct()
  195. results = query.all()
  196. land_types = [result.dlmc for result in results if result.dlmc]
  197. land_types.sort() # 排序便于查看
  198. # 缓存结果
  199. self._set_cache(cache_key, land_types)
  200. self.logger.info(f"可用的土地利用类型: {land_types}")
  201. return land_types
  202. except Exception as e:
  203. self.logger.error(f"获取土地利用类型失败: {str(e)}")
  204. return []
  205. def get_land_statistics_summary(self) -> Dict[str, Any]:
  206. """
  207. 获取土地利用数据统计摘要(带缓存)
  208. @returns: 统计摘要信息
  209. """
  210. cache_key = "land_statistics_summary"
  211. # 尝试从缓存获取
  212. cached_data = self._get_from_cache(cache_key)
  213. if cached_data is not None:
  214. return cached_data
  215. try:
  216. with SessionLocal() as db:
  217. # 统计各类型土地的数量
  218. sql = text("""
  219. SELECT
  220. dlmc,
  221. COUNT(*) as count,
  222. SUM(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE 0 END) as total_area,
  223. AVG(CASE WHEN tbmj IS NOT NULL THEN tbmj ELSE NULL END) as avg_area
  224. FROM four_county_land
  225. WHERE dlmc IS NOT NULL
  226. GROUP BY dlmc
  227. ORDER BY count DESC
  228. """)
  229. results = db.execute(sql).fetchall()
  230. summary = {
  231. 'total_records': 0,
  232. 'land_types': [],
  233. 'statistics_by_type': {}
  234. }
  235. for result in results:
  236. land_type = result.dlmc
  237. count = result.count
  238. total_area = float(result.total_area) if result.total_area else 0
  239. avg_area = float(result.avg_area) if result.avg_area else 0
  240. summary['total_records'] += count
  241. summary['land_types'].append(land_type)
  242. summary['statistics_by_type'][land_type] = {
  243. 'count': count,
  244. 'total_area': total_area,
  245. 'average_area': avg_area
  246. }
  247. # 缓存结果
  248. self._set_cache(cache_key, summary)
  249. self.logger.info(f"统计摘要: 总记录数 {summary['total_records']}, 土地类型数 {len(summary['land_types'])}")
  250. return summary
  251. except Exception as e:
  252. self.logger.error(f"获取统计摘要失败: {str(e)}")
  253. return {
  254. 'total_records': 0,
  255. 'land_types': [],
  256. 'statistics_by_type': {}
  257. }
  258. def clear_cache(self):
  259. """清空缓存"""
  260. self._cache.clear()
  261. self.logger.info("缓存已清空")
  262. def get_cache_info(self) -> Dict[str, Any]:
  263. """获取缓存信息"""
  264. cache_info = {
  265. 'cache_size': len(self._cache),
  266. 'cache_keys': list(self._cache.keys()),
  267. 'cache_timeout_minutes': self._cache_timeout.total_seconds() / 60
  268. }
  269. return cache_info
  270. # 全局服务实例
  271. land_use_service = LandUseService()