Pārlūkot izejas kodu

Merge branch 'zsj_dev' of Ding/AcidMap into master

Ding 2 dienas atpakaļ
vecāks
revīzija
ac8e3b5424

+ 66 - 15
Cd_Prediction_Integrated_System/analysis/data_processing.py

@@ -30,30 +30,50 @@ class DataProcessor:
         
     def load_predictions(self):
         """
-        加载模型预测结果
+        加载模型预测结果(根据WORKFLOW_CONFIG配置)
         
         @return: 包含预测结果的字典
         """
         try:
             predictions = {}
             
+            # 动态读取当前的工作流配置(运行时可能被修改)
+            workflow_config = self._get_current_workflow_config()
+            self.logger.info(f"当前工作流配置: {workflow_config}")
+            
+            # 只加载在工作流配置中启用的模型的预测结果
             # 加载作物Cd预测结果
-            crop_cd_path = os.path.join(
-                config.DATA_PATHS["predictions_dir"],
-                config.CROP_CD_MODEL["output_file"]
-            )
-            if os.path.exists(crop_cd_path):
-                predictions['crop_cd'] = pd.read_csv(crop_cd_path)
-                self.logger.info(f"作物Cd预测结果加载成功: {crop_cd_path}")
+            if workflow_config.get("run_crop_model", False):
+                crop_cd_path = os.path.join(
+                    config.DATA_PATHS["predictions_dir"],
+                    config.CROP_CD_MODEL["output_file"]
+                )
+                if os.path.exists(crop_cd_path):
+                    predictions['crop_cd'] = pd.read_csv(crop_cd_path)
+                    self.logger.info(f"作物Cd预测结果加载成功: {crop_cd_path}")
+                else:
+                    self.logger.warning(f"作物Cd预测文件不存在: {crop_cd_path}")
+            else:
+                self.logger.info("跳过作物Cd预测结果加载(工作流配置中未启用)")
             
             # 加载有效态Cd预测结果
-            effective_cd_path = os.path.join(
-                config.DATA_PATHS["predictions_dir"],
-                config.EFFECTIVE_CD_MODEL["output_file"]
-            )
-            if os.path.exists(effective_cd_path):
-                predictions['effective_cd'] = pd.read_csv(effective_cd_path)
-                self.logger.info(f"有效态Cd预测结果加载成功: {effective_cd_path}")
+            if workflow_config.get("run_effective_model", False):
+                effective_cd_path = os.path.join(
+                    config.DATA_PATHS["predictions_dir"],
+                    config.EFFECTIVE_CD_MODEL["output_file"]
+                )
+                if os.path.exists(effective_cd_path):
+                    predictions['effective_cd'] = pd.read_csv(effective_cd_path)
+                    self.logger.info(f"有效态Cd预测结果加载成功: {effective_cd_path}")
+                else:
+                    self.logger.warning(f"有效态Cd预测文件不存在: {effective_cd_path}")
+            else:
+                self.logger.info("跳过有效态Cd预测结果加载(工作流配置中未启用)")
+            
+            if not predictions:
+                self.logger.warning("没有加载到任何预测结果,请检查WORKFLOW_CONFIG配置和预测文件是否存在")
+            else:
+                self.logger.info(f"根据工作流配置,成功加载了 {len(predictions)} 个模型的预测结果: {list(predictions.keys())}")
             
             return predictions
             
@@ -61,6 +81,37 @@ class DataProcessor:
             self.logger.error(f"预测结果加载失败: {str(e)}")
             raise
     
+    def _get_current_workflow_config(self):
+        """
+        动态读取当前的工作流配置
+        
+        @return: 当前的工作流配置字典
+        """
+        try:
+            config_file = os.path.join(config.PROJECT_ROOT, "config.py")
+            
+            # 读取配置文件内容
+            with open(config_file, 'r', encoding='utf-8') as f:
+                config_content = f.read()
+            
+            # 提取WORKFLOW_CONFIG
+            import re
+            pattern = r'WORKFLOW_CONFIG\s*=\s*(\{[^}]*\})'
+            match = re.search(pattern, config_content)
+            
+            if match:
+                # 使用eval安全地解析配置(这里是安全的,因为我们控制配置文件内容)
+                workflow_config_str = match.group(1)
+                workflow_config = eval(workflow_config_str)
+                return workflow_config
+            else:
+                self.logger.warning("无法从配置文件中提取WORKFLOW_CONFIG,使用默认配置")
+                return config.WORKFLOW_CONFIG
+                
+        except Exception as e:
+            self.logger.error(f"读取工作流配置失败: {str(e)},使用默认配置")
+            return config.WORKFLOW_CONFIG
+    
     def load_coordinates(self):
         """
         加载坐标数据

+ 12 - 4
Cd_Prediction_Integrated_System/analysis/visualization.py

@@ -122,7 +122,8 @@ class Visualizer:
                          color_map_name=None,
                          title_name="Prediction Cd",
                          output_path=None,
-                         output_size=None):
+                         output_size=None,
+                         high_res=False):
         """
         创建栅格地图
         
@@ -132,6 +133,7 @@ class Visualizer:
         @param title_name: 输出数据的图的名称
         @param output_path: 输出保存的图片的路径
         @param output_size: 图片尺寸
+        @param high_res: 是否使用高分辨率输出(默认False,DPI=300)
         @return: 输出图片路径
         """
         try:
@@ -222,7 +224,9 @@ class Visualizer:
             
             # 保存图片
             output_file = f"{output_path}.jpg"
-            plt.savefig(output_file, dpi=config.VISUALIZATION_CONFIG["dpi"])
+            # 根据high_res参数决定使用的DPI
+            output_dpi = 600 if high_res else config.VISUALIZATION_CONFIG["dpi"]
+            plt.savefig(output_file, dpi=output_dpi, format='jpg', bbox_inches='tight')
             plt.close()
             
             self.logger.info(f"栅格地图创建成功: {output_file}")
@@ -238,7 +242,8 @@ class Visualizer:
                         xlabel='Cd content',
                         ylabel='Frequency',
                         title='County level Cd Frequency',
-                        save_path=None):
+                        save_path=None,
+                        high_res=False):
         """
         绘制GeoTIFF文件的直方图
         
@@ -248,6 +253,7 @@ class Visualizer:
         @param ylabel: 纵坐标标签
         @param title: 图标题
         @param save_path: 可选,保存图片路径(含文件名和扩展名)
