Переглянути джерело

添加农业投入Cd通量可视化接口,支持生成栅格地图和导出计算数据为CSV文件

drggboy 6 днів тому
батько
коміт
5627bdbc59

+ 148 - 0
app/api/agricultural_input.py

@@ -4,9 +4,11 @@
 """
 
 from fastapi import APIRouter, HTTPException, Query, Body
+from fastapi.responses import FileResponse
 from pydantic import BaseModel, Field
 from typing import Dict, Any, Optional
 import logging
+import os
 from ..services.agricultural_input_service import AgriculturalInputService
 
 router = APIRouter()
@@ -15,6 +17,18 @@ router = APIRouter()
 # 数据模型定义
 # =============================================================================
 
+class VisualizationResponse(BaseModel):
+    """
+    可视化结果响应模型
+    
+    @description: 绘图接口的响应格式
+    """
+    success: bool = Field(..., description="是否成功")
+    message: str = Field(..., description="响应消息")
+    data: Optional[Dict[str, Any]] = Field(None, description="可视化结果数据")
+    files: Optional[Dict[str, str]] = Field(None, description="生成的文件路径")
+
+
 class CdFluxCalculationRequest(BaseModel):
     """
     农业投入Cd通量计算请求模型
@@ -259,4 +273,138 @@ async def get_calculation_formula() -> Dict[str, Any]:
         raise HTTPException(
             status_code=500, 
             detail=f"获取失败: {str(e)}"
+        )
+
+
+# =============================================================================
+# 农业投入Cd通量可视化接口
+# =============================================================================
+
+@router.get("/visualize",
+           summary="生成农业投入Cd通量可视化图表",
+           description="计算农业投入Cd通量并生成栅格地图,参数固定使用韶关数据,area用于地图边界")
+async def visualize_agricultural_input(
+    area: str = Query(..., description="地区名称,如:乐昌市(用于地图边界,参数固定使用韶关)"),
+    level: str = Query(..., description="行政层级,必须为: county | city | province"),
+    colormap: str = Query("green_yellow_red_purple", description="色彩方案"),
+    resolution_factor: float = Query(4.0, description="分辨率因子(默认4.0,更快)"),
+    enable_interpolation: bool = Query(False, description="是否启用空间插值(默认关闭以提升性能)"),
+    cleanup_intermediate: bool = Query(True, description="是否清理中间文件(默认是)")
+):
+    """
+    生成农业投入Cd通量可视化图表
+    
+    @param area: 地区名称(用于地图边界,参数数据固定使用韶关)
+    @param level: 行政层级
+    @returns: 栅格地图文件
+    
+    功能包括:
+    1. 计算农业投入Cd通量(参数固定使用韶关)
+    2. 生成指定地区边界的栅格地图
+    3. 直接返回图片文件
+    """
+    try:
+        service = AgriculturalInputService()
+
+        # 行政层级校验(不允许模糊)
+        if level not in ("county", "city", "province"):
+            raise HTTPException(status_code=400, detail="参数 level 必须为 'county' | 'city' | 'province'")
+        
+        # 计算农业投入Cd通量(使用韶关参数,area仅用于边界)
+        calc_result = service.calculate_cd_flux_for_visualization(area)
+        
+        if not calc_result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=calc_result["message"]
+            )
+        
+        # 获取包含坐标的结果数据
+        results_with_coords = service.get_coordinates_for_results(calc_result["data"], area)
+        
+        if not results_with_coords:
+            raise HTTPException(
+                status_code=404,
+                detail=f"未找到地区 '{area}' 的坐标数据,无法生成可视化"
+            )
+        
+        # 创建可视化
+        visualization_files = service.create_agricultural_input_visualization(
+            area=area,
+            level=level,
+            calculation_type="agricultural_input",
+            results_with_coords=results_with_coords,
+            colormap=colormap,
+            resolution_factor=resolution_factor,
+            enable_interpolation=enable_interpolation,
+            cleanup_intermediate=cleanup_intermediate
+        )
+        
+        # 检查地图文件是否生成成功
+        map_file = visualization_files.get("map")
+        if not map_file or not os.path.exists(map_file):
+            raise HTTPException(status_code=500, detail="地图文件生成失败")
+        
+        return FileResponse(
+            path=map_file,
+            filename=f"{area}_agricultural_input_cd_flux_map.jpg",
+            media_type="image/jpeg"
+        )
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"生成地区 '{area}' 的农业投入Cd通量可视化失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"可视化生成失败: {str(e)}"
+        )
+
+
+@router.get("/export-data",
+           summary="导出农业投入Cd通量计算数据",
+           description="导出指定地区的农业投入Cd通量计算结果为CSV文件")
+async def export_agricultural_input_data(
+    area: str = Query(..., description="地区名称,如:韶关")
+) -> Dict[str, Any]:
+    """
+    导出农业投入Cd通量计算数据
+    
+    @param area: 地区名称
+    @returns: 导出结果和文件路径
+    """
+    try:
+        service = AgriculturalInputService()
+        
+        # 计算农业投入Cd通量(使用韶关参数,area仅用于边界)
+        calc_result = service.calculate_cd_flux_for_visualization(area)
+        
+        if not calc_result["success"]:
+            raise HTTPException(
+                status_code=404, 
+                detail=calc_result["message"]
+            )
+        
+        # 导出数据
+        csv_path = service.export_results_to_csv(calc_result["data"])
+        
+        return {
+            "success": True,
+            "message": f"地区 '{area}' 的农业投入Cd通量数据导出成功",
+            "data": {
+                "area": area,
+                "calculation_type": "agricultural_input",
+                "exported_file": csv_path,
+                "total_cd_flux": calc_result["data"]["total_cd_flux"],
+                "unit": calc_result["data"]["unit"]
+            }
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"导出地区 '{area}' 的农业投入Cd通量数据失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"数据导出失败: {str(e)}"
         ) 

