Эх сурвалжийг харах

farmland数据脚本导入&文档更新、sql备份文件更新

drggboy 1 өдөр өмнө
parent
commit
789ff0a403

+ 166 - 30
PROJECT_RULES.md

@@ -365,35 +365,171 @@ sqlacodegen postgresql://postgres:123456789Qq@localhost/soilgd --tables table1,t
 - `python reset_db.py` - 重置数据库
 - `psql -U postgres -d soilgd -f soilgd.sql` - 从备份文件恢复数据库
 
-#### 5.2.4 GeoJSON数据导入流程
-
-对于地理空间数据的导入:
-
-1. **准备GeoJSON文件**: 确保文件格式正确
-2. **创建地理空间模型**: 使用GeoAlchemy2定义几何字段
-3. **编写导入脚本**: 
-   ```python
-   # 示例: scripts/import_geojson.py
-   import geopandas as gpd
-   from app.models import YourGeoModel
-   from app.database import SessionLocal
-   
-   def import_geojson_data(file_path: str):
-       """导入GeoJSON数据到数据库"""
-       gdf = gpd.read_file(file_path)
-       db = SessionLocal()
-       try:
-           for _, row in gdf.iterrows():
-               record = YourGeoModel(
-                   name=row['name'],
-                   geometry=row['geometry'].wkt
-               )
-               db.add(record)
-           db.commit()
-       finally:
-           db.close()
-   ```
-4. **执行导入**: `python scripts/import_geojson.py`
+#### 5.2.4 数据导入脚本开发规范
+
+##### 5.2.4.1 脚本开发原则
+- **统一模式**: 所有导入脚本应遵循统一的结构和错误处理模式
+- **参考现有脚本**: 可参考 `scripts/import_counties.py` (GeoJSON导入) 和 `scripts/import_farmland_data.py` (Excel导入)
+- **完整日志**: 实现详细的操作日志记录
+- **事务安全**: 确保数据完整性,支持回滚机制
+- **批量处理**: 大数据集使用批量插入优化性能
+
+##### 5.2.4.2 数据导入脚本模板结构
+
+```python
+"""
+数据导入脚本模板
+@description: [描述导入的数据类型和来源]
+"""
+
+import os
+import sys
+import logging
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import SessionLocal
+from app.models.[model_file] import [ModelClass]
+
+# 设置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class [DataType]Importer:
+    """
+    [数据类型]导入器
+    """
+    def __init__(self, file_path: str):
+        self.file_path = file_path
+    
+    def read_data(self):
+        """读取数据文件"""
+        pass
+    
+    def validate_data(self, data):
+        """验证数据格式和完整性"""
+        pass
+    
+    def import_data(self, data):
+        """将数据导入到数据库"""
+        db = SessionLocal()
+        try:
+            # 批量处理逻辑
+            # 事务提交
+            db.commit()
+            logger.info("数据导入完成")
+        except Exception as e:
+            db.rollback()
+            logger.error(f"数据导入失败: {str(e)}")
+            raise
+        finally:
+            db.close()
+    
+    def run_import(self):
+        """执行完整的导入流程"""
+        try:
+            data = self.read_data()
+            validated_data = self.validate_data(data)
+            self.import_data(validated_data)
+        except Exception as e:
+            logger.error(f"导入流程失败: {str(e)}")
+            raise
+
+def main():
+    file_path = "path/to/data/file"
+    importer = [DataType]Importer(file_path)
+    importer.run_import()
+
+if __name__ == "__main__":
+    main()
+```
+
+##### 5.2.4.3 GeoJSON数据导入参考
+
+**参考脚本**: `scripts/import_counties.py`
+
+主要特点:
+- 使用JSON模块读取GeoJSON文件
+- 通过模型类的`from_geojson_feature`方法处理几何数据
+- 完整的错误处理和日志记录
+- 事务安全保证
+
+```python
+# 核心导入逻辑示例
+def import_counties_from_geojson(file_path: str, db: Session):
+    with open(file_path, 'r', encoding='utf-8') as f:
+        geojson_data = json.load(f)
+    
+    for feature in geojson_data['features']:
+        try:
+            county = County.from_geojson_feature(feature)
+            db.add(county)
+            logger.info(f"成功导入: {county.name}")
+        except Exception as e:
+            logger.error(f"导入失败: {str(e)}")
+            continue
+    
+    db.commit()
+```
+
+##### 5.2.4.4 Excel数据导入参考
+
+**参考脚本**: `scripts/import_farmland_data.py`
+
+主要特点:
+- 使用pandas读取Excel文件和指定sheet
+- 支持数据类型转换和映射(如Type字段映射)
+- 自动生成PostGIS几何对象
+- 批量插入优化(1000条/批次)
+- 数据验证和清洗
+
+```python
+# 核心导入逻辑示例
+class FarmlandDataImporter:
+    def __init__(self, excel_path, sheet_name='Farmland'):
+        self.excel_path = excel_path
+        self.sheet_name = sheet_name
+        self.type_mapping = {'旱': 0.0, '水田': 1.0, '水浇地': 2.0}
+    
+    def read_excel_data(self):
+        df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
+        return df
+    
+    def validate_data(self, df):
+        # 数据类型转换
+        df['Type_Numeric'] = df['Type'].map(self.type_mapping)
+        return df
+    
+    def create_geometry(self, lon, lat):
+        return WKTElement(f'POINT({lon} {lat})', srid=4326)
+    
+    def import_data(self, df):
+        # 批量导入逻辑
+        batch_size = 1000
+        for i in range(0, len(df), batch_size):
+            batch_objects = []
+            # ... 创建批次对象
+            db.add_all(batch_objects)
+            db.commit()
+```
+
+##### 5.2.4.5 数据导入最佳实践
+
+1. **文件路径检查**: 导入前验证数据文件是否存在
+2. **数据预览**: 读取数据后显示前几行供确认
+3. **字段映射**: 建立清晰的字段映射表(如类型编码转换)
+4. **几何数据处理**: 自动生成PostGIS兼容的几何对象
+5. **批量操作**: 大数据集分批处理,避免内存溢出
+6. **进度跟踪**: 显示导入进度和统计信息
+7. **异常处理**: 跳过无效数据行,记录详细错误信息
+8. **结果验证**: 导入完成后验证数据库中的记录数量
+
+##### 5.2.4.6 常用数据源类型
+
+- **GeoJSON**: 地理空间矢量数据(参考`import_counties.py`)
+- **Excel/CSV**: 结构化表格数据(参考`import_farmland_data.py`)
+- **Shapefile**: GIS矢量数据(使用geopandas读取)
+- **GeoTIFF**: 栅格地理数据(使用rasterio处理)
 
 #### 5.2.5 注意事项
 - 备份数据库后再执行迁移