+        @param high_res: 是否使用高分辨率输出(默认False,DPI=300)
         @return: 输出图片路径
         """
         try:
@@ -303,7 +309,9 @@ class Visualizer:
             os.makedirs(os.path.dirname(save_path), exist_ok=True)
             
             # 保存图片
-            plt.savefig(save_path, dpi=config.VISUALIZATION_CONFIG["dpi"], 
+            # 根据high_res参数决定使用的DPI
+            output_dpi = 600 if high_res else config.VISUALIZATION_CONFIG["dpi"]
+            plt.savefig(save_path, dpi=output_dpi, 
                        format='jpg', bbox_inches='tight')
             plt.close()
             

+ 1 - 1
Cd_Prediction_Integrated_System/config.py

@@ -70,7 +70,7 @@ VISUALIZATION_CONFIG = {
     },
     "default_colormap": "colormap6",
     "figure_size": 12,
-    "dpi": 300
+    "dpi": 600
 }
 
 # 执行流程配置

+ 323 - 32
PROJECT_RULES.md

@@ -37,7 +37,7 @@
 - 矢量数据处理 (Vector Data Processing)  
 - 空间数据查询和分析
 - GIS数据导入导出
-- **Cd预测模型分析 (Cd Prediction Analysis)** ✨新增
+- Cd预测模型分析 (Cd Prediction Analysis)
   - 作物Cd含量预测
   - 有效态Cd含量预测
   - 预测结果可视化
@@ -51,24 +51,25 @@ AcidMap/
 │   ├── api/               # API路由层
 │   │   ├── raster.py      # 栅格数据接口
 │   │   ├── vector.py      # 矢量数据接口
-│   │   └── cd_prediction.py # Cd预测模型接口 ✨新增
+│   │   └── cd_prediction.py # Cd预测模型接口
 │   ├── services/          # 业务逻辑层
 │   │   ├── raster_service.py
 │   │   ├── vector_service.py
-│   │   └── cd_prediction_service.py # Cd预测业务服务 ✨新增
+│   │   └── cd_prediction_service.py # Cd预测业务服务
 │   ├── models/            # 数据模型层
 │   │   ├── base.py        # 基础模型
 │   │   ├── orm_models.py  # ORM模型
 │   │   ├── raster.py      # 栅格数据模型
 │   │   ├── vector.py      # 矢量数据模型
-│   │   └── county.py      # 县域数据模型
+│   │   ├── county.py      # 县域数据模型
+│   │   └── farmland.py    # 农田样点数据模型
 │   ├── utils/             # 工具函数
 │   │   ├── file_validators.py
-│   │   └── cd_prediction_wrapper.py # Cd预测系统包装器 ✨新增
-│   ├── config/            # 配置管理 ✨新增
+│   │   └── cd_prediction_wrapper.py # Cd预测系统包装器
+│   ├── config/            # 配置管理
 │   │   ├── __init__.py
 │   │   └── cd_prediction_config.py # Cd预测配置管理
-│   ├── static/            # 静态文件 ✨新增
+│   ├── static/            # 静态文件
 │   │   └── cd_predictions/ # Cd预测输出文件
 │   │       ├── figures/    # 地图和直方图
 │   │       ├── raster/     # 栅格文件
@@ -78,7 +79,7 @@ AcidMap/
 │   ├── scripts/           # 脚本文件
 │   ├── database.py        # 数据库配置
 │   └── main.py           # FastAPI应用入口
-├── Cd_Prediction_Integrated_System/ # Cd预测系统 ✨新增
+├── Cd_Prediction_Integrated_System/ # Cd预测系统
 │   ├── models/            # 预测模型
 │   │   ├── crop_cd_model/
 │   │   └── effective_cd_model/
@@ -98,9 +99,9 @@ AcidMap/
 ├── soilgd.sql           # 数据库备份文件
 ├── db_migrate.py        # 数据库迁移脚本
 ├── reset_db.py          # 数据库重置脚本
-├── test_cd_integration.py # Cd集成测试脚本 ✨新增
-├── INTEGRATION_GUIDE.md  # 集成使用指南 ✨新增
-├── CD_INTEGRATION_SUMMARY.md # 集成总结文档 ✨新增
+├── test_cd_integration.py # Cd集成测试脚本
+├── INTEGRATION_GUIDE.md  # 集成使用指南
+├── CD_INTEGRATION_SUMMARY.md # 集成总结文档
 └── main.py              # 应用启动入口
 ```
 
@@ -109,7 +110,7 @@ AcidMap/
 2. **Service层 (app/services/)**: 业务逻辑处理,数据转换,调用Model层
 3. **Model层 (app/models/)**: 数据模型定义,数据库操作
 4. **Utils层 (app/utils/)**: 通用工具函数,验证器等
-5. **Config层 (app/config/)**: 配置管理,环境设置 ✨新增
+5. **Config层 (app/config/)**: 配置管理,环境设置
 
 ## 3. 开发规范
 
@@ -163,7 +164,7 @@ from .services import RasterService
 #### 3.2.1 路由组织
 - 栅格数据: `/api/raster/*`
 - 矢量数据: `/api/vector/*`
-- **Cd预测分析: `/api/cd-prediction/*`** ✨新增
+- **Cd预测分析: `/api/cd-prediction/*`**
 - 使用RESTful设计原则
 
 #### 3.2.2 HTTP状态码
@@ -198,6 +199,29 @@ from .services import RasterService
 
 #### 3.3.3 迁移管理
 - 使用Alembic进行数据库迁移
+
+### 3.4 可视化规范
+
+#### 3.4.1 分辨率设置
+- **默认DPI**: 300 (标准分辨率输出)
+- **高分辨率DPI**: 600 (高质量输出) 
+- **高分辨率模式**: 支持通过`high_res=True`参数启用600 DPI输出
+- **图像格式**: JPG格式,支持bbox_inches='tight'自动裁切
+
+#### 3.4.2 分辨率配置层级
+1. **配置文件级别**: `config.VISUALIZATION_CONFIG["dpi"] = 300`
+2. **matplotlib全局设置**: `plt.rcParams['savefig.dpi'] = 300`
+3. **方法参数级别**: `high_res`参数可动态控制输出DPI
+
+#### 3.4.3 可视化方法接口
+- **栅格地图**: `create_raster_map(high_res=False)` - 默认300 DPI标准分辨率输出
+- **直方图**: `create_histogram(high_res=False)` - 默认300 DPI标准分辨率输出
+- **图像尺寸**: 可通过`figure_size`和`figsize`参数调整
+
+#### 3.4.4 字体配置
+- 优先使用Windows系统中文字体: Microsoft YaHei, SimHei等
+- 支持跨平台字体回退机制
+- 自动字体缓存重建解决字体问题
 - 保持迁移文件版本控制
 - 提供数据库重置脚本
 
@@ -215,18 +239,22 @@ from .services import RasterService
 ### 4.2 API路由层
 - **栅格API (api/raster.py)**: 处理栅格数据的CRUD操作
 - **矢量API (api/vector.py)**: 处理矢量数据的CRUD操作
-- **Cd预测API (api/cd_prediction.py)**: 处理Cd预测模型的调用和结果获取 ✨新增
+- **Cd预测API (api/cd_prediction.py)**: 处理Cd预测模型的调用和结果获取
 
 ### 4.3 业务服务层
 - **RasterService**: 栅格数据业务逻辑
 - **VectorService**: 矢量数据业务逻辑
-- **CdPredictionService**: Cd预测分析业务逻辑 ✨新增
+- **CdPredictionService**: Cd预测分析业务逻辑
 
 ### 4.4 数据模型层
 - **ORM模型**: 数据库表映射
+  - RasterData: 栅格数据模型
+  - VectorData: 矢量数据模型
+  - Counties: 县域地理数据模型
+  - FarmlandData: 农田样点空间位置与索引数据模型
 - **Pydantic模型**: API输入输出验证
 
-### 4.5 Cd预测系统组件 ✨新增
+### 4.5 Cd预测系统组件
 - **CdPredictionWrapper**: Cd预测系统包装器
 - **CdPredictionConfig**: Cd预测配置管理
 - **作物Cd预测模型**: 基于神经网络的作物Cd含量预测
@@ -241,13 +269,276 @@ from .services import RasterService
 4. **API层开发**: 实现HTTP接口
 5. **测试**: 单元测试和集成测试
 6. **文档更新**: 更新API文档和代码注释
-7. **规则文件更新**: 更新PROJECT_RULES.md中的相关规范 ✨新增
+7. **规则文件更新**: 更新PROJECT_RULES.md中的相关规范
 
 ### 5.2 数据库变更流程
-1. 修改ORM模型
-2. 生成迁移文件: `alembic revision --autogenerate -m "描述"`
-3. 执行迁移: `alembic upgrade head`
-4. 更新数据库备份文件
+
+#### 5.2.1 数据库表格添加标准流程
+
+**第一步: 创建/修改 ORM 模型**
+1. 在 `app/models/` 目录下创建或编辑模型文件
+2. 定义SQLAlchemy ORM模型类,继承Base类
+3. 确保在 `app/models/__init__.py` 中导入新模型
+
+```python
+# 实际案例: 农田数据模型
+# app/models/farmland.py
+from sqlalchemy import Column, Integer, Float
+from geoalchemy2 import Geometry
+from app.database import Base  # 统一的Base导入
+
+class FarmlandData(Base):
+    """
+    农田样点空间位置与索引数据模型
+    
+    @param {int} farmland_id - 区域农业用地矢量点编号(主键)
+    @param {int} sample_id - 采样自增的ID(主键+自增)
+    @param {float} lon - 经度
+    @param {float} lan - 纬度
+    @param {float} type - 用地类型:旱地(0)、水田(1)、水浇地(2)
+    """
+    __tablename__ = 'Farmland_data'
+    
+    farmland_id = Column('Farmland_ID', Integer, primary_key=True)
+    sample_id = Column('Sample_ID', Integer, primary_key=True, autoincrement=True)
+    lon = Column('lon', Float, nullable=True)
+    lan = Column('lan', Float, nullable=True)
+    type = Column('Type', Float, nullable=True)
+    geom = Column(Geometry('POINT', srid=4326))
+```
+
+**第二步: 使用迁移脚本生成迁移文件**
+```bash
+# 使用项目自带的迁移脚本
+python db_migrate.py create "添加新数据表"
+
+# 或直接使用alembic命令
+alembic revision --autogenerate -m "添加新数据表"
+```
+
+**第三步: 检查生成的迁移文件**
+1. 查看 `migrations/versions/` 目录下新生成的迁移文件
+2. 验证迁移内容是否正确
+3. 必要时手动调整迁移脚本
+
+**第四步: 执行数据库迁移**
+```bash
+# 使用项目迁移脚本
+python db_migrate.py upgrade
+
+# 或直接使用alembic命令  
+alembic upgrade head
+```
+
+**第五步: 验证迁移结果**
+```bash
+# 查看当前数据库版本
+python db_migrate.py current
+
+# 查看迁移历史
+python db_migrate.py history
+```
+
+#### 5.2.2 使用sqlacodegen反向生成模型 (可选)
+
+当需要从现有数据库表生成ORM模型时:
+
+```bash
+# 反向生成所有表的模型
+sqlacodegen postgresql://postgres:123456789Qq@localhost/soilgd > models.py
+
+# 生成特定表的模型
+sqlacodegen postgresql://postgres:123456789Qq@localhost/soilgd --tables table1,table2 > specific_models.py
+```
+
+#### 5.2.3 数据库脚本命令参考
+
+**db_migrate.py 脚本命令**:
+- `python db_migrate.py create "描述"` - 创建新迁移
+- `python db_migrate.py upgrade` - 升级到最新版本
+- `python db_migrate.py downgrade` - 降级到上一版本
+- `python db_migrate.py current` - 显示当前版本
+- `python db_migrate.py history` - 显示迁移历史
+- `python db_migrate.py stamp head` - 标记数据库版本
+
+**其他数据库操作**:
+- `python reset_db.py` - 重置数据库
+- `psql -U postgres -d soilgd -f soilgd.sql` - 从备份文件恢复数据库
+
+#### 5.2.4 数据导入脚本开发规范
+
+##### 5.2.4.1 脚本开发原则
+- **统一模式**: 所有导入脚本应遵循统一的结构和错误处理模式
+- **参考现有脚本**: 可参考 `scripts/import_counties.py` (GeoJSON导入) 和 `scripts/import_farmland_data.py` (Excel导入)
+- **完整日志**: 实现详细的操作日志记录
+- **事务安全**: 确保数据完整性,支持回滚机制
+- **批量处理**: 大数据集使用批量插入优化性能
+
+##### 5.2.4.2 数据导入脚本模板结构
+
+```python
+"""
+数据导入脚本模板
+@description: [描述导入的数据类型和来源]
+"""
+
+import os
+import sys
+import logging
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import SessionLocal
+from app.models.[model_file] import [ModelClass]
+
+# 设置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class [DataType]Importer:
+    """
+    [数据类型]导入器
+    """
+    def __init__(self, file_path: str):
+        self.file_path = file_path
+    
+    def read_data(self):
+        """读取数据文件"""
+        pass
+    
+    def validate_data(self, data):
+        """验证数据格式和完整性"""
+        pass
+    
+    def import_data(self, data):
+        """将数据导入到数据库"""
+        db = SessionLocal()
+        try:
+            # 批量处理逻辑
+            # 事务提交
+            db.commit()
+            logger.info("数据导入完成")
+        except Exception as e:
+            db.rollback()
+            logger.error(f"数据导入失败: {str(e)}")
+            raise
+        finally:
+            db.close()
+    
+    def run_import(self):
+        """执行完整的导入流程"""
+        try:
+            data = self.read_data()
+            validated_data = self.validate_data(data)
+            self.import_data(validated_data)
+        except Exception as e:
+            logger.error(f"导入流程失败: {str(e)}")
+            raise
+
+def main():
+    file_path = "path/to/data/file"
+    importer = [DataType]Importer(file_path)
+    importer.run_import()
+
+if __name__ == "__main__":
+    main()
+```
+
+##### 5.2.4.3 GeoJSON数据导入参考
+
+**参考脚本**: `scripts/import_counties.py`
+
+主要特点:
+- 使用JSON模块读取GeoJSON文件
+- 通过模型类的`from_geojson_feature`方法处理几何数据
+- 完整的错误处理和日志记录
+- 事务安全保证
+
+```python
+# 核心导入逻辑示例
+def import_counties_from_geojson(file_path: str, db: Session):
+    with open(file_path, 'r', encoding='utf-8') as f:
+        geojson_data = json.load(f)
+    
+    for feature in geojson_data['features']:
+        try:
+            county = County.from_geojson_feature(feature)
+            db.add(county)
+            logger.info(f"成功导入: {county.name}")
+        except Exception as e:
+            logger.error(f"导入失败: {str(e)}")
+            continue
+    
+    db.commit()
+```
+
+##### 5.2.4.4 Excel数据导入参考
+
+**参考脚本**: `scripts/import_farmland_data.py`
+
+主要特点:
+- 使用pandas读取Excel文件和指定sheet
+- 支持数据类型转换和映射(如Type字段映射)
+- 自动生成PostGIS几何对象
+- 批量插入优化(1000条/批次)
+- 数据验证和清洗
+
+```python
+# 核心导入逻辑示例
+class FarmlandDataImporter:
+    def __init__(self, excel_path, sheet_name='Farmland'):
+        self.excel_path = excel_path
+        self.sheet_name = sheet_name
+        self.type_mapping = {'旱': 0.0, '水田': 1.0, '水浇地': 2.0}
+    
+    def read_excel_data(self):
+        df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
+        return df
+    
+    def validate_data(self, df):
+        # 数据类型转换
+        df['Type_Numeric'] = df['Type'].map(self.type_mapping)
+        return df
+    
+    def create_geometry(self, lon, lat):
+        return WKTElement(f'POINT({lon} {lat})', srid=4326)
+    
+    def import_data(self, df):
+        # 批量导入逻辑
+        batch_size = 1000
+        for i in range(0, len(df), batch_size):
+            batch_objects = []
+            # ... 创建批次对象
+            db.add_all(batch_objects)
+            db.commit()
+```
+
+##### 5.2.4.5 数据导入最佳实践
+
+1. **文件路径检查**: 导入前验证数据文件是否存在
+2. **数据预览**: 读取数据后显示前几行供确认
+3. **字段映射**: 建立清晰的字段映射表(如类型编码转换)
+4. **几何数据处理**: 自动生成PostGIS兼容的几何对象
+5. **批量操作**: 大数据集分批处理,避免内存溢出
+6. **进度跟踪**: 显示导入进度和统计信息
+7. **异常处理**: 跳过无效数据行,记录详细错误信息
+8. **结果验证**: 导入完成后验证数据库中的记录数量
+
+##### 5.2.4.6 常用数据源类型
+
+- **GeoJSON**: 地理空间矢量数据(参考`import_counties.py`)
+- **Excel/CSV**: 结构化表格数据(参考`import_farmland_data.py`)
+- **Shapefile**: GIS矢量数据(使用geopandas读取)
+- **GeoTIFF**: 栅格地理数据(使用rasterio处理)
+
+#### 5.2.5 注意事项
+- 备份数据库后再执行迁移
+- 在开发环境先测试迁移
+- 保持迁移文件的版本控制
+- 迁移失败时及时回滚
+- **Base类统一性**:所有模型必须使用`app.database`中的同一个Base实例
+  - ❌ 错误:`from .base import Base`(会创建独立的Base)
+  - ✅ 正确:`from app.database import Base`(使用统一的Base)
 
 ### 5.3 代码提交规范
 - feat: 新功能
@@ -255,9 +546,9 @@ from .services import RasterService
 - docs: 文档更新
 - refactor: 代码重构
 - test: 测试相关
-- **integrate: 系统集成** ✨新增
+- **integrate: 系统集成**
 
-### 5.4 Cd预测功能开发流程 ✨新增
+### 5.4 Cd预测功能开发流程
 1. **模型验证**: 确保Cd预测系统完整性
 2. **包装器开发**: 创建系统调用包装器
 3. **服务层集成**: 实现异步预测服务
@@ -270,7 +561,7 @@ from .services import RasterService
 ### 6.1 开发环境要求
 - Python 3.8+
 - PostgreSQL 12+ (with PostGIS)
-- **Cd预测系统依赖**: ✨新增
+- **Cd预测系统依赖**:
   - PyTorch >= 1.9.0
   - scikit-learn >= 1.0.0
   - geopandas >= 0.10.0
@@ -312,7 +603,7 @@ python test_cd_integration.py
 - 文件大小限制
 - 恶意文件检测
 
-### 7.4 Cd预测系统安全 ✨新增
+### 7.4 Cd预测系统安全
 - 预测任务超时控制 (5分钟)
 - 输出文件访问权限管理
 - 敏感路径信息隐藏
@@ -330,7 +621,7 @@ python test_cd_integration.py
 - 缓存策略
 - 分页处理
 
-### 8.3 Cd预测性能优化 ✨新增
+### 8.3 Cd预测性能优化
 - 异步任务处理 (asyncio)
 - CPU密集型任务线程池执行
 - 预测结果文件缓存
@@ -348,7 +639,7 @@ python test_cd_integration.py
 - 分级别记录日志
 - 敏感信息脱敏
 
-### 9.3 Cd预测错误处理 ✨新增
+### 9.3 Cd预测错误处理
 - 预测系统完整性检查
 - 依赖包缺失检测
 - 预测超时异常处理
@@ -366,7 +657,7 @@ python test_cd_integration.py
 - 测试数据隔离
 - 数据清理策略
 
-### 10.3 Cd预测测试规范 ✨新增
+### 10.3 Cd预测测试规范
 - 配置模块测试
 - 包装器功能测试
 - API端点注册验证
@@ -384,13 +675,13 @@ python test_cd_integration.py
 - 错误日志监控
 - 数据库性能监控
 
-### 11.3 Cd预测部署特殊要求 ✨新增
+### 11.3 Cd预测部署特殊要求
 - 确保Cd_Prediction_Integrated_System目录完整
 - 验证地理空间库安装 (fiona, pyogrio)
 - 配置预测输出目录权限
 - 监控预测任务执行时间
 
-## 12. API文档规范 ✨新增
+## 12. API文档规范
 
 ### 12.1 Cd预测API端点
 
@@ -412,7 +703,7 @@ GET  /api/cd-prediction/crop-cd/download-histogram
 GET  /api/cd-prediction/effective-cd/download-histogram
 ```
 
-### 12.2 一键接口特点 ✨新增
+### 12.2 一键接口特点
 - **即时响应**: 生成完成后直接返回图片文件
 - **简化操作**: 无需分两步操作,一次调用完成
 - **适用场景**: 前端直接显示、浏览器预览、快速下载
@@ -466,4 +757,4 @@ Content-Disposition: attachment; filename="crop_cd_prediction_map.jpg"
 3. 性能优化和用户体验
 4. 与现有架构的一致性
 
-**最后更新**: 2025-06-01 (Cd预测功能集成) 
+**最后更新**: 2025-06-14 (新增数据库导入脚本开发规范和农田数据模型) 

+ 174 - 0
app/api/cd_prediction.py

@@ -536,4 +536,178 @@ async def get_latest_results_overview(county_name: str):
             detail=f"获取{county_name}预测结果概览失败: {str(e)}"
         )
 
+# =============================================================================
+# 预测结果统计信息接口
+# =============================================================================
+
+@router.get("/crop-cd/statistics/{county_name}", 
+           summary="获取作物Cd预测统计信息", 
+           description="获取指定县市的作物Cd预测结果的详细统计信息")
+async def get_crop_cd_statistics(county_name: str):
+    """
+    获取指定县市的作物Cd预测统计信息
+    
+    @param county_name: 县市名称,如:乐昌市
+    @returns {Dict[str, Any]} 预测结果的统计信息
+    """
+    try:
+        logger.info(f"获取{county_name}的作物Cd预测统计信息")
+        
+        service = CdPredictionService()
+        
+        # 验证县市是否支持
+        if not service.is_county_supported(county_name):
+            raise HTTPException(
+                status_code=404, 
+                detail=f"不支持的县市: {county_name}"
+            )
+        
+        # 获取预测结果数据
+        stats = service.get_crop_cd_statistics(county_name)
+        
+        if not stats:
+            raise HTTPException(
+                status_code=404, 
+                detail=f"未找到{county_name}的作物Cd预测结果,请先执行预测"
+            )
+        
+        return {
+            "success": True,
+            "message": f"获取{county_name}作物Cd预测统计信息成功",
+            "data": stats
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"获取{county_name}作物Cd预测统计信息失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取{county_name}作物Cd预测统计信息失败: {str(e)}"
+        )
+
+@router.get("/effective-cd/statistics/{county_name}", 
+           summary="获取有效态Cd预测统计信息", 
+           description="获取指定县市的有效态Cd预测结果的详细统计信息")
+async def get_effective_cd_statistics(county_name: str):
+    """
+    获取指定县市的有效态Cd预测统计信息
+    
+    @param county_name: 县市名称,如:乐昌市
+    @returns {Dict[str, Any]} 预测结果的统计信息
+    """
+    try:
+        logger.info(f"获取{county_name}的有效态Cd预测统计信息")
+        
+        service = CdPredictionService()
+        
+        # 验证县市是否支持
+        if not service.is_county_supported(county_name):
+            raise HTTPException(
+                status_code=404, 
+                detail=f"不支持的县市: {county_name}"
+            )
+        
+        # 获取预测结果数据
+        stats = service.get_effective_cd_statistics(county_name)
+        
+        if not stats:
+            raise HTTPException(
+                status_code=404, 
+                detail=f"未找到{county_name}的有效态Cd预测结果,请先执行预测"
+            )
+        
+        return {
+            "success": True,
+            "message": f"获取{county_name}有效态Cd预测统计信息成功",
+            "data": stats
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"获取{county_name}有效态Cd预测统计信息失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取{county_name}有效态Cd预测统计信息失败: {str(e)}"
+        )
+
+@router.get("/combined-statistics/{county_name}", 
+           summary="获取综合预测统计信息", 
+           description="获取指定县市的作物Cd和有效态Cd预测结果的综合统计信息")
+async def get_combined_statistics(county_name: str):
+    """
+    获取指定县市的综合预测统计信息
+    
+    @param county_name: 县市名称,如:乐昌市
+    @returns {Dict[str, Any]} 综合预测结果的统计信息
+    """
+    try:
+        logger.info(f"获取{county_name}的综合预测统计信息")
+        
+        service = CdPredictionService()
+        
+        # 验证县市是否支持
+        if not service.is_county_supported(county_name):
+            raise HTTPException(
+                status_code=404, 
+                detail=f"不支持的县市: {county_name}"
+            )
+        
+        # 获取综合统计信息
+        stats = service.get_combined_statistics(county_name)
+        
+        if not stats:
+            raise HTTPException(
+                status_code=404, 
+                detail=f"未找到{county_name}的预测结果,请先执行预测"
+            )
+        
+        return {
+            "success": True,
+            "message": f"获取{county_name}综合预测统计信息成功",
+            "data": stats
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"获取{county_name}综合预测统计信息失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取{county_name}综合预测统计信息失败: {str(e)}"
+        )
+
+
+
+@router.get("/all-statistics", 
+           summary="获取所有县市统计概览", 
+           description="获取所有支持县市的预测结果统计概览")
+async def get_all_statistics():
+    """
+    获取所有支持县市的预测结果统计概览
+    
+    @returns {Dict[str, Any]} 所有县市的统计概览
+    """
+    try:
+        logger.info("获取所有县市统计概览")
+        
+        service = CdPredictionService()
+        
+        # 获取所有县市的统计概览
+        all_stats = service.get_all_counties_statistics()
+        
+        return {
+            "success": True,
+            "message": "获取所有县市统计概览成功",
+            "data": all_stats
+        }
+        
+    except Exception as e:
+        logger.error(f"获取所有县市统计概览失败: {str(e)}")
+        raise HTTPException(
+            status_code=500, 
+            detail=f"获取所有县市统计概览失败: {str(e)}"
+        )
+
  

+ 71 - 2
app/main.py

@@ -2,9 +2,78 @@ from fastapi import FastAPI
 from .api import vector, raster, cd_prediction
 from .database import engine, Base
 from fastapi.middleware.cors import CORSMiddleware
+import logging
+import sys
+from alembic.config import Config
+from alembic import command
+from alembic.runtime.migration import MigrationContext
+from alembic.script import ScriptDirectory
+import os
 
-# 创建数据库表
-Base.metadata.create_all(bind=engine)
+# 设置日志
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def check_and_upgrade_database():
+    """
+    检查数据库迁移状态并自动升级到最新版本
+    
+    @description: 在应用启动前检查数据库版本,如果需要升级则自动执行
+    @returns: None
+    @throws: SystemExit 当数据库操作失败时退出程序
+    """
+    try:
+        # 配置 Alembic
+        alembic_cfg = Config(os.path.join(os.path.dirname(os.path.dirname(__file__)), "alembic.ini"))
+        
+        # 获取当前数据库版本
+        with engine.connect() as connection:
+            context = MigrationContext.configure(connection)
+            current_rev = context.get_current_revision()
+            
+        # 获取脚本目录和最新版本
+        script_dir = ScriptDirectory.from_config(alembic_cfg)
+        head_rev = script_dir.get_current_head()
+        
+        logger.info(f"当前数据库版本: {current_rev}")
+        logger.info(f"最新迁移版本: {head_rev}")
+        
+        # 检查是否需要升级
+        if current_rev != head_rev:
+            logger.warning("数据库版本不是最新版本,正在自动升级...")
+            
+            # 执行升级
+            command.upgrade(alembic_cfg, "head")
+            logger.info("数据库升级成功")
+        else:
+            logger.info("数据库版本已是最新")
+            
+    except Exception as e:
+        logger.error(f"数据库迁移检查失败: {str(e)}")
+        logger.error("程序将退出,请手动检查数据库状态")
+        sys.exit(1)
+
+def safe_create_tables():
+    """
+    安全地创建数据库表
+    
+    @description: 在确保迁移状态正确后创建表结构
+    """
+    try:
+        # 先检查和升级数据库
+        check_and_upgrade_database()
+        
+        # 创建数据库表(如果迁移已正确应用,这里应该不会有冲突)
+        Base.metadata.create_all(bind=engine)
+        logger.info("数据库表结构检查完成")
+        
+    except Exception as e:
+        logger.error(f"数据库表创建失败: {str(e)}")
+        logger.error("请检查数据库连接和迁移状态")
+        sys.exit(1)
+
+# 执行数据库初始化
+safe_create_tables()
 
 app = FastAPI(
     title="地图数据处理系统",

+ 2 - 1
app/models/__init__.py

@@ -1,4 +1,5 @@
 from app.models.orm_models import *
 from app.models.vector import *
 from app.models.raster import *
-from app.models.county import *  # 导入新的县级地理数据模型 
+from app.models.county import *  # 导入新的县级地理数据模型
+from app.models.farmland import *  # 导入农田数据模型 

+ 72 - 0
app/models/farmland.py

@@ -0,0 +1,72 @@
+"""
+农田数据模型
+
+定义Farmland_data表的ORM模型,用于存储耕地样点空间位置与索引数据
+"""
+
+from sqlalchemy import Column, Integer, Float, ForeignKey
+from geoalchemy2 import Geometry
+from app.database import Base
+
+
+class FarmlandData(Base):
+    """
+    耕地样点空间位置与索引数据模型
+    
+    @param {int} farmland_id - 区域农业用地矢量点编号(主键)
+    @param {int} sample_id - 采样自增的ID(主键+外键+自增)
+    @param {float} lon - 经度
+    @param {float} lan - 纬度
+    @param {float} type - 用地类型,旱地(0)、水田(1)、水浇地(2)
+    @param {geometry} geom - 点几何对象(使用PostGIS,基于经纬度生成)
+    """
+    __tablename__ = 'Farmland_data'
+    __table_args__ = {'comment': '耕地样点空间位置与索引数据表,存储农业用地的坐标、类型和空间几何信息'}
+    
+    # 主键字段 - 保持与原表结构完全一致的大小写
+    farmland_id = Column('Farmland_ID', Integer, primary_key=True, comment='区域农业用地矢量点编号')
+    sample_id = Column('Sample_ID', Integer, primary_key=True, autoincrement=True, comment='采样自增的ID')
+    
+    # 地理坐标字段 - 保持原始大小写
+    lon = Column('lon', Float, nullable=True, comment='经度')
+    lan = Column('lan', Float, nullable=True, comment='纬度')
+    
+    # 用地类型字段 - 保持原始大小写
+    type = Column('Type', Float, nullable=True, comment='用地类型:旱地(0)、水田(1)、水浇地(2)')
+    
+    # 使用PostGIS几何字段存储点位置(可选,便于空间查询)
+    geom = Column(Geometry('POINT', srid=4326), comment='点几何对象')
+    
+    def __repr__(self):
+        return f"<FarmlandData(farmland_id={self.farmland_id}, sample_id={self.sample_id}, " \
+               f"lon={self.lon}, lan={self.lan}, type={self.type})>"
+    
+    @property
+    def land_type_name(self):
+        """
+        获取用地类型名称
+        
+        @returns {str} 用地类型名称
+        """
+        type_mapping = {
+            0.0: "旱地",
+            1.0: "水田", 
+            2.0: "水浇地"
+        }
+        return type_mapping.get(self.type, "未知类型")
+    
+    def to_dict(self):
+        """
+        转换为字典格式
+        
+        @returns {dict} 包含所有字段的字典
+        """
+        return {
+            'farmland_id': self.farmland_id,
+            'sample_id': self.sample_id,
+            'lon': self.lon,
+            'lan': self.lan,
+            'type': self.type,
+            'land_type_name': self.land_type_name,
+            'geom': str(self.geom) if self.geom else None
+        } 

+ 292 - 9
app/services/cd_prediction_service.py

@@ -510,8 +510,8 @@ class CdPredictionService:
             self.logger.info(f"为{county_name}执行作物Cd预测")
             prediction_result = self.wrapper.run_prediction_script("crop")
             
-            # 获取输出文件
-            latest_outputs = self.wrapper.get_latest_outputs("all")
+            # 获取输出文件(指定作物Cd模型类型)
+            latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
             
             # 复制文件到API输出目录
             timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -549,8 +549,8 @@ class CdPredictionService:
             self.logger.info(f"为{county_name}执行有效态Cd预测")
             prediction_result = self.wrapper.run_prediction_script("effective")
             
-            # 获取输出文件
-            latest_outputs = self.wrapper.get_latest_outputs("all")
+            # 获取输出文件(指定有效态Cd模型类型)
+            latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
             
             # 复制文件到API输出目录
             timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -640,8 +640,8 @@ class CdPredictionService:
             self.logger.info("执行作物Cd预测")
             prediction_result = self.wrapper.run_prediction_script("crop")
             
-            # 获取输出文件
-            latest_outputs = self.wrapper.get_latest_outputs("all")
+            # 获取输出文件(指定作物Cd模型类型)
+            latest_outputs = self.wrapper.get_latest_outputs("all", "crop")
             
             # 复制文件到API输出目录
             timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -675,8 +675,8 @@ class CdPredictionService:
             self.logger.info("执行有效态Cd预测")
             prediction_result = self.wrapper.run_prediction_script("effective")
             
-            # 获取输出文件
-            latest_outputs = self.wrapper.get_latest_outputs("all")
+            # 获取输出文件(指定有效态Cd模型类型)
+            latest_outputs = self.wrapper.get_latest_outputs("all", "effective")
             
             # 复制文件到API输出目录
             timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -869,4 +869,287 @@ class CdPredictionService:
                 return max(files, key=os.path.getctime)
             return None
         except Exception:
-            return None 
+            return None
+    
+    # =============================================================================
+    # 统计信息方法
+    # =============================================================================
+    
+    def get_crop_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
+        """
+        获取作物Cd预测结果的统计信息
+        
+        @param {str} county_name - 县市名称
+        @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
+        """
+        try:
+            # 查找最新的预测结果文件
+            cd_system_path = self.config.get_cd_system_path()
+            final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_crop_cd.csv")
+            
+            if not os.path.exists(final_data_path):
+                self.logger.warning(f"未找到作物Cd预测结果文件: {final_data_path}")
+                return None
+            
+            # 读取预测数据
+            df = pd.read_csv(final_data_path)
+            
+            if 'Prediction' not in df.columns:
+                self.logger.warning("预测结果文件中缺少'Prediction'列")
+                return None
+            
+            predictions = df['Prediction']
+            
+            # 计算基础统计信息
+            basic_stats = {
+                "数据点总数": len(predictions),
+                "均值": float(predictions.mean()),
+                "中位数": float(predictions.median()),
+                "标准差": float(predictions.std()),
+                "最小值": float(predictions.min()),
+                "最大值": float(predictions.max()),
+                "25%分位数": float(predictions.quantile(0.25)),
+                "75%分位数": float(predictions.quantile(0.75)),
+                "偏度": float(predictions.skew()),
+                "峰度": float(predictions.kurtosis())
+            }
+            
+            # 计算分布直方图数据
+            histogram_data = self._calculate_histogram_data(predictions)
+            
+            # 计算空间统计信息(如果有坐标信息)
+            spatial_stats = None
+            if 'longitude' in df.columns and 'latitude' in df.columns:
+                spatial_stats = self._calculate_spatial_statistics(df)
+            
+            return {
+                "模型类型": "作物Cd模型",
+                "县市名称": county_name,
+                "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
+                "基础统计": basic_stats,
+                "分布直方图": histogram_data,
+                "空间统计": spatial_stats
+            }
+            
+        except Exception as e:
+            self.logger.error(f"获取作物Cd统计信息失败: {str(e)}")
+            return None
+    
+    def get_effective_cd_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
+        """
+        获取有效态Cd预测结果的统计信息
+        
+        @param {str} county_name - 县市名称
+        @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
+        """
+        try:
+            # 查找最新的预测结果文件
+            cd_system_path = self.config.get_cd_system_path()
+            final_data_path = os.path.join(cd_system_path, "data", "final", "Final_predictions_effective_cd.csv")
+            
+            if not os.path.exists(final_data_path):
+                self.logger.warning(f"未找到有效态Cd预测结果文件: {final_data_path}")
+                return None
+            
+            # 读取预测数据
+            df = pd.read_csv(final_data_path)
+            
+            if 'Prediction' not in df.columns:
+                self.logger.warning("预测结果文件中缺少'Prediction'列")
+                return None
+            
+            predictions = df['Prediction']
+            
+            # 计算基础统计信息
+            basic_stats = {
+                "数据点总数": len(predictions),
+                "均值": float(predictions.mean()),
+                "中位数": float(predictions.median()),
+                "标准差": float(predictions.std()),
+                "最小值": float(predictions.min()),
+                "最大值": float(predictions.max()),
+                "25%分位数": float(predictions.quantile(0.25)),
+                "75%分位数": float(predictions.quantile(0.75)),
+                "偏度": float(predictions.skew()),
+                "峰度": float(predictions.kurtosis())
+            }
+            
+            # 计算分布直方图数据
+            histogram_data = self._calculate_histogram_data(predictions)
+            
+            # 计算空间统计信息(如果有坐标信息)
+            spatial_stats = None
+            if 'longitude' in df.columns and 'latitude' in df.columns:
+                spatial_stats = self._calculate_spatial_statistics(df)
+            
+            return {
+                "模型类型": "有效态Cd模型",
+                "县市名称": county_name,
+                "数据更新时间": datetime.fromtimestamp(os.path.getmtime(final_data_path)).isoformat(),
+                "基础统计": basic_stats,
+                "分布直方图": histogram_data,
+                "空间统计": spatial_stats
+            }
+            
+        except Exception as e:
+            self.logger.error(f"获取有效态Cd统计信息失败: {str(e)}")
+            return None
+    
+    def get_combined_statistics(self, county_name: str) -> Optional[Dict[str, Any]]:
+        """
+        获取综合预测统计信息
+        
+        @param {str} county_name - 县市名称
+        @returns {Optional[Dict[str, Any]]} 综合统计信息,如果没有数据则返回None
+        """
+        try:
+            crop_stats = self.get_crop_cd_statistics(county_name)
+            effective_stats = self.get_effective_cd_statistics(county_name)
+            
+            if not crop_stats and not effective_stats:
+                return None
+            
+            return {
+                "县市名称": county_name,
+                "作物Cd统计": crop_stats,
+                "有效态Cd统计": effective_stats,
+                "生成时间": datetime.now().isoformat()
+            }
+            
+        except Exception as e:
+            self.logger.error(f"获取综合统计信息失败: {str(e)}")
+            return None
+    
+
+    
+    def get_all_counties_statistics(self) -> Dict[str, Any]:
+        """
+        获取所有支持县市的统计概览
+        
+        @returns {Dict[str, Any]} 所有县市的统计概览
+        """
+        try:
+            all_stats = {
+                "支持县市总数": len(self.supported_counties),
+                "统计生成时间": datetime.now().isoformat(),
+                "县市统计": {},
+                "汇总信息": {
+                    "有作物Cd数据的县市": 0,
+                    "有有效态Cd数据的县市": 0,
+                    "数据完整的县市": 0
+                }
+            }
+            
+            for county_name in self.supported_counties.keys():
+                county_stats = {
+                    "县市名称": county_name,
+                    "有作物Cd数据": False,
+                    "有有效态Cd数据": False,
+                    "数据完整": False,
+                    "最新更新时间": None
+                }
+                
+                # 检查作物Cd数据
+                crop_stats = self.get_crop_cd_statistics(county_name)
+                if crop_stats:
+                    county_stats["有作物Cd数据"] = True
+                    county_stats["作物Cd概要"] = {
+                        "数据点数": crop_stats["基础统计"]["数据点总数"],
+                        "均值": crop_stats["基础统计"]["均值"],
+                        "最大值": crop_stats["基础统计"]["最大值"]
+                    }
+                    all_stats["汇总信息"]["有作物Cd数据的县市"] += 1
+                
+                # 检查有效态Cd数据
+                effective_stats = self.get_effective_cd_statistics(county_name)
+                if effective_stats:
+                    county_stats["有有效态Cd数据"] = True
+                    county_stats["有效态Cd概要"] = {
+                        "数据点数": effective_stats["基础统计"]["数据点总数"],
+                        "均值": effective_stats["基础统计"]["均值"],
+                        "最大值": effective_stats["基础统计"]["最大值"]
+                    }
+                    all_stats["汇总信息"]["有有效态Cd数据的县市"] += 1
+                
+                # 检查数据完整性
+                if county_stats["有作物Cd数据"] and county_stats["有有效态Cd数据"]:
+                    county_stats["数据完整"] = True
+                    all_stats["汇总信息"]["数据完整的县市"] += 1
+                
+                all_stats["县市统计"][county_name] = county_stats
+            
+            return all_stats
+            
+        except Exception as e:
+            self.logger.error(f"获取所有县市统计概览失败: {str(e)}")
+            return {
+                "error": f"获取统计概览失败: {str(e)}",
+                "支持县市总数": len(self.supported_counties),
+                "统计生成时间": datetime.now().isoformat()
+            }
+    
+    # =============================================================================
+    # 辅助统计方法
+    # =============================================================================
+    
+
+    
+    def _calculate_histogram_data(self, predictions: pd.Series, bins: int = 20) -> Dict[str, Any]:
+        """
+        计算分布直方图数据
+        
+        @param {pd.Series} predictions - 预测值
+        @param {int} bins - 直方图区间数
+        @returns {Dict[str, Any]} 直方图数据
+        """
+        try:
+            import numpy as np
+            
+            hist, bin_edges = np.histogram(predictions, bins=bins)
+            
+            # 计算区间中心点
+            bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
+            
+            return {
+                "区间数": int(bins),
+                "频次": [int(count) for count in hist],
+                "区间中心": [float(center) for center in bin_centers],
+                "区间边界": [float(edge) for edge in bin_edges],
+                "总频次": int(hist.sum())
+            }
+            
+        except Exception as e:
+            self.logger.error(f"计算直方图数据失败: {str(e)}")
+            return {}
+    
+    def _calculate_spatial_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
+        """
+        计算空间统计信息
+        
+        @param {pd.DataFrame} df - 包含坐标和预测值的数据框
+        @returns {Dict[str, Any]} 空间统计信息
+        """
+        try:
+            spatial_stats = {
+                "经度范围": {
+                    "最小值": float(df['longitude'].min()),
+                    "最大值": float(df['longitude'].max()),
+                    "跨度": float(df['longitude'].max() - df['longitude'].min())
+                },
+                "纬度范围": {
+                    "最小值": float(df['latitude'].min()),
+                    "最大值": float(df['latitude'].max()),
+                    "跨度": float(df['latitude'].max() - df['latitude'].min())
+                }
+            }
+            
+            return spatial_stats
+            
+        except Exception as e:
+            self.logger.error(f"计算空间统计信息失败: {str(e)}")
+            return {}
+    
+
+    
+
+    

+ 19 - 5
app/utils/cd_prediction_wrapper.py

@@ -9,6 +9,7 @@ import os
 import sys
 import logging
 import subprocess
+import time
 from typing import Dict, Any, Optional
 from datetime import datetime
 
@@ -203,11 +204,12 @@ class CdPredictionWrapper:
         
         return output_files
     
-    def get_latest_outputs(self, output_type: str = "all") -> Dict[str, Optional[str]]:
+    def get_latest_outputs(self, output_type: str = "all", model_type: str = None) -> Dict[str, Optional[str]]:
         """
         获取最新的输出文件
         
         @param {str} output_type - 输出类型 ("maps", "histograms", "rasters", "all")
