from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from typing import Dict, Any, List import logging from ..database import get_db from ..services.unit_grouping_service import UnitGroupingService logger = logging.getLogger(__name__) router = APIRouter() @router.get("/h_xtfx", summary="获取单元h_xtfx分类结果", description="基于点位数据计算各单元的土壤环境质量类别(h_xtfx),使用反距离加权插值算法") async def get_unit_h_xtfx_result(db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取单元h_xtfx分类结果 该接口实现以下功能: 1. 获取单元几何数据和点位数据 2. 判断每个单元内包含的点位 3. 根据点位的h_xtfx值计算单元的h_xtfx值 4. 使用反距离加权插值算法进行计算 算法逻辑: - 如果单元内无严格管控类点位: - 优先判断某类别占比是否≥80%,是则直接采用该类别 - 否则对优先保护类和安全利用类点位进行插值 - 如果单元内有严格管控类点位: - 对所有点位(包括严格管控类)进行插值 Returns: Dict[str, Any]: 包含以下字段的响应: - success: 是否成功 - data: 单元ID到h_xtfx值的映射 - statistics: 统计信息 - error: 错误信息(如果失败) Raises: HTTPException: 当数据库查询失败或处理异常时抛出 """ try: logger.info("开始获取单元h_xtfx分类结果") # 创建服务实例 service = UnitGroupingService(db) # 获取结果 result = service.get_unit_h_xtfx_result() if not result["success"]: logger.error(f"服务层处理失败: {result.get('error', 'Unknown error')}") raise HTTPException( status_code=500, detail=f"计算单元h_xtfx值失败: {result.get('error', 'Unknown error')}" ) logger.info(f"成功获取 {result['statistics']['total_units']} 个单元的h_xtfx结果") return result except HTTPException: raise except Exception as e: logger.error(f"获取单元h_xtfx结果时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/statistics", summary="获取单元分组统计信息", description="获取单元分组的统计摘要信息") async def get_unit_grouping_statistics(db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取单元分组统计信息 提供单元分组的统计摘要,包括: - 总单元数 - 有数据的单元数 - 无数据的单元数 - 各类别的分布情况 Returns: Dict[str, Any]: 统计信息 """ try: logger.info("开始获取单元分组统计信息") # 创建服务实例 service = UnitGroupingService(db) # 获取完整结果以提取统计信息 result = service.get_unit_h_xtfx_result() if not result["success"]: raise HTTPException( status_code=500, detail=f"获取统计信息失败: {result.get('error', 'Unknown error')}" ) # 返回统计信息 return { "success": True, "statistics": result["statistics"] } except HTTPException: raise except Exception as e: logger.error(f"获取统计信息时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/unit/{unit_id}", summary="获取特定单元的h_xtfx值", description="获取指定单元ID的h_xtfx分类结果") async def get_unit_h_xtfx_by_id(unit_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取特定单元的h_xtfx值 Args: unit_id: 单元ID Returns: Dict[str, Any]: 包含单元h_xtfx值的响应 """ try: logger.info(f"开始获取单元 {unit_id} 的h_xtfx值") # 创建服务实例 service = UnitGroupingService(db) # 获取所有结果 result = service.get_unit_h_xtfx_result() if not result["success"]: raise HTTPException( status_code=500, detail=f"获取单元数据失败: {result.get('error', 'Unknown error')}" ) # 检查指定单元是否存在 if unit_id not in result["data"]: raise HTTPException( status_code=404, detail=f"未找到单元 {unit_id}" ) return { "success": True, "unit_id": unit_id, "h_xtfx": result["data"][unit_id] } except HTTPException: raise except Exception as e: logger.error(f"获取单元 {unit_id} 的h_xtfx值时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/units/batch", summary="批量获取单元信息", description="使用ORM方式批量获取指定单元的基本信息") async def get_units_batch( unit_ids: List[int] = Query(..., description="单元ID列表"), db: Session = Depends(get_db) ) -> Dict[str, Any]: """ 批量获取单元信息 Args: unit_ids: 单元ID列表 Returns: Dict[str, Any]: 包含单元信息的响应 """ try: logger.info(f"开始批量获取 {len(unit_ids)} 个单元的信息") # 创建服务实例 service = UnitGroupingService(db) # 使用ORM方式批量获取单元信息 units = service.get_units_by_ids(unit_ids) # 转换为字典格式 unit_data = [] for unit in units: unit_data.append({ "gid": unit.gid, "BSM": unit.BSM, "PXZQMC": unit.PXZQMC, "CXZQMC": unit.CXZQMC, "SUM_NYDTBM": float(unit.SUM_NYDTBM) if unit.SUM_NYDTBM else None, "Shape_Area": float(unit.Shape_Area) if unit.Shape_Area else None }) return { "success": True, "total_requested": len(unit_ids), "total_found": len(units), "data": unit_data } except Exception as e: logger.error(f"批量获取单元信息时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/points/statistics", summary="获取点位统计信息", description="使用ORM方式获取点位按h_xtfx分类的统计信息") async def get_points_statistics(db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取点位统计信息 返回按h_xtfx分类的点位数量统计 Returns: Dict[str, Any]: 点位统计信息 """ try: logger.info("开始获取点位统计信息") # 创建服务实例 service = UnitGroupingService(db) # 使用ORM方式获取统计信息 point_stats = service.get_point_count_by_h_xtfx() # 计算总数 total_points = sum(point_stats.values()) # 计算百分比 percentage_stats = {} for category, count in point_stats.items(): percentage_stats[category] = { "count": count, "percentage": round(count / total_points * 100, 2) if total_points > 0 else 0 } return { "success": True, "total_points": total_points, "distribution": percentage_stats } except Exception as e: logger.error(f"获取点位统计信息时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/points/by-area", summary="按区域获取点位数据", description="使用ORM方式获取特定区域的点位数据") async def get_points_by_area( area_name: str = Query(None, description="区域名称(县名称)"), db: Session = Depends(get_db) ) -> Dict[str, Any]: """ 按区域获取点位数据 Args: area_name: 区域名称(县名称),如果为空则获取所有区域 Returns: Dict[str, Any]: 点位数据 """ try: logger.info(f"开始获取区域 '{area_name}' 的点位数据") # 创建服务实例 service = UnitGroupingService(db) # 使用ORM方式获取点位数据 points = service.get_points_in_area(area_name) # 转换为字典格式 point_data = [] for point in points: point_data.append({ "id": point.id, "dwmc": point.dwmc, "xmc": point.xmc, "zmc": point.zmc, "cmc": point.cmc, "h_xtfx": point.h_xtfx, "lat": float(point.lat) if point.lat else None, "lon": float(point.lon) if point.lon else None, "ph": float(point.ph) if point.ph else None }) # 统计该区域的h_xtfx分布 h_xtfx_stats = {} for point in points: if point.h_xtfx: h_xtfx_stats[point.h_xtfx] = h_xtfx_stats.get(point.h_xtfx, 0) + 1 return { "success": True, "area_name": area_name or "全部区域", "total_points": len(points), "h_xtfx_distribution": h_xtfx_stats, "data": point_data } except Exception as e: logger.error(f"获取区域点位数据时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/database/summary", summary="获取数据库摘要信息", description="获取数据库中单元和点位的摘要统计") async def get_database_summary(db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取数据库摘要信息 Returns: Dict[str, Any]: 数据库摘要信息 """ try: logger.info("开始获取数据库摘要信息") # 创建服务实例 service = UnitGroupingService(db) # 使用ORM方式获取各种统计信息 total_units = service.get_unit_count() point_stats = service.get_point_count_by_h_xtfx() total_points = sum(point_stats.values()) return { "success": True, "summary": { "total_units": total_units, "total_points": total_points, "points_with_h_xtfx": total_points, "h_xtfx_categories": len(point_stats), "point_distribution": point_stats } } except Exception as e: logger.error(f"获取数据库摘要信息时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" ) @router.get("/areas/statistics", summary="获取各区域统计信息", description="使用ORM聚合查询获取各区域的h_xtfx分布统计") async def get_areas_statistics(db: Session = Depends(get_db)) -> Dict[str, Any]: """ 获取各区域统计信息 使用ORM的高级聚合查询功能,按区域分组统计h_xtfx分布 Returns: Dict[str, Any]: 各区域的统计信息 """ try: logger.info("开始获取各区域统计信息") # 创建服务实例 service = UnitGroupingService(db) # 使用ORM方式获取区域统计信息 area_stats = service.get_area_statistics() # 计算总体统计 total_areas = len(area_stats) total_points_by_area = {} for area_name, stats in area_stats.items(): total_points_by_area[area_name] = sum(stats.values()) return { "success": True, "total_areas": total_areas, "area_statistics": area_stats, "area_totals": total_points_by_area } except Exception as e: logger.error(f"获取各区域统计信息时发生异常: {str(e)}") raise HTTPException( status_code=500, detail=f"内部服务器错误: {str(e)}" )