@@ -621,4 +757,4 @@ Content-Disposition: attachment; filename="crop_cd_prediction_map.jpg"
 3. 性能优化和用户体验
 4. 与现有架构的一致性
 
-**最后更新**: 2025-06-01 (Cd预测功能集成) 
+**最后更新**: 2025-06-14 (新增数据库导入脚本开发规范和农田数据模型) 

+ 1 - 0
app/models/farmland.py

@@ -21,6 +21,7 @@ class FarmlandData(Base):
     @param {geometry} geom - 点几何对象(使用PostGIS,基于经纬度生成)
     """
     __tablename__ = 'Farmland_data'
+    __table_args__ = {'comment': '耕地样点空间位置与索引数据表,存储农业用地的坐标、类型和空间几何信息'}
     
     # 主键字段 - 保持与原表结构完全一致的大小写
     farmland_id = Column('Farmland_ID', Integer, primary_key=True, comment='区域农业用地矢量点编号')

+ 3 - 0
migrations/versions/beeaf68d0ee1_creat_farmland_orm.py

@@ -35,6 +35,9 @@ def upgrade():
         sa.Column('geom', geoalchemy2.types.Geometry(geometry_type='POINT', srid=4326, from_text='ST_GeomFromEWKT', name='geometry'), nullable=True, comment='点几何对象'),
         sa.PrimaryKeyConstraint('Farmland_ID', 'Sample_ID')
         )
+        
+        # 为表添加注释
+        op.execute("COMMENT ON TABLE \"Farmland_data\" IS '耕地样点空间位置与索引数据表,存储农业用地的坐标、类型和空间几何信息'")
     
     # 使用原生SQL安全创建索引
     try:

+ 0 - 0
app/scripts/import_counties.py → scripts/import_counties.py


+ 252 - 0
scripts/import_farmland_data.py

@@ -0,0 +1,252 @@
+"""
+农田数据导入脚本
+@description: 从Excel文件读取Farmland数据并导入到Farmland_data表
+"""
+
+import os
+import sys
+import pandas as pd
+import logging
+from datetime import datetime
+from sqlalchemy.orm import sessionmaker
+from geoalchemy2 import WKTElement
+
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import engine, SessionLocal
+from app.models.farmland import FarmlandData
+
+# 设置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class FarmlandDataImporter:
+    """
+    农田数据导入器
+    
+    @description: 从Excel文件读取农田数据并导入到数据库
+    """
+    
+    def __init__(self, excel_path, sheet_name='Farmland'):
+        """
+        初始化导入器
+        
+        @param {str} excel_path - Excel文件路径
+        @param {str} sheet_name - Sheet名称,默认为'Farmland'
+        """
+        self.excel_path = excel_path
+        self.sheet_name = sheet_name
+        self.type_mapping = {
+            '旱': 0.0,
+            '水田': 1.0, 
+            '水浇地': 2.0
+        }
+        
+    def read_excel_data(self):
+        """
+        读取Excel文件数据
+        
+        @returns: DataFrame 读取的数据
+        """
+        try:
+            logger.info(f"开始读取Excel文件: {self.excel_path}")
+            logger.info(f"Sheet名称: {self.sheet_name}")
+            
+            # 检查文件是否存在
+            if not os.path.exists(self.excel_path):
+                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
+            
+            # 读取Excel文件
+            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
+            
+            logger.info(f"成功读取数据,共 {len(df)} 行")
+            logger.info(f"数据列: {list(df.columns)}")
+            
+            # 显示前几行数据供确认
+            logger.info("前5行数据预览:")
+            logger.info(df.head().to_string())
+            
+            return df
+            
+        except Exception as e:
+            logger.error(f"读取Excel文件失败: {str(e)}")
+            raise
+    
+    def validate_data(self, df):
+        """
+        验证数据格式和完整性
+        
+        @param {DataFrame} df - 要验证的数据
+        @returns: DataFrame 验证后的数据
+        """
+        try:
+            logger.info("开始验证数据...")
+            
+            # 检查必需的列是否存在
+            required_columns = ['Farmland_ID', 'Sample_ID', 'lon', 'lan', 'Type']
+            missing_columns = [col for col in required_columns if col not in df.columns]
+            
+            if missing_columns:
+                raise ValueError(f"缺少必需的列: {missing_columns}")
+            
+            # 检查数据类型
+            logger.info("检查数据类型...")
+            
+            # 转换数值类型
+            df['Farmland_ID'] = pd.to_numeric(df['Farmland_ID'], errors='coerce')
+            df['Sample_ID'] = pd.to_numeric(df['Sample_ID'], errors='coerce') 
+            df['lon'] = pd.to_numeric(df['lon'], errors='coerce')
+            df['lan'] = pd.to_numeric(df['lan'], errors='coerce')
+            
+            # 检查是否有无效的数值
+            if df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any().any():
+                logger.warning("发现无效的数值,将跳过这些行")
+                invalid_rows = df[df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any(axis=1)]
+                logger.warning(f"无效行数: {len(invalid_rows)}")
+                df = df.dropna(subset=['Farmland_ID', 'Sample_ID', 'lon', 'lan'])
+            
+            # 转换Type字段
+            logger.info("转换Type字段...")
+            df['Type_Numeric'] = df['Type'].map(self.type_mapping)
+            
+            # 检查未知的Type值
+            unknown_types = df[df['Type_Numeric'].isnull()]['Type'].unique()
+            if len(unknown_types) > 0:
+                logger.warning(f"发现未知的Type值: {unknown_types}")
+                logger.warning("将为未知Type设置默认值0.0(旱地)")
+                df['Type_Numeric'] = df['Type_Numeric'].fillna(0.0)
+            
+            logger.info(f"数据验证完成,有效数据 {len(df)} 行")
+            
+            return df
+            
+        except Exception as e:
+            logger.error(f"数据验证失败: {str(e)}")
+            raise
+    
+    def create_geometry(self, lon, lat):
+        """
+        创建PostGIS Point几何对象
+        
+        @param {float} lon - 经度
+        @param {float} lat - 纬度
+        @returns: WKTElement 几何对象
+        """
+        return WKTElement(f'POINT({lon} {lat})', srid=4326)
+    
+    def import_data(self, df):
+        """
+        将数据导入到数据库
+        
+        @param {DataFrame} df - 要导入的数据
+        """
+        try:
+            logger.info("开始导入数据到数据库...")
+            
+            # 创建数据库会话
+            db = SessionLocal()
+            
+            try:
+                # 检查是否有重复数据
+                existing_count = db.query(FarmlandData).count()
+                logger.info(f"数据库中现有数据: {existing_count} 条")
+                
+                # 批量创建对象
+                batch_size = 1000
+                total_rows = len(df)
+                imported_count = 0
+                
+                for i in range(0, total_rows, batch_size):
+                    batch_df = df.iloc[i:i+batch_size]
+                    batch_objects = []
+                    
+                    for _, row in batch_df.iterrows():
+                        try:
+                            # 创建FarmlandData对象
+                            farmland_data = FarmlandData(
+                                farmland_id=int(row['Farmland_ID']),
+                                sample_id=int(row['Sample_ID']),
+                                lon=float(row['lon']),
+                                lan=float(row['lan']),
+                                type=float(row['Type_Numeric']),
+                                geom=self.create_geometry(row['lon'], row['lan'])
+                            )
+                            batch_objects.append(farmland_data)
+                            
+                        except Exception as e:
+                            logger.warning(f"跳过行 {i+_}: {str(e)}")
+                            continue
+                    
+                    if batch_objects:
+                        # 批量插入
+                        db.add_all(batch_objects)
+                        db.commit()
+                        imported_count += len(batch_objects)
+                        logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
+                
+                logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
+                
+                # 验证导入结果
+                final_count = db.query(FarmlandData).count()
+                logger.info(f"导入后数据库总数据: {final_count} 条")
+                
+            except Exception as e:
+                db.rollback()
+                logger.error(f"数据导入失败,已回滚: {str(e)}")
+                raise
+            finally:
+                db.close()
+                
+        except Exception as e:
+            logger.error(f"数据导入过程失败: {str(e)}")
+            raise
+    
+    def run_import(self):
+        """
+        执行完整的导入流程
+        """
+        try:
+            logger.info("=" * 60)
+            logger.info("开始农田数据导入流程")
+            logger.info("=" * 60)
+            
+            # 1. 读取Excel数据
+            df = self.read_excel_data()
+            
+            # 2. 验证数据
+            df = self.validate_data(df)
+            
+            # 3. 导入数据
+            self.import_data(df)
+            
+            logger.info("=" * 60)
+            logger.info("农田数据导入流程完成!")
+            logger.info("=" * 60)
+            
+        except Exception as e:
+            logger.error(f"导入流程失败: {str(e)}")
+            raise
+
+def main():
+    """
+    主函数
+    """
+    # Excel文件路径
+    excel_path = r"C:\Users\drzha\Desktop\0614\数据库对应数据.xlsx"
+    sheet_name = "Farmland"
+    
+    try:
+        # 创建导入器并执行导入
+        importer = FarmlandDataImporter(excel_path, sheet_name)
+        importer.run_import()
+        
+    except Exception as e:
+        logger.error(f"程序执行失败: {str(e)}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main() 

BIN
soilgd.sql