+        @param {str} model_type - 模型类型 ("crop", "effective", None为获取所有)
         @returns {Dict[str, Optional[str]]} 最新输出文件路径
         """
         try:
@@ -223,7 +225,8 @@ class CdPredictionWrapper:
                 if os.path.exists(figures_dir):
                     for file in os.listdir(figures_dir):
                         if "Prediction" in file and "results" in file and file.endswith(('.jpg', '.png')):
-                            map_files.append(os.path.join(figures_dir, file))
+                            file_path = os.path.join(figures_dir, file)
+                            map_files.append(file_path)
                 
                 latest_files["latest_map"] = max(map_files, key=os.path.getctime) if map_files else None
             
@@ -233,7 +236,8 @@ class CdPredictionWrapper:
                 if os.path.exists(figures_dir):
                     for file in os.listdir(figures_dir):
                         if ("frequency" in file.lower() or "histogram" in file.lower()) and file.endswith(('.jpg', '.png')):
-                            histogram_files.append(os.path.join(figures_dir, file))
+                            file_path = os.path.join(figures_dir, file)
+                            histogram_files.append(file_path)
                 
                 latest_files["latest_histogram"] = max(histogram_files, key=os.path.getctime) if histogram_files else None
             
@@ -243,12 +247,22 @@ class CdPredictionWrapper:
                 if os.path.exists(raster_dir):
                     for file in os.listdir(raster_dir):
                         if file.startswith('output') and file.endswith('.tif'):
-                            raster_files.append(os.path.join(raster_dir, file))
+                            file_path = os.path.join(raster_dir, file)
+                            raster_files.append(file_path)
                 
                 latest_files["latest_raster"] = max(raster_files, key=os.path.getctime) if raster_files else None
             
+            # 添加调试信息
+            self.logger.info(f"获取最新输出文件 - 模型类型: {model_type}, 输出类型: {output_type}")
+            for key, value in latest_files.items():
+                if value:
+                    self.logger.info(f"  {key}: {os.path.basename(value)}")
+                else:
+                    self.logger.warning(f"  {key}: 未找到文件")
+            
             return latest_files
             
         except Exception as e:
             self.logger.error(f"获取最新输出文件失败: {str(e)}")
-            return {} 
+            return {}
+ 

+ 70 - 0
migrations/versions/beeaf68d0ee1_creat_farmland_orm.py

@@ -0,0 +1,70 @@
+"""creat_farmland_orm
+
+Revision ID: beeaf68d0ee1
+Revises: f0d12e4fab12
+Create Date: 2025-06-14 17:07:57.976453
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import geoalchemy2
+
+
+# revision identifiers, used by Alembic.
+revision = 'beeaf68d0ee1'
+down_revision = 'f0d12e4fab12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """升级数据库到当前版本"""
+    # ### commands auto generated by Alembic - please adjust! ###
+    
+    # 检查表是否已存在,如果不存在才创建
+    connection = op.get_bind()
+    inspector = sa.inspect(connection)
+    
+    if 'Farmland_data' not in inspector.get_table_names():
+        op.create_table('Farmland_data',
+        sa.Column('Farmland_ID', sa.Integer(), nullable=False, comment='区域农业用地矢量点编号'),
+        sa.Column('Sample_ID', sa.Integer(), autoincrement=True, nullable=False, comment='采样自增的ID'),
+        sa.Column('lon', sa.Float(), nullable=True, comment='经度'),
+        sa.Column('lan', sa.Float(), nullable=True, comment='纬度'),
+        sa.Column('Type', sa.Float(), nullable=True, comment='用地类型:旱地(0)、水田(1)、水浇地(2)'),
+        sa.Column('geom', geoalchemy2.types.Geometry(geometry_type='POINT', srid=4326, from_text='ST_GeomFromEWKT', name='geometry'), nullable=True, comment='点几何对象'),
+        sa.PrimaryKeyConstraint('Farmland_ID', 'Sample_ID')
+        )
+        
+        # 为表添加注释
+        op.execute("COMMENT ON TABLE \"Farmland_data\" IS '耕地样点空间位置与索引数据表,存储农业用地的坐标、类型和空间几何信息'")
+    
+    # 使用原生SQL安全创建索引
+    try:
+        op.execute('CREATE INDEX IF NOT EXISTS idx_Farmland_data_geom ON "Farmland_data" USING gist (geom)')
+    except Exception as e:
+        print(f"索引创建警告: {e}")
+    
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    """将数据库降级到上一版本"""
+    # ### commands auto generated by Alembic - please adjust! ###
+    
+    # 安全地删除索引和表,检查存在性
+    connection = op.get_bind()
+    inspector = sa.inspect(connection)
+    
+    if 'Farmland_data' in inspector.get_table_names():
+        # 检查索引是否存在,存在才删除
+        indexes = inspector.get_indexes('Farmland_data')
+        index_names = [idx['name'] for idx in indexes]
+        
+        if 'idx_Farmland_data_geom' in index_names:
+            op.drop_index('idx_Farmland_data_geom', table_name='Farmland_data', postgresql_using='gist')
+            
+        # 删除表
+        op.drop_table('Farmland_data')
+    
+    # ### end Alembic commands ###

