"""
Agricultural data import script.

@description: Read agricultural-input records from a sheet of an Excel
workbook, validate/clean them with pandas, and insert them into the
``agricultural_data`` table through the ``AgriculturalData`` ORM model.
"""
import argparse
import os
import sys
import logging
from datetime import datetime

import pandas as pd
from sqlalchemy.orm import sessionmaker

# Make the project root (two directory levels above this file) importable so
# that the ``app.*`` imports below resolve when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.database import engine, SessionLocal  # noqa: E402
from app.models.agricultural import AgriculturalData  # noqa: E402  # the ORM model must exist

# Script-level logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class AgriculturalDataImporter:
    """
    Importer for agricultural-input data.

    @description: Reads rows from an Excel sheet, validates and cleans them,
    and inserts them into the database in batches inside a single transaction.
    """

    # Per-source Cd flux columns whose sum is expected to equal 'total_cd_flux'.
    CD_FLUX_COMPONENTS = [
        'nitrogen_cd_flux', 'phosphorus_cd_flux', 'potassium_cd_flux',
        'compound_cd_flux', 'organic_cd_flux', 'pesticide_cd_flux',
        'farmyard_cd_flux', 'plastic_film_cd_flux'
    ]

    def __init__(self, excel_path, sheet_name='Agricultural'):
        """
        Initialize the importer.

        @param {str} excel_path - Path to the Excel workbook.
        @param {str} sheet_name - Sheet to read; defaults to 'Agricultural'.
        """
        self.excel_path = excel_path
        self.sheet_name = sheet_name

        # Columns that must be present in the sheet (per the database design document).
        self.required_columns = [
            'county_name', 'crop_sowing_area',
            'nitrogen_usage', 'phosphorus_usage', 'potassium_usage',
            'compound_usage', 'organic_usage', 'pesticide_usage',
            'farmyard_usage', 'plastic_film_usage',
            'nitrogen_cd_flux', 'phosphorus_cd_flux', 'potassium_cd_flux',
            'compound_cd_flux', 'organic_cd_flux', 'pesticide_cd_flux',
            'farmyard_cd_flux', 'plastic_film_cd_flux', 'total_cd_flux',
            'data_year'
        ]

        # Columns coerced to numbers during validation. 'data_year' is further
        # cast to an integer type; every other column is stored as a float.
        self.numeric_columns = [
            'crop_sowing_area',
            'nitrogen_usage', 'phosphorus_usage', 'potassium_usage',
            'compound_usage', 'organic_usage', 'pesticide_usage',
            'farmyard_usage', 'plastic_film_usage',
            'nitrogen_cd_flux', 'phosphorus_cd_flux', 'potassium_cd_flux',
            'compound_cd_flux', 'organic_cd_flux', 'pesticide_cd_flux',
            'farmyard_cd_flux', 'plastic_film_cd_flux', 'total_cd_flux',
            'data_year'
        ]

    def read_excel_data(self):
        """
        Read the configured sheet from the Excel workbook.

        @returns: DataFrame - The raw sheet contents.
        @raises FileNotFoundError - If the workbook path does not exist.
        """
        try:
            logger.info(f"开始读取Excel文件: {self.excel_path}")
            logger.info(f"Sheet名称: {self.sheet_name}")

            if not os.path.exists(self.excel_path):
                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")

            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)

            logger.info(f"成功读取数据,共 {len(df)} 行")
            logger.info(f"数据列: {list(df.columns)}")

            # Log a preview so the operator can eyeball the input.
            logger.info("前5行数据预览:")
            logger.info(df.head().to_string())

            return df

        except Exception as e:
            logger.error(f"读取Excel文件失败: {str(e)}")
            raise

    def validate_data(self, df):
        """
        Validate and clean the raw sheet data.

        Steps:
        1. Normalize column names.
        2. Check that all required columns are present.
        3. Coerce the numeric columns to numbers.
        4. Fill missing values.
        5. Reconcile 'total_cd_flux' with the sum of its per-source components.

        @param {DataFrame} df - Raw data; its columns and values are modified in place.
        @returns: DataFrame - The cleaned data (the same object as ``df``).
        @raises ValueError - If required columns are missing, or column names
            collide after normalization.
        """
        try:
            logger.info("开始验证数据...")

            # Normalize headers *before* checking them, so headers such as
            # "County_Name " (different case, stray whitespace) still match.
            df.columns = [str(col).strip().lower() for col in df.columns]
            duplicated = df.columns[df.columns.duplicated()].tolist()
            if duplicated:
                raise ValueError(f"列名规范化后存在重复列: {duplicated}")

            required_columns_lower = [col.lower() for col in self.required_columns]
            numeric_columns_lower = [col.lower() for col in self.numeric_columns]

            missing_columns = [col for col in required_columns_lower if col not in df.columns]
            if missing_columns:
                raise ValueError(f"缺少必需的列: {missing_columns}")

            logger.info("检查数据类型...")
            for col in numeric_columns_lower:
                if col not in df.columns:
                    continue
                was_null = df[col].isnull()
                converted = pd.to_numeric(df[col], errors='coerce')
                # Report non-numeric cells; otherwise they would silently become 0 below.
                coerced = int((converted.isnull() & ~was_null).sum())
                if coerced:
                    logger.warning(f"列 {col} 中有 {coerced} 个非数值内容被转换为空值")
                df[col] = converted

                if col == 'data_year':
                    # Nullable integer dtype; non-integral years keep the float column.
                    try:
                        df[col] = df[col].astype(pd.Int64Dtype())
                    except (TypeError, ValueError):
                        logger.warning("data_year 含有非整数值,保留为浮点数")

            # Report every column that still contains missing values.
            null_flags = df.isnull().any()
            empty_cols = null_flags[null_flags].index.tolist()
            if empty_cols:
                logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")

            # Missing numeric values are replaced with 0.
            # NOTE(review): this also zero-fills data_year (year 0) — confirm intended.
            for col in numeric_columns_lower:
                if col in df.columns and df[col].isnull().any():
                    df[col] = df[col].fillna(0)
                    logger.info(f"已将 {col} 的空值替换为0")

            if df['county_name'].isnull().any():
                logger.warning("县市名称存在空值,填充为'未知区域'")
                df['county_name'] = df['county_name'].fillna('未知区域')

            # Consistency check: total_cd_flux should equal the sum of its components.
            # Mixed tolerance: absolute for totals below 1, relative above, so
            # float rounding in large values is not reported as a mismatch.
            tolerance = 1e-6
            total_calculated = df[self.CD_FLUX_COMPONENTS].sum(axis=1)
            allowed = tolerance * total_calculated.abs().clip(lower=1.0)
            mismatches = (df['total_cd_flux'] - total_calculated).abs() > allowed
            if mismatches.any():
                mismatched_indices = mismatches[mismatches].index.tolist()
                logger.warning(f"发现 {len(mismatched_indices)} 行 total_cd_flux 值与各分项之和不一致:")
                for idx in mismatched_indices[:5]:  # only show the first 5 examples
                    logger.warning(
                        f"行 {idx}: total_cd_flux={df.at[idx, 'total_cd_flux']}, "
                        f"计算值={total_calculated.at[idx]}"
                    )
                # Overwrite the stored totals with the component sums.
                df['total_cd_flux'] = total_calculated
                logger.info("已自动修正 total_cd_flux 值为各分项之和")

            logger.info(f"数据验证完成,有效数据 {len(df)} 行")
            return df

        except Exception as e:
            logger.error(f"数据验证失败: {str(e)}")
            raise

    def _build_record(self, record):
        """
        Build one unsaved ``AgriculturalData`` ORM object from a cleaned row.

        @param {dict} record - One row as a column-name -> value mapping.
        @returns: AgriculturalData - The new ORM instance (not yet added to a session).
        @raises KeyError/TypeError/ValueError - If a column is absent or a value
            cannot be converted.
        """
        # Every numeric column except data_year maps to a float model attribute.
        float_values = {
            col: float(record[col])
            for col in self.numeric_columns
            if col != 'data_year'
        }
        return AgriculturalData(
            county_name=str(record['county_name']),
            data_year=int(record['data_year']),
            **float_values,
        )

    def import_data(self, df, batch_size=1000):
        """
        Insert the cleaned data into the database.

        All batches run in one transaction: each batch is flushed, and a single
        commit happens at the end. On any database error, everything is rolled
        back. Rows that cannot be converted are skipped with a warning.

        @param {DataFrame} df - Cleaned data as returned by ``validate_data``.
        @param {int} batch_size - Number of rows added and flushed per batch (>= 1).
        @raises ValueError - If ``batch_size`` is less than 1.
        """
        try:
            if batch_size < 1:
                raise ValueError(f"batch_size 必须为正整数: {batch_size}")

            logger.info("开始导入数据到数据库...")

            db = SessionLocal()
            try:
                # Informational only: no duplicate detection is performed, so
                # re-running the import inserts the same rows again.
                existing_count = db.query(AgriculturalData).count()
                logger.info(f"数据库中现有数据: {existing_count} 条")

                total_rows = len(df)
                imported_count = 0
                skipped_count = 0

                for start in range(0, total_rows, batch_size):
                    batch_records = df.iloc[start:start + batch_size].to_dict('records')
                    batch_objects = []
                    for offset, record in enumerate(batch_records):
                        try:
                            batch_objects.append(self._build_record(record))
                        except Exception as e:
                            # Best-effort: a malformed row is skipped, not fatal.
                            # start + offset is the 0-based position of the row in df.
                            skipped_count += 1
                            logger.warning(f"跳过行 {start + offset}: {str(e)}")

                    if batch_objects:
                        db.add_all(batch_objects)
                        # Flush instead of commit, so the rollback below undoes
                        # every batch rather than only the current one.
                        db.flush()
                        imported_count += len(batch_objects)
                        logger.info(f"已导入 {imported_count}/{total_rows} 条数据")

                db.commit()
                logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
                if skipped_count:
                    logger.warning(f"共跳过 {skipped_count} 行无效数据")

                final_count = db.query(AgriculturalData).count()
                logger.info(f"导入后数据库总数据: {final_count} 条")

            except Exception as e:
                db.rollback()
                logger.error(f"数据导入失败,已回滚: {str(e)}")
                raise
            finally:
                db.close()

        except Exception as e:
            logger.error(f"数据导入过程失败: {str(e)}")
            raise

    def run_import(self):
        """
        Run the full pipeline: read the sheet, validate it, then import it.

        @raises Exception - Re-raises whatever the failing step raised, after logging.
        """
        try:
            logger.info("=" * 60)
            logger.info("开始农业投入品数据导入流程")
            logger.info("=" * 60)

            df = self.read_excel_data()
            df = self.validate_data(df)
            self.import_data(df)

            logger.info("=" * 60)
            logger.info("农业投入品数据导入流程完成!")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"导入流程失败: {str(e)}")
            raise


# Defaults used when the script is run without command-line arguments.
# NOTE(review): "destkop" may be a typo for "desktop" — verify the path.
DEFAULT_EXCEL_PATH = r"D:\destkop\数据库对应数据.xlsx"
DEFAULT_SHEET_NAME = "Agricultural"


def main(argv=None):
    """
    Command-line entry point.

    Usage: ``script.py [excel_path] [--sheet-name NAME]``. Without arguments,
    the hard-coded default path and sheet are used.

    @param {list[str] | None} argv - Arguments to parse; ``None`` means ``sys.argv[1:]``.
    Exits the process with status 1 if the import fails.
    """
    parser = argparse.ArgumentParser(description="从Excel导入农业投入品数据到agricultural_data表")
    parser.add_argument('excel_path', nargs='?', default=DEFAULT_EXCEL_PATH, help="Excel文件路径")
    parser.add_argument('--sheet-name', default=DEFAULT_SHEET_NAME, help="Sheet名称")
    args = parser.parse_args(argv)

    try:
        importer = AgriculturalDataImporter(args.excel_path, args.sheet_name)
        importer.run_import()
    except Exception as e:
        # logger.exception keeps the traceback for post-mortem debugging.
        logger.exception(f"程序执行失败: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()