admin_boundary_service.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from sqlalchemy.orm import Session
  2. from sqlalchemy.sql import text
  3. import json
  4. import geopandas as gpd
  5. from shapely.geometry import shape
  6. from typing import Optional
  7. def get_boundary_geojson_by_name(db: Session, name: str, level: str = "auto") -> dict:
  8. """根据名称获取边界GeoJSON Feature
  9. Args:
  10. db (Session): 数据库会话
  11. name (str): 名称(县/市/省)
  12. level (str): 层级,可选 "county"|"city"|"province"|"auto"
  13. Returns:
  14. dict: GeoJSON Feature 对象
  15. """
  16. # county 精确匹配
  17. if level in ("county", "auto"):
  18. r = db.execute(text(
  19. """
  20. SELECT ST_AsGeoJSON(geometry) AS g, name, city_name, province_name
  21. FROM counties WHERE name=:n LIMIT 1
  22. """
  23. ), {"n": name}).fetchone()
  24. if r and r.g:
  25. return {
  26. "type": "Feature",
  27. "properties": {
  28. "level": "county",
  29. "name": r.name,
  30. "city": r.city_name,
  31. "province": r.province_name,
  32. },
  33. "geometry": json.loads(r.g)
  34. }
  35. # city 直接查询或从counties聚合
  36. if level in ("city", "auto"):
  37. # 优先从cities表直接查询
  38. r = db.execute(text(
  39. """
  40. SELECT ST_AsGeoJSON(geometry) AS g, name, province_name
  41. FROM cities WHERE name=:n LIMIT 1
  42. """
  43. ), {"n": name}).fetchone()
  44. if r and r.g:
  45. return {
  46. "type": "Feature",
  47. "properties": {
  48. "level": "city",
  49. "name": r.name,
  50. "province": r.province_name
  51. },
  52. "geometry": json.loads(r.g)
  53. }
  54. # 如果cities表没有,从counties表聚合
  55. r = db.execute(text(
  56. """
  57. SELECT ST_AsGeoJSON(ST_UnaryUnion(ST_Union(geometry))) AS g,
  58. MIN(province_name) AS province_name
  59. FROM counties WHERE city_name=:n
  60. """
  61. ), {"n": name}).fetchone()
  62. if r and r.g:
  63. return {
  64. "type": "Feature",
  65. "properties": {
  66. "level": "city",
  67. "name": name,
  68. "province": r.province_name
  69. },
  70. "geometry": json.loads(r.g)
  71. }
  72. # province 直接查询或从counties聚合
  73. if level in ("province", "auto"):
  74. # 优先从provinces表直接查询
  75. r = db.execute(text(
  76. """
  77. SELECT ST_AsGeoJSON(geometry) AS g, name
  78. FROM provinces WHERE name=:n LIMIT 1
  79. """
  80. ), {"n": name}).fetchone()
  81. if r and r.g:
  82. return {
  83. "type": "Feature",
  84. "properties": {
  85. "level": "province",
  86. "name": r.name
  87. },
  88. "geometry": json.loads(r.g)
  89. }
  90. # 如果provinces表没有,从counties表聚合
  91. r = db.execute(text(
  92. """
  93. SELECT ST_AsGeoJSON(ST_UnaryUnion(ST_Union(geometry))) AS g
  94. FROM counties WHERE province_name=:n
  95. """
  96. ), {"n": name}).fetchone()
  97. if r and r.g:
  98. return {
  99. "type": "Feature",
  100. "properties": {
  101. "level": "province",
  102. "name": name
  103. },
  104. "geometry": json.loads(r.g)
  105. }
  106. raise ValueError(f"未找到名称: {name}")
  107. def get_boundary_gdf_by_name(db: Session, name: str, level: str = "auto") -> Optional[gpd.GeoDataFrame]:
  108. """根据名称获取边界GeoDataFrame,优化版本避免创建临时文件
  109. 这是一个通用的边界数据获取函数,直接返回GeoDataFrame而不需要创建临时Shapefile文件。
  110. 建议在需要边界数据进行空间操作的场景中使用此函数。
  111. Args:
  112. db (Session): 数据库会话
  113. name (str): 名称(县/市/省)
  114. level (str): 层级,可选 "county"|"city"|"province"|"auto"
  115. Returns:
  116. Optional[gpd.GeoDataFrame]: 边界GeoDataFrame,如果未找到则返回None
  117. Example:
  118. ```python
  119. with SessionLocal() as db:
  120. boundary_gdf = get_boundary_gdf_by_name(db, "乐昌市", "county")
  121. if boundary_gdf is not None:
  122. # 直接使用GeoDataFrame进行空间操作
  123. boundary_union = boundary_gdf.unary_union
  124. ```
  125. """
  126. try:
  127. boundary_geojson = get_boundary_geojson_by_name(db, name, level)
  128. if boundary_geojson:
  129. geometry_obj = shape(boundary_geojson["geometry"])
  130. gdf = gpd.GeoDataFrame([boundary_geojson["properties"]],
  131. geometry=[geometry_obj],
  132. crs="EPSG:4326")
  133. return gdf
  134. except ValueError:
  135. # 未找到对应边界数据
  136. pass
  137. except Exception:
  138. # 其他错误也返回None,让调用方处理
  139. pass
  140. return None