+ 433 - 3
app/services/agricultural_input_service.py

@@ -6,11 +6,23 @@
 """
 
 import logging
+import math
+import os
+import pandas as pd
+import numpy as np
+from datetime import datetime
 from typing import Dict, Any, List, Optional
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker, Session
+from sqlalchemy import create_engine, and_
 from ..database import SessionLocal, engine
 from ..models.parameters import Parameters
+from ..models.farmland import FarmlandData
+from ..utils.mapping_utils import MappingUtils
+from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
+import geopandas as gpd
+from shapely.geometry import shape, Point
+import tempfile
+import json
 
 class AgriculturalInputService:
     """
@@ -332,4 +344,422 @@ class AgriculturalInputService:
             
         except Exception as e:
             self.logger.error(f"验证参数时发生错误: {str(e)}")
-            return False 
+            return False
+    
+    def calculate_cd_flux_for_visualization(self, area: str) -> Dict[str, Any]:
+        """
+        专门用于可视化的农业投入Cd通量计算(参数固定使用韶关)
+        
+        @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
+        @returns: 计算结果
+        """
+        try:
+            with SessionLocal() as db:
+                # 参数固定使用"韶关",area参数仅用于地图边界
+                parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
+                
+                if not parameter:
+                    return {
+                        "success": False,
+                        "message": f"未找到韶关地区的参数数据",
+                        "data": None
+                    }
+                
+                # 计算农业投入输入Cd通量
+                # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
+                cd_flux = (
+                    parameter.f3 * parameter.nf +  # 氮肥
+                    parameter.f4 * parameter.pf +  # 磷肥
+                    parameter.f5 * parameter.kf +  # 钾肥
+                    parameter.f6 * parameter.cf +  # 复合肥
+                    parameter.f7 * parameter.of +  # 有机肥
+                    parameter.f8 * parameter.p +   # 农药
+                    parameter.f9 * parameter.ff +  # 农家肥
+                    parameter.f10 * parameter.af   # 农膜
+                )
+                
+                # 计算各项明细
+                details = {
+                    "nitrogen_fertilizer": parameter.f3 * parameter.nf,      # 氮肥贡献
+                    "phosphorus_fertilizer": parameter.f4 * parameter.pf,    # 磷肥贡献
+                    "potassium_fertilizer": parameter.f5 * parameter.kf,     # 钾肥贡献
+                    "compound_fertilizer": parameter.f6 * parameter.cf,      # 复合肥贡献
+                    "organic_fertilizer": parameter.f7 * parameter.of,       # 有机肥贡献
+                    "pesticide": parameter.f8 * parameter.p,                 # 农药贡献
+                    "farmyard_manure": parameter.f9 * parameter.ff,          # 农家肥贡献
+                    "agricultural_film": parameter.f10 * parameter.af        # 农膜贡献
+                }
+                
+                return {
+                    "success": True,
+                    "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量(使用韶关参数)",
+                    "data": {
+                        "area": area,
+                        "total_cd_flux": round(cd_flux, 6),
+                        "unit": "g/ha/a",
+                        "details": {key: round(value, 6) for key, value in details.items()},
+                        "parameters_used": {
+                            "f3_nitrogen_cd_content": parameter.f3,
+                            "f4_phosphorus_cd_content": parameter.f4,
+                            "f5_potassium_cd_content": parameter.f5,
+                            "f6_compound_cd_content": parameter.f6,
+                            "f7_organic_cd_content": parameter.f7,
+                            "f8_pesticide_cd_content": parameter.f8,
+                            "f9_farmyard_cd_content": parameter.f9,
+                            "f10_film_cd_content": parameter.f10,
+                            "nf_nitrogen_usage": parameter.nf,
+                            "pf_phosphorus_usage": parameter.pf,
+                            "kf_potassium_usage": parameter.kf,
+                            "cf_compound_usage": parameter.cf,
+                            "of_organic_usage": parameter.of,
+                            "p_pesticide_usage": parameter.p,
+                            "ff_farmyard_usage": parameter.ff,
+                            "af_film_usage": parameter.af
+                        }
+                    }
+                }
+                
+        except Exception as e:
+            self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"计算失败: {str(e)}",
+                "data": None
+            }
+    
+    def get_coordinates_for_results(self, results_data: Dict[str, Any], area: str) -> List[Dict[str, Any]]:
+        """
+        获取农业投入计算结果对应的坐标信息
+        
+        @param results_data: 计算结果数据
+        @param area: 地区名称
+        @returns: 包含坐标的结果列表
+        """
+        try:
+            # 农业投入计算只有一个结果值,需要与所有农田数据关联
+            total_cd_flux = results_data.get("total_cd_flux", 0)
+            
+            with SessionLocal() as db:
+                # 查询所有农田数据获取坐标
+                farmland_data = db.query(FarmlandData).all()
+                
+                if not farmland_data:
+                    self.logger.warning("未找到农田坐标数据")
+                    return []
+                
+                coordinates_results = []
+                for farmland in farmland_data:
+                    coord_result = {
+                        "farmland_id": farmland.farmland_id,
+                        "sample_id": farmland.sample_id,
+                        "longitude": farmland.lon,
+                        "latitude": farmland.lan,
+                        "flux_value": total_cd_flux,  # 所有点使用相同的农业投入Cd通量值
+                        "area": area
+                    }
+                    coordinates_results.append(coord_result)
+                
+                self.logger.info(f"✓ 成功获取 {len(coordinates_results)} 个农田样点的坐标信息,农业投入Cd通量: {total_cd_flux}")
+                return coordinates_results
+                
+        except Exception as e:
+            self.logger.error(f"获取坐标信息失败: {str(e)}")
+            raise
+    
+    def export_results_to_csv(self, results_data: Dict[str, Any], output_dir: str = "app/static/agricultural_input") -> str:
+        """
+        将农业投入计算结果导出为CSV文件
+        
+        @param results_data: 计算结果数据
+        @param output_dir: 输出目录
+        @returns: CSV文件路径
+        """
+        try:
+            # 确保输出目录存在
+            os.makedirs(output_dir, exist_ok=True)
+            
+            # 生成时间戳
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            
+            # 生成文件名
+            area = results_data.get("area", "unknown")
+            filename = f"agricultural_input_{area}_{timestamp}.csv"
+            csv_path = os.path.join(output_dir, filename)
+            
+            # 准备导出数据
+            export_data = {
+                "area": results_data.get("area"),
+                "total_cd_flux": results_data.get("total_cd_flux"),
+                "unit": results_data.get("unit"),
+                "nitrogen_fertilizer": results_data["details"]["nitrogen_fertilizer"],
+                "phosphorus_fertilizer": results_data["details"]["phosphorus_fertilizer"],
+                "potassium_fertilizer": results_data["details"]["potassium_fertilizer"],
+                "compound_fertilizer": results_data["details"]["compound_fertilizer"],
+                "organic_fertilizer": results_data["details"]["organic_fertilizer"],
+                "pesticide": results_data["details"]["pesticide"],
+                "farmyard_manure": results_data["details"]["farmyard_manure"],
+                "agricultural_film": results_data["details"]["agricultural_film"]
+            }
+            
+            # 转换为DataFrame
+            df = pd.DataFrame([export_data])
+            
+            # 保存CSV文件
+            df.to_csv(csv_path, index=False, encoding='utf-8-sig')
+            
+            self.logger.info(f"✓ 成功导出农业投入结果到: {csv_path}")
+            return csv_path
+            
+        except Exception as e:
+            self.logger.error(f"导出CSV文件失败: {str(e)}")
+            raise
+    
+    def create_agricultural_input_visualization(self, area: str, calculation_type: str,
+                                              results_with_coords: List[Dict[str, Any]],
+                                              level: str = None,
+                                              output_dir: str = "app/static/agricultural_input",
+                                              template_raster: str = "app/static/cd_flux/meanTemp.tif",
+                                              boundary_shp: str = None,
+                                              colormap: str = "green_yellow_red_purple",
+                                              resolution_factor: float = 4.0,
+                                              enable_interpolation: bool = False,
+                                              cleanup_intermediate: bool = True) -> Dict[str, str]:
+        """
+        创建农业投入Cd通量可视化图表
+        
+        @param area: 地区名称
+        @param calculation_type: 计算类型(agricultural_input)
+        @param results_with_coords: 包含坐标的结果数据
+        @param level: 行政层级
+        @param output_dir: 输出目录
+        @param template_raster: 模板栅格文件路径
+        @param boundary_shp: 边界shapefile路径
+        @param colormap: 色彩方案
+        @param resolution_factor: 分辨率因子
+        @param enable_interpolation: 是否启用空间插值
+        @param cleanup_intermediate: 是否清理中间文件
+        @returns: 生成的图片文件路径字典
+        """
+        try:
+            if not results_with_coords:
+                raise ValueError("没有包含坐标的结果数据")
+            
+            # 确保输出目录存在
+            os.makedirs(output_dir, exist_ok=True)
+            generated_files: Dict[str, str] = {}
+            
+            # 生成时间戳
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            
+            # 创建CSV文件用于绘图
+            csv_filename = f"agricultural_input_{area}_temp_{timestamp}.csv"
+            csv_path = os.path.join(output_dir, csv_filename)
+            
+            # 准备绘图数据
+            plot_data = []
+            for result in results_with_coords:
+                plot_data.append({
+                    "longitude": result["longitude"],
+                    "latitude": result["latitude"],
+                    "flux_value": result["flux_value"]
+                })
+            
+            # 保存为CSV
+            df = pd.DataFrame(plot_data)
+            df.to_csv(csv_path, index=False, encoding='utf-8-sig')
+            
+            # 初始化绘图工具
+            mapper = MappingUtils()
+            
+            # 生成输出文件路径
+            map_output = os.path.join(output_dir, f"agricultural_input_{area}_map_{timestamp}")
+            histogram_output = os.path.join(output_dir, f"agricultural_input_{area}_histogram_{timestamp}")
+            
+            # 检查模板文件是否存在
+            if not os.path.exists(template_raster):
+                self.logger.warning(f"模板栅格文件不存在: {template_raster}")
+                template_raster = None
+            
+            # 动态获取边界数据(严格使用指定层级)
+            if not level:
+                raise ValueError("必须提供行政层级 level:county | city | province")
+            
+            # 直接从数据库获取边界GeoDataFrame
+            boundary_gdf = self._get_boundary_gdf_from_database(area, level)
+            boundary_shp = None  # 不再需要临时边界文件
+            
+            if boundary_gdf is None:
+                self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
+            else:
+                # 在绘图前进行样点边界包含性统计
+                try:
+                    if boundary_gdf is not None and len(boundary_gdf) > 0:
+                        boundary_union = boundary_gdf.unary_union
+                        total_points = len(results_with_coords)
+                        inside_count = 0
+                        outside_points: List[Dict[str, Any]] = []
+                        for r in results_with_coords:
+                            pt = Point(float(r["longitude"]), float(r["latitude"]))
+                            if boundary_union.contains(pt) or boundary_union.touches(pt):
+                                inside_count += 1
+                            else:
+                                outside_points.append({
+                                    "farmland_id": r.get("farmland_id"),
+                                    "sample_id": r.get("sample_id"),
+                                    "longitude": r.get("longitude"),
+                                    "latitude": r.get("latitude"),
+                                    "flux_value": r.get("flux_value")
+                                })
+
+                        outside_count = total_points - inside_count
+                        inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
+
+                        self.logger.info(
+                            f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
+                        if outside_count > 0:
+                            sample_preview = outside_points[:10]
+                            self.logger.warning(
+                                f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
+
+                        # 在日志中打印边界检查统计结果
+                        self.logger.info(
+                            f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
+                            f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
+                            f"边界外: {outside_count}"
+                        )
+                        if outside_count > 0 and len(outside_points) > 0:
+                            sample_preview = outside_points[:5]  # 只显示前5个样本
+                            self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
+                    else:
+                        generated_files = {}
+                except Exception as check_err:
+                    self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
+            
+            # 创建shapefile
+            shapefile_path = csv_path.replace('.csv', '_points.shp')
+            mapper.csv_to_shapefile(csv_path, shapefile_path, 
+                                  lon_col='longitude', lat_col='latitude', value_col='flux_value')
+            
+            # 合并已生成文件映射
+            generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
+            
+            # 如果有模板栅格文件,创建栅格地图
+            if template_raster:
+                try:
+                    # 创建栅格
+                    raster_path = csv_path.replace('.csv', '_raster.tif')
+                    raster_path, stats = mapper.vector_to_raster(
+                        shapefile_path, template_raster, raster_path, 'flux_value',
+                        resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
+                        interpolation_method='nearest', enable_interpolation=enable_interpolation
+                    )
+                    generated_files["raster"] = raster_path
+                    
+                    # 创建栅格地图 - 使用英文标题避免中文乱码
+                    map_title = "Agricultural Input Cd Flux"
+                    
+                    map_file = mapper.create_raster_map(
+                        boundary_shp if boundary_shp else None,
+                        raster_path,
+                        map_output,
+                        colormap=colormap,
+                        title=map_title,
+                        output_size=12,
+                        dpi=300,
+                        resolution_factor=4.0,
+                        enable_interpolation=False,
+                        interpolation_method='nearest',
+                        boundary_gdf=boundary_gdf
+                    )
+                    generated_files["map"] = map_file
+                    
+                    # 创建直方图 - 使用英文标题避免中文乱码
+                    histogram_title = "Agricultural Input Cd Flux Distribution"
+                    
+                    histogram_file = mapper.create_histogram(
+                        raster_path,
+                        f"{histogram_output}.jpg",
+                        title=histogram_title,
+                        xlabel='Cd Flux (g/ha/a)',
+                        ylabel='Frequency Density'
+                    )
+                    generated_files["histogram"] = histogram_file
+                    
+                except Exception as viz_error:
+                    self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
+                    # 即使栅格可视化失败,也返回已生成的文件
+            
+            # 清理中间文件(默认开启,仅保留最终可视化)
+            if cleanup_intermediate:
+                try:
+                    self._cleanup_intermediate_files(generated_files, None)
+                except Exception as cleanup_err:
+                    self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
+
+            self.logger.info(f"✓ 成功创建农业投入Cd通量可视化,生成文件: {list(generated_files.keys())}")
+            return generated_files
+            
+        except Exception as e:
+            self.logger.error(f"创建农业投入可视化失败: {str(e)}")
+            raise
+
+    def _cleanup_intermediate_files(self, generated_files: Dict[str, str], boundary_shp: Optional[str]) -> None:
+        """
+        清理中间文件:CSV、Shapefile 及其配套文件、栅格TIFF;若边界为临时目录,则一并删除
+        """
+        import shutil
+        import tempfile
+
+        def _safe_remove(path: str) -> None:
+            try:
+                if path and os.path.exists(path) and os.path.isfile(path):
+                    os.remove(path)
+            except Exception:
+                pass
+
+        # 删除 CSV
+        _safe_remove(generated_files.get("csv"))
+
+        # 删除栅格
+        _safe_remove(generated_files.get("raster"))
+
+        # 删除 Shapefile 全家桶
+        shp_path = generated_files.get("shapefile")
+        if shp_path:
+            base, _ = os.path.splitext(shp_path)
+            for ext in (".shp", ".shx", ".dbf", ".prj", ".cpg"):
+                _safe_remove(base + ext)
+
+        # 如果边界文件来自系统临时目录,删除其所在目录
+        if boundary_shp:
+            temp_root = tempfile.gettempdir()
+            try:
+                if os.path.commonprefix([os.path.abspath(boundary_shp), temp_root]) == temp_root:
+                    temp_dir = os.path.dirname(os.path.abspath(boundary_shp))
+                    if os.path.isdir(temp_dir):
+                        shutil.rmtree(temp_dir, ignore_errors=True)
+            except Exception:
+                pass
+    
+    def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
+        """
+        直接从数据库获取边界数据作为GeoDataFrame
+        
+        @param area: 地区名称
+        @param level: 行政层级
+        @returns: 边界GeoDataFrame或None
+        """
+        try:
+            with SessionLocal() as db:
+                norm_area = area.strip()
+                boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
+                if boundary_gdf is not None:
+                    self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
+                return boundary_gdf
+                    
+        except Exception as e:
+            self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
+            
+        return None
+    
+ 

+ 70 - 27
app/utils/mapping_utils.py

@@ -438,21 +438,38 @@ class MappingUtils:
             if np.all(np.isnan(raster)):
                 raise ValueError("栅格数据中没有有效值")
 
-            # 根据分位数分为6个等级
-            bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
-            norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
-
-            # 获取色彩方案
-            if isinstance(colormap, str):
-                if colormap in COLORMAPS:
-                    color_list = COLORMAPS[colormap]
+            # 检查数据是否为相同值
+            valid_data = raster[~np.isnan(raster)]
+            data_min = np.min(valid_data)
+            data_max = np.max(valid_data)
+            
+            if data_min == data_max:
+                # 所有值相同的情况:创建简单的单色映射
+                self.logger.info(f"检测到所有值相同 ({data_min:.6f}),使用单色映射")
+                bounds = [data_min - 0.001, data_min + 0.001]  # 创建微小的范围
+                norm = BoundaryNorm(bounds, ncolors=1)
+                # 使用绿色系的第一个颜色作为单色
+                if isinstance(colormap, str) and colormap in COLORMAPS:
+                    single_color = COLORMAPS[colormap][0]  # 使用色彩方案的第一个颜色
                 else:
-                    self.logger.warning(f"未知色彩方案: {colormap},使用默认方案")
-                    color_list = COLORMAPS['green_yellow_red_purple']
+                    single_color = '#89AC46'  # 默认绿色
+                cmap = ListedColormap([single_color])
             else:
-                color_list = colormap
-
-            cmap = ListedColormap(color_list)
+                # 正常情况:根据分位数分为6个等级
+                bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
+                norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
+                
+                # 获取色彩方案
+                if isinstance(colormap, str):
+                    if colormap in COLORMAPS:
+                        color_list = COLORMAPS[colormap]
+                    else:
+                        self.logger.warning(f"未知色彩方案: {colormap},使用默认方案")
+                        color_list = COLORMAPS['green_yellow_red_purple']
+                else:
+                    color_list = colormap
+                
+                cmap = ListedColormap(color_list)
 
             # 设置图片尺寸
             if figsize is not None:
@@ -493,16 +510,29 @@ class MappingUtils:
             ax.tick_params(axis='y', labelrotation=90)
 
             # 添加色带
-            tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
-            cbar = plt.colorbar(
-                plt.cm.ScalarMappable(norm=norm, cmap=cmap),
-                ax=ax,
-                ticks=[(bounds[i] + bounds[i+1]) / 2 for i in range(len(bounds) - 1)],
-                shrink=0.6,
-                aspect=15
-            )
-            cbar.ax.set_yticklabels(tick_labels)
-            cbar.set_label("Values")
+            if data_min == data_max:
+                # 单值情况:简化的色带
+                cbar = plt.colorbar(
+                    plt.cm.ScalarMappable(norm=norm, cmap=cmap),
+                    ax=ax,
+                    ticks=[data_min],
+                    shrink=0.6,
+                    aspect=15
+                )
+                cbar.ax.set_yticklabels([f"{data_min:.6f}"])
+                cbar.set_label("Fixed Value")
+            else:
+                # 正常情况:分级色带
+                tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
+                cbar = plt.colorbar(
+                    plt.cm.ScalarMappable(norm=norm, cmap=cmap),
+                    ax=ax,
+                    ticks=[(bounds[i] + bounds[i+1]) / 2 for i in range(len(bounds) - 1)],
+                    shrink=0.6,
+                    aspect=15
+                )
+                cbar.ax.set_yticklabels(tick_labels)
+                cbar.set_label("Values")
 
             plt.tight_layout()
 
@@ -617,13 +647,26 @@ class MappingUtils:
             if len(band_flat) == 0:
                 raise ValueError("栅格数据中没有有效值")
             
+            # 检查是否所有值相同
+            data_min = np.min(band_flat)
+            data_max = np.max(band_flat)
+            
             # 创建图形
             plt.figure(figsize=figsize)
             
-            # 绘制直方图和密度曲线
-            sns.histplot(band_flat, bins=bins, color='steelblue', alpha=0.7, 
-                        edgecolor='black', stat='density')
-            sns.kdeplot(band_flat, color='red', linewidth=2)
+            if data_min == data_max:
+                # 所有值相同:创建特殊的单值直方图
+                self.logger.info(f"检测到所有值相同 ({data_min:.6f}),创建单值直方图")
+                plt.bar([data_min], [len(band_flat)], width=0.1*abs(data_min) if data_min != 0 else 0.1, 
+                       color='steelblue', alpha=0.7, edgecolor='black')
+                plt.axvline(x=data_min, color='red', linewidth=2, linestyle='--', 
+                           label=f'Fixed Value: {data_min:.6f}')
+                plt.legend()
+            else:
+                # 正常情况:绘制直方图和密度曲线
+                sns.histplot(band_flat, bins=bins, color='steelblue', alpha=0.7, 
+                            edgecolor='black', stat='density')
+                sns.kdeplot(band_flat, color='red', linewidth=2)
             
             # 设置标签和标题
             plt.xlabel(xlabel, fontsize=14)