import json
import logging
from typing import Optional

import geopandas as gpd
from shapely.geometry import shape
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

_logger = logging.getLogger(__name__)

# Accepted values for the ``level`` argument of the public lookup functions.
_VALID_LEVELS = ("county", "city", "province", "auto")

# Exact-name match in the ``counties`` table.
_COUNTY_SQL = """
    SELECT ST_AsGeoJSON(geometry) AS g, name, city_name, province_name
    FROM counties
    WHERE name=:n
    LIMIT 1
"""

# Exact-name match in the ``cities`` table.
_CITY_SQL = """
    SELECT ST_AsGeoJSON(geometry) AS g, name, province_name
    FROM cities
    WHERE name=:n
    LIMIT 1
"""

# Fallback: build the city outline by merging all of its counties.
_CITY_FROM_COUNTIES_SQL = """
    SELECT ST_AsGeoJSON(ST_UnaryUnion(ST_Union(geometry))) AS g,
           MIN(province_name) AS province_name
    FROM counties
    WHERE city_name=:n
"""

# Exact-name match in the ``provinces`` table.
_PROVINCE_SQL = """
    SELECT ST_AsGeoJSON(geometry) AS g, name
    FROM provinces
    WHERE name=:n
    LIMIT 1
"""

# Fallback: build the province outline by merging all of its counties.
_PROVINCE_FROM_COUNTIES_SQL = """
    SELECT ST_AsGeoJSON(ST_UnaryUnion(ST_Union(geometry))) AS g
    FROM counties
    WHERE province_name=:n
"""


def _query_one(db: Session, sql: str, name: str):
    """Execute *sql* with ``:n`` bound to *name* and return the first row, or None."""
    return db.execute(text(sql), {"n": name}).fetchone()


def _build_feature(level: str, geometry_json: str, properties: dict) -> dict:
    """Wrap a GeoJSON geometry string and its attributes into a GeoJSON Feature dict."""
    return {
        "type": "Feature",
        # "level" goes first so the key order matches what callers already receive.
        "properties": {"level": level, **properties},
        "geometry": json.loads(geometry_json),
    }


def _county_feature(db: Session, name: str) -> Optional[dict]:
    """Return the county Feature whose name equals *name*, or None if absent."""
    row = _query_one(db, _COUNTY_SQL, name)
    if row and row.g:
        return _build_feature(
            "county",
            row.g,
            {
                "name": row.name,
                "city": row.city_name,
                "province": row.province_name,
            },
        )
    return None


def _city_feature(db: Session, name: str) -> Optional[dict]:
    """Return the city Feature for *name*, preferring ``cities`` over a county merge."""
    row = _query_one(db, _CITY_SQL, name)
    if row and row.g:
        return _build_feature(
            "city",
            row.g,
            {"name": row.name, "province": row.province_name},
        )

    # Not in ``cities``: merge the geometries of counties belonging to this city.
    # The aggregate query always yields a row, so a missing match shows up as
    # an empty ``g`` rather than a missing row.
    row = _query_one(db, _CITY_FROM_COUNTIES_SQL, name)
    if row and row.g:
        return _build_feature(
            "city",
            row.g,
            {"name": name, "province": row.province_name},
        )
    return None


def _province_feature(db: Session, name: str) -> Optional[dict]:
    """Return the province Feature for *name*, preferring ``provinces`` over a county merge."""
    row = _query_one(db, _PROVINCE_SQL, name)
    if row and row.g:
        return _build_feature("province", row.g, {"name": row.name})

    # Not in ``provinces``: merge the geometries of counties in this province.
    row = _query_one(db, _PROVINCE_FROM_COUNTIES_SQL, name)
    if row and row.g:
        return _build_feature("province", row.g, {"name": name})
    return None


# Lookup order used for ``level="auto"``: most specific level first.
_LEVEL_LOOKUPS = (
    ("county", _county_feature),
    ("city", _city_feature),
    ("province", _province_feature),
)


def get_boundary_geojson_by_name(db: Session, name: str, level: str = "auto") -> dict:
    """Look up an administrative boundary by name and return it as a GeoJSON Feature.

    With ``level="auto"`` the lookup tries county, then city, then province and
    returns the first match. City and province lookups first query their own
    table and, if nothing is found there, fall back to merging the matching
    rows of the ``counties`` table.

    Args:
        db (Session): Database session used to run the queries.
        name (str): Name of the county / city / province to look up.
        level (str): One of "county", "city", "province" or "auto".

    Returns:
        dict: A GeoJSON Feature. ``properties`` always contains ``level`` and
        ``name``; county features also carry ``city`` and ``province``, city
        features also carry ``province``.

    Raises:
        ValueError: If ``level`` is not a supported value, or if no boundary
            with the given name exists at the requested level(s).
    """
    if level not in _VALID_LEVELS:
        # Previously an unknown level skipped every branch and surfaced as a
        # misleading "name not found" error; report the real cause instead.
        raise ValueError(f"无效的层级: {level!r}")

    for lookup_level, lookup in _LEVEL_LOOKUPS:
        if level in (lookup_level, "auto"):
            feature = lookup(db, name)
            if feature is not None:
                return feature

    raise ValueError(f"未找到名称: {name}")


def get_boundary_gdf_by_name(db: Session, name: str, level: str = "auto") -> Optional[gpd.GeoDataFrame]:
    """Look up an administrative boundary by name and return it as a GeoDataFrame.

    The GeoDataFrame is built in memory from the GeoJSON Feature returned by
    ``get_boundary_geojson_by_name``; no temporary Shapefile is written. Use
    this when the boundary is needed for spatial operations.

    Args:
        db (Session): Database session used to run the queries.
        name (str): Name of the county / city / province to look up.
        level (str): One of "county", "city", "province" or "auto".

    Returns:
        Optional[gpd.GeoDataFrame]: A single-row GeoDataFrame tagged with CRS
        "EPSG:4326" whose columns are the feature properties, or None when the
        boundary is not found or any error occurs. Errors other than
        ``ValueError`` are logged with a traceback before None is returned.

    Example:
        ```python
        with SessionLocal() as db:
            boundary_gdf = get_boundary_gdf_by_name(db, "乐昌市", "county")
            if boundary_gdf is not None:
                # Use the GeoDataFrame directly for spatial operations
                boundary_union = boundary_gdf.unary_union
        ```
    """
    try:
        feature = get_boundary_geojson_by_name(db, name, level)
        geometry_obj = shape(feature["geometry"])
        # NOTE(review): the CRS is declared, not reprojected -- this assumes the
        # stored geometries are already in EPSG:4326; confirm against the schema.
        return gpd.GeoDataFrame(
            [feature["properties"]],
            geometry=[geometry_obj],
            crs="EPSG:4326",
        )
    except ValueError as exc:
        # Expected "no such boundary" / bad-argument path: stay quiet but traceable.
        _logger.debug(
            "Boundary lookup failed for name=%r level=%r: %s", name, level, exc
        )
    except Exception:
        # Best-effort contract: callers get None, but the failure must not
        # vanish silently (database errors, malformed geometry, ...).
        _logger.exception(
            "Unexpected error while loading boundary for name=%r level=%r",
            name,
            level,
        )
    return None