point_unit_grouping_service.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import psycopg2
  2. from flask import Flask, jsonify
  3. from shapely import wkb
  4. from collections import Counter
  5. from app.database import engine
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy import text
  8. app = Flask(__name__)
  9. # 定义 h_xtfx 值的映射关系(数值用于插值计算)
  10. h_xtfx_mapping = {
  11. "优先保护类": 1,
  12. "安全利用类": 2,
  13. "严格管控类": 3
  14. }
  15. reverse_h_xtfx_mapping = {v: k for k, v in h_xtfx_mapping.items()}
  16. def fetch_unit_and_point_data(db: Session):
  17. """获取单元和点位数据(使用 SQLAlchemy ORM Session)"""
  18. try:
  19. # 查询单元信息(GID 和 几何图形)
  20. unit_query = text("SELECT gid, geom FROM unit_ceil")
  21. unit_result = db.execute(unit_query).fetchall()
  22. # 查询点位信息(ID、几何图形、h_xtfx 值)
  23. point_query = text("SELECT id, geom, h_xtfx FROM fifty_thousand_survey_data")
  24. point_result = db.execute(point_query).fetchall()
  25. return unit_result, point_result
  26. except Exception as error:
  27. print("数据查询失败:", error)
  28. return [], []
  29. def idw_interpolation(points, target_point):
  30. """反距离加权插值函数"""
  31. total_weight = 0
  32. weighted_sum = 0
  33. power = 2 # 距离权重的幂
  34. for point, value in points:
  35. distance = point.distance(target_point)
  36. if distance == 0:
  37. return value # 距离为0时直接返回该点值
  38. weight = 1 / (distance ** power)
  39. total_weight += weight
  40. weighted_sum += weight * value
  41. return weighted_sum / total_weight if total_weight != 0 else None
  42. def check_h_xtfx_values(unit_rows, point_rows):
  43. """核心逻辑:判断单元的 h_xtfx 值"""
  44. unit_point_mapping = {}
  45. for unit_id, unit_geom_wkb in unit_rows:
  46. try:
  47. unit_geom = wkb.loads(unit_geom_wkb, hex=True)
  48. unit_points = []
  49. for point_id, point_geom_wkb, h_xtfx in point_rows:
  50. point_geom = wkb.loads(point_geom_wkb, hex=True)
  51. if unit_geom.contains(point_geom):
  52. unit_points.append((point_geom, h_xtfx)) # 存储几何坐标和 h_xtfx 值
  53. unit_point_mapping[unit_id] = unit_points
  54. except Exception as e:
  55. print(f"处理单元 {unit_id} 失败:", e)
  56. result = {}
  57. for unit_id, points in unit_point_mapping.items():
  58. if not points:
  59. result[unit_id] = None
  60. continue # 无点位,直接设为 None
  61. has_strict_control = any(h_xtfx == "严格管控类" for _, h_xtfx in points)
  62. h_xtfx_list = [h_xtfx for _, h_xtfx in points]
  63. if not has_strict_control:
  64. # 无严格管控类:先判断比例是否 ≥80%
  65. counter = Counter(h_xtfx_list)
  66. most_common, count = counter.most_common(1)[0]
  67. if count / len(points) >= 0.8:
  68. result[unit_id] = most_common
  69. continue # 比例达标,直接输出
  70. # 比例不达标:对优先保护类和安全利用类进行插值
  71. valid_points = [(geom, h_xtfx_mapping[h_xtfx])
  72. for geom, h_xtfx in points
  73. if h_xtfx in ["优先保护类", "安全利用类"]]
  74. if len(valid_points) < 2:
  75. # 有效点位不足,取最常见值(避免插值错误)
  76. result[unit_id] = most_common
  77. else:
  78. unit_center = unit_geom.centroid
  79. interpolated = idw_interpolation(valid_points, unit_center)
  80. result[unit_id] = (
  81. "优先保护类" if interpolated <= 1.5
  82. else "安全利用类" if interpolated <= 2.5
  83. else "严格管控类" # 理论上不会出现,因无严格管控类点位
  84. )
  85. else:
  86. # 存在严格管控类:对所有点位(含严格管控类)进行插值
  87. point_coords = [(geom, h_xtfx_mapping[h_xtfx]) for geom, h_xtfx in points]
  88. unit_center = unit_geom.centroid
  89. interpolated = idw_interpolation(point_coords, unit_center)
  90. result[unit_id] = (
  91. "优先保护类" if interpolated <= 1.5
  92. else "安全利用类" if interpolated <= 2.5
  93. else "严格管控类"
  94. )
  95. return result