+ 258 - 0
scripts/db_health_check.py

@@ -0,0 +1,258 @@
+"""
+数据库健康检查工具
+@description: 检查数据库连接状态、迁移状态和表结构完整性
+@author: AcidMap Team
+@version: 1.0.0
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime
+
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import engine, Base
+from alembic.config import Config
+from alembic.runtime.migration import MigrationContext
+from alembic.script import ScriptDirectory
+from sqlalchemy import text
+import traceback
+
+# 设置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class DatabaseHealthChecker:
+    """
+    数据库健康检查器
+    
+    @description: 全面检查数据库状态,包括连接、迁移、表结构等
+    """
+    
+    def __init__(self):
+        self.engine = engine
+        self.alembic_cfg = Config(os.path.join(os.path.dirname(os.path.dirname(__file__)), "alembic.ini"))
+        self.results = {
+            "timestamp": datetime.now().isoformat(),
+            "connection": False,
+            "migration_status": "unknown",
+            "tables_exist": False,
+            "issues": [],
+            "recommendations": []
+        }
+    
+    def check_database_connection(self):
+        """
+        检查数据库连接
+        
+        @returns: bool 连接是否成功
+        """
+        try:
+            with self.engine.connect() as conn:
+                # 执行简单查询测试连接
+                result = conn.execute(text("SELECT 1"))
+                result.fetchone()
+                
+            self.results["connection"] = True
+            logger.info("✓ 数据库连接正常")
+            return True
+            
+        except Exception as e:
+            self.results["connection"] = False
+            self.results["issues"].append(f"数据库连接失败: {str(e)}")
+            logger.error(f"✗ 数据库连接失败: {str(e)}")
+            return False
+    
+    def check_migration_status(self):
+        """
+        检查数据库迁移状态
+        
+        @returns: dict 迁移状态信息
+        """
+        try:
+            with self.engine.connect() as connection:
+                context = MigrationContext.configure(connection)
+                current_rev = context.get_current_revision()
+                
+            script_dir = ScriptDirectory.from_config(self.alembic_cfg)
+            head_rev = script_dir.get_current_head()
+            
+            migration_info = {
+                "current_revision": current_rev,
+                "head_revision": head_rev,
+                "is_up_to_date": current_rev == head_rev
+            }
+            
+            if migration_info["is_up_to_date"]:
+                self.results["migration_status"] = "up_to_date"
+                logger.info(f"✓ 数据库迁移状态正常 (版本: {current_rev})")
+            else:
+                self.results["migration_status"] = "outdated"
+                self.results["issues"].append(f"数据库版本过时: 当前 {current_rev}, 最新 {head_rev}")
+                self.results["recommendations"].append("执行 'python db_migrate.py upgrade' 升级数据库")
+                logger.warning(f"⚠ 数据库需要升级: {current_rev} -> {head_rev}")
+            
+            return migration_info
+            
+        except Exception as e:
+            self.results["migration_status"] = "error"
+            self.results["issues"].append(f"迁移状态检查失败: {str(e)}")
+            logger.error(f"✗ 迁移状态检查失败: {str(e)}")
+            return {"error": str(e)}
+    
+    def check_table_structure(self):
+        """
+        检查表结构完整性
+        
+        @returns: dict 表结构检查结果
+        """
+        try:
+            with self.engine.connect() as conn:
+                # 获取所有表名
+                tables_query = text("""
+                    SELECT table_name 
+                    FROM information_schema.tables 
+                    WHERE table_schema = 'public'
+                """)
+                existing_tables = [row[0] for row in conn.execute(tables_query).fetchall()]
+                
+                # 获取模型定义的表名
+                model_tables = [table.name for table in Base.metadata.tables.values()]
+                
+                # 检查缺失的表
+                missing_tables = set(model_tables) - set(existing_tables)
+                extra_tables = set(existing_tables) - set(model_tables) - {'alembic_version'}
+                
+                table_info = {
+                    "existing_tables": existing_tables,
+                    "model_tables": model_tables,
+                    "missing_tables": list(missing_tables),
+                    "extra_tables": list(extra_tables)
+                }
+                
+                if missing_tables:
+                    self.results["tables_exist"] = False
+                    self.results["issues"].append(f"缺失表: {', '.join(missing_tables)}")
+                    self.results["recommendations"].append("执行数据库升级或重新创建表结构")
+                    logger.warning(f"⚠ 缺失表: {', '.join(missing_tables)}")
+                else:
+                    self.results["tables_exist"] = True
+                    logger.info("✓ 所有必需的表都存在")
+                
+                if extra_tables:
+                    logger.info(f"发现额外表: {', '.join(extra_tables)}")
+                
+                return table_info
+                
+        except Exception as e:
+            self.results["tables_exist"] = False
+            self.results["issues"].append(f"表结构检查失败: {str(e)}")
+            logger.error(f"✗ 表结构检查失败: {str(e)}")
+            return {"error": str(e)}
+    
+    def check_spatial_extensions(self):
+        """
+        检查空间扩展 (PostGIS)
+        
+        @returns: dict 空间扩展状态
+        """
+        try:
+            with self.engine.connect() as conn:
+                # 检查PostGIS扩展
+                postgis_query = text("""
+                    SELECT extname, extversion 
+                    FROM pg_extension 
+                    WHERE extname = 'postgis'
+                """)
+                postgis_result = conn.execute(postgis_query).fetchall()
+                
+                if postgis_result:
+                    version = postgis_result[0][1]
+                    logger.info(f"✓ PostGIS 扩展已安装 (版本: {version})")
+                    return {"installed": True, "version": version}
+                else:
+                    self.results["issues"].append("PostGIS 扩展未安装")
+                    self.results["recommendations"].append("安装 PostGIS 扩展: CREATE EXTENSION postgis;")
+                    logger.warning("⚠ PostGIS 扩展未安装")
+                    return {"installed": False}
+                    
+        except Exception as e:
+            self.results["issues"].append(f"空间扩展检查失败: {str(e)}")
+            logger.error(f"✗ 空间扩展检查失败: {str(e)}")
+            return {"error": str(e)}
+    
+    def run_full_check(self):
+        """
+        执行完整的健康检查
+        
+        @returns: dict 完整的检查结果
+        """
+        logger.info("开始数据库健康检查...")
+        logger.info("=" * 50)
+        
+        # 检查数据库连接
+        if not self.check_database_connection():
+            logger.error("数据库连接失败,跳过后续检查")
+            return self.results
+        
+        # 检查迁移状态
+        migration_info = self.check_migration_status()
+        
+        # 检查表结构
+        table_info = self.check_table_structure()
+        
+        # 检查空间扩展
+        spatial_info = self.check_spatial_extensions()
+        
+        # 汇总结果
+        self.results.update({
+            "migration_info": migration_info,
+            "table_info": table_info,
+            "spatial_info": spatial_info
+        })
+        
+        logger.info("=" * 50)
+        logger.info("健康检查完成")
+        
+        # 打印总结
+        if self.results["issues"]:
+            logger.warning(f"发现 {len(self.results['issues'])} 个问题:")
+            for issue in self.results["issues"]:
+                logger.warning(f"  - {issue}")
+        
+        if self.results["recommendations"]:
+            logger.info(f"建议执行 {len(self.results['recommendations'])} 项操作:")
+            for rec in self.results["recommendations"]:
+                logger.info(f"  - {rec}")
+        
+        if not self.results["issues"]:
+            logger.info("✓ 数据库状态良好")
+        
+        return self.results
+
+def main():
+    """
+    主函数
+    """
+    try:
+        checker = DatabaseHealthChecker()
+        results = checker.run_full_check()
+        
+        # 根据结果设置退出码
+        if results["issues"]:
+            sys.exit(1)  # 有问题时退出码为1
+        else:
+            sys.exit(0)  # 正常时退出码为0
+            
+    except Exception as e:
+        logger.error(f"健康检查执行失败: {str(e)}")
+        logger.error(f"错误详情: {traceback.format_exc()}")
+        sys.exit(2)  # 检查失败时退出码为2
+
+if __name__ == "__main__":
+    main() 

+ 0 - 0
app/scripts/import_counties.py → scripts/import_counties.py


+ 252 - 0
scripts/import_farmland_data.py

@@ -0,0 +1,252 @@
+"""
+农田数据导入脚本
+@description: 从Excel文件读取Farmland数据并导入到Farmland_data表
+"""
+
+import os
+import sys
+import pandas as pd
+import logging
+from datetime import datetime
+from sqlalchemy.orm import sessionmaker
+from geoalchemy2 import WKTElement
+
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import engine, SessionLocal
+from app.models.farmland import FarmlandData
+
+# 设置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class FarmlandDataImporter:
+    """
+    农田数据导入器
+    
+    @description: 从Excel文件读取农田数据并导入到数据库
+    """
+    
+    def __init__(self, excel_path, sheet_name='Farmland'):
+        """
+        初始化导入器
+        
+        @param {str} excel_path - Excel文件路径
+        @param {str} sheet_name - Sheet名称,默认为'Farmland'
+        """
+        self.excel_path = excel_path
+        self.sheet_name = sheet_name
+        self.type_mapping = {
+            '旱': 0.0,
+            '水田': 1.0, 
+            '水浇地': 2.0
+        }
+        
+    def read_excel_data(self):
+        """
+        读取Excel文件数据
+        
+        @returns: DataFrame 读取的数据
+        """
+        try:
+            logger.info(f"开始读取Excel文件: {self.excel_path}")
+            logger.info(f"Sheet名称: {self.sheet_name}")
+            
+            # 检查文件是否存在
+            if not os.path.exists(self.excel_path):
+                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
+            
+            # 读取Excel文件
+            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
+            
+            logger.info(f"成功读取数据,共 {len(df)} 行")
+            logger.info(f"数据列: {list(df.columns)}")
+            
+            # 显示前几行数据供确认
+            logger.info("前5行数据预览:")
+            logger.info(df.head().to_string())
+            
+            return df
+            
+        except Exception as e:
+            logger.error(f"读取Excel文件失败: {str(e)}")
+            raise
+    
+    def validate_data(self, df):
+        """
+        验证数据格式和完整性
+        
+        @param {DataFrame} df - 要验证的数据
+        @returns: DataFrame 验证后的数据
+        """
+        try:
+            logger.info("开始验证数据...")
+            
+            # 检查必需的列是否存在
+            required_columns = ['Farmland_ID', 'Sample_ID', 'lon', 'lan', 'Type']
+            missing_columns = [col for col in required_columns if col not in df.columns]
+            
+            if missing_columns:
+                raise ValueError(f"缺少必需的列: {missing_columns}")
+            
+            # 检查数据类型
+            logger.info("检查数据类型...")
+            
+            # 转换数值类型
+            df['Farmland_ID'] = pd.to_numeric(df['Farmland_ID'], errors='coerce')
+            df['Sample_ID'] = pd.to_numeric(df['Sample_ID'], errors='coerce') 
+            df['lon'] = pd.to_numeric(df['lon'], errors='coerce')
+            df['lan'] = pd.to_numeric(df['lan'], errors='coerce')
+            
+            # 检查是否有无效的数值
+            if df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any().any():
+                logger.warning("发现无效的数值,将跳过这些行")
+                invalid_rows = df[df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any(axis=1)]
+                logger.warning(f"无效行数: {len(invalid_rows)}")
+                df = df.dropna(subset=['Farmland_ID', 'Sample_ID', 'lon', 'lan'])
+            
+            # 转换Type字段
+            logger.info("转换Type字段...")
+            df['Type_Numeric'] = df['Type'].map(self.type_mapping)
+            
+            # 检查未知的Type值
+            unknown_types = df[df['Type_Numeric'].isnull()]['Type'].unique()
+            if len(unknown_types) > 0:
+                logger.warning(f"发现未知的Type值: {unknown_types}")
+                logger.warning("将为未知Type设置默认值0.0(旱地)")
+                df['Type_Numeric'] = df['Type_Numeric'].fillna(0.0)
+            
+            logger.info(f"数据验证完成,有效数据 {len(df)} 行")
+            
+            return df
+            
+        except Exception as e:
+            logger.error(f"数据验证失败: {str(e)}")
+            raise
+    
+    def create_geometry(self, lon, lat):
+        """
+        创建PostGIS Point几何对象
+        
+        @param {float} lon - 经度
+        @param {float} lat - 纬度
+        @returns: WKTElement 几何对象
+        """
+        return WKTElement(f'POINT({lon} {lat})', srid=4326)
+    
+    def import_data(self, df):
+        """
+        将数据导入到数据库
+        
+        @param {DataFrame} df - 要导入的数据
+        """
+        try:
+            logger.info("开始导入数据到数据库...")
+            
+            # 创建数据库会话
+            db = SessionLocal()
+            
+            try:
+                # 检查是否有重复数据
+                existing_count = db.query(FarmlandData).count()
+                logger.info(f"数据库中现有数据: {existing_count} 条")
+                
+                # 批量创建对象
+                batch_size = 1000
+                total_rows = len(df)
+                imported_count = 0
+                
+                for i in range(0, total_rows, batch_size):
+                    batch_df = df.iloc[i:i+batch_size]
+                    batch_objects = []
+                    
+                    for _, row in batch_df.iterrows():
+                        try:
+                            # 创建FarmlandData对象
+                            farmland_data = FarmlandData(
+                                farmland_id=int(row['Farmland_ID']),
+                                sample_id=int(row['Sample_ID']),
+                                lon=float(row['lon']),
+                                lan=float(row['lan']),
+                                type=float(row['Type_Numeric']),
+                                geom=self.create_geometry(row['lon'], row['lan'])
+                            )
+                            batch_objects.append(farmland_data)
+                            
+                        except Exception as e:
+                            logger.warning(f"跳过行 {i+_}: {str(e)}")
+                            continue
+                    
+                    if batch_objects:
+                        # 批量插入
+                        db.add_all(batch_objects)
+                        db.commit()
+                        imported_count += len(batch_objects)
+                        logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
+                
+                logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
+                
+                # 验证导入结果
+                final_count = db.query(FarmlandData).count()
+                logger.info(f"导入后数据库总数据: {final_count} 条")
+                
+            except Exception as e:
+                db.rollback()
+                logger.error(f"数据导入失败,已回滚: {str(e)}")
+                raise
+            finally:
+                db.close()
+                
+        except Exception as e:
+            logger.error(f"数据导入过程失败: {str(e)}")
+            raise
+    
+    def run_import(self):
+        """
+        执行完整的导入流程
+        """
+        try:
+            logger.info("=" * 60)
+            logger.info("开始农田数据导入流程")
+            logger.info("=" * 60)
+            
+            # 1. 读取Excel数据
+            df = self.read_excel_data()
+            
+            # 2. 验证数据
+            df = self.validate_data(df)
+            
+            # 3. 导入数据
+            self.import_data(df)
+            
+            logger.info("=" * 60)
+            logger.info("农田数据导入流程完成!")
+            logger.info("=" * 60)
+            
+        except Exception as e:
+            logger.error(f"导入流程失败: {str(e)}")
+            raise
+
+def main():
+    """
+    主函数
+    """
+    # Excel文件路径
+    excel_path = r"C:\Users\drzha\Desktop\0614\数据库对应数据.xlsx"
+    sheet_name = "Farmland"
+    
+    try:
+        # 创建导入器并执行导入
+        importer = FarmlandDataImporter(excel_path, sheet_name)
+        importer.run_import()
+        
+    except Exception as e:
+        logger.error(f"程序执行失败: {str(e)}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main() 

BIN
soilgd.sql