"""
Complexation model (MSM) input data import script.

@description: Reads MSM_input data from an Excel file and imports it into the
MSM_input_data table.
"""
import os
import sys
import pandas as pd
import numpy as np
import logging
from datetime import datetime

# Add the project root directory to the Python path so the `app.*` imports resolve
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.database import engine, SessionLocal
from app.models.MSM_input import MSMInputData  # Make sure the MSMInputData model has been created
from sqlalchemy.orm import sessionmaker

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MSMInputDataImporter:
    """
    Importer for complexation model (MSM) input data.

    @description: Reads MSM input data from an Excel sheet, validates it and
    imports it into the MSM_input_data table.
    """

    # Excel column name -> MSMInputData keyword argument, for the numeric
    # measurement fields. Used both for validation and for object creation so
    # the two can never drift apart.
    NUMERIC_FIELD_MAP = {
        'CO2[g].tot': 'co2_tot',
        'watervolume': 'water_volume',
        'SL': 'sl_ratio',
        'pH': 'ph_value',
        'Ca+2.tot': 'ca_tot',
        'Mg+2.tot': 'mg_tot',
        'K+.tot': 'k_tot',
        'Na+.tot': 'na_tot',
        'Cl-.tot': 'cl_tot',
        'Cd.tot': 'cd_tot',
        'HFO_kgkg': 'hfo_kgkg',
        'CLAY_kgkg': 'clay_kgkg',
        'HA_kgkg': 'ha_kgkg',
        'FA_kgkg': 'fa_kgkg',
    }

    def __init__(self, excel_path, sheet_name='MSM_input'):
        """
        Initialise the importer.

        @param {str} excel_path - Path of the Excel workbook
        @param {str} sheet_name - Sheet to read, defaults to 'MSM_input'
        """
        self.excel_path = excel_path
        self.sheet_name = sheet_name

        # Columns that must be present in the sheet
        self.required_columns = [
            'Farmland_ID', 'Sample_ID', 'Var:', 'CO2[g].tot', 'watervolume', 'SL', 'pH',
            'Ca+2.tot', 'Mg+2.tot', 'K+.tot', 'Na+.tot', 'Cl-.tot', 'Cd.tot',
            'HFO_kgkg', 'CLAY_kgkg', 'HA_kgkg', 'FA_kgkg'
        ]

    def read_excel_data(self):
        """
        Read the configured sheet from the Excel workbook.

        @returns: DataFrame the raw sheet data
        @raises FileNotFoundError: if the workbook does not exist
        """
        try:
            logger.info(f"开始读取Excel文件: {self.excel_path}")
            logger.info(f"Sheet名称: {self.sheet_name}")

            # Fail early with a clear message if the file is missing
            if not os.path.exists(self.excel_path):
                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")

            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)

            logger.info(f"成功读取数据,共 {len(df)} 行")
            logger.info(f"数据列: {list(df.columns)}")

            # Show the first rows so the operator can eyeball the input
            logger.info("前5行数据预览:")
            logger.info(df.head().to_string())

            return df

        except Exception as e:
            logger.error(f"读取Excel文件失败: {str(e)}")
            raise

    def validate_data(self, df):
        """
        Validate and normalise the data.

        Drops duplicate (Farmland_ID, Sample_ID, Var:) rows, coerces numeric and
        ID columns, and adds boolean `<col>_invalid` flag columns for rows that
        must not be imported (missing pH / Cd.tot, missing or negative IDs).
        Also adds a `Cl-.tot_consistency` column comparing Cl-.tot against the
        charge-balance value 2*(Ca+Mg) + K + Na.

        @param {DataFrame} df - Data to validate
        @returns: DataFrame the validated data
        @raises ValueError: if required columns are missing
        """
        try:
            logger.info("开始验证数据...")

            missing_columns = [col for col in self.required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"缺少必需的列: {missing_columns}")

            # Remove duplicate (Farmland_ID, Sample_ID, Var:) rows, keeping the first
            key_columns = ['Farmland_ID', 'Sample_ID', 'Var:']
            duplicates = df.duplicated(subset=key_columns)
            if duplicates.any():
                dup_rows = df[duplicates]
                logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID和Sample_ID和Var:)")
                logger.info("重复记录示例:\n" + dup_rows.head().to_string())
                df = df.drop_duplicates(subset=key_columns, keep='first')
                logger.info(f"删除重复记录后剩余 {len(df)} 行数据")

            # String columns: fill missing values BEFORE casting, otherwise
            # astype(str) turns NaN into the literal string 'nan'.
            string_columns = ['Var:']
            for col in string_columns:
                if col in df.columns:
                    df[col] = df[col].fillna('').astype(str)

            # Numeric columns: coerce, unparseable values become NaN
            numeric_columns = list(self.NUMERIC_FIELD_MAP)
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')

                    null_count = df[col].isnull().sum()
                    if null_count > 0:
                        logger.warning(f"列 {col} 中有 {null_count} 个空值或无效值")

                        # pH and Cd.tot are mandatory for the model: flag such rows as invalid
                        if col in ['pH', 'Cd.tot']:
                            logger.error(f"关键列 {col} 存在空值,需要处理")
                            invalid_rows = df[df[col].isnull()]
                            logger.info("问题行:\n" + invalid_rows.head().to_string())
                            df[f'{col}_invalid'] = df[col].isnull()

            # ID columns: coerce to numbers; missing/unparseable become -1 and are flagged
            for col in ['Farmland_ID', 'Sample_ID']:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(-1)

                    invalid_ids = df[df[col] < 0]
                    if not invalid_ids.empty:
                        logger.warning(f"列 {col} 中有 {len(invalid_ids)} 条无效值")
                        logger.info("问题行:\n" + invalid_ids.head().to_string())
                        df[f'{col}_invalid'] = df[col] < 0

            # Consistency check: Cl- should roughly balance the cation charges
            cation_columns = ['Ca+2.tot', 'Mg+2.tot', 'K+.tot', 'Na+.tot']
            if 'Cl-.tot' in df.columns and all(c in df.columns for c in cation_columns):
                theoretical_cl = (df['Ca+2.tot'] + df['Mg+2.tot']) * 2 + df['K+.tot'] + df['Na+.tot']
                # A zero denominator yields inf (flagged) or NaN (not flagged);
                # pandas does not raise here.
                cl_diff = abs(df['Cl-.tot'] - theoretical_cl) / theoretical_cl
                outlier_mask = cl_diff > 0.1  # more than 10% relative difference

                if outlier_mask.sum() > 0:
                    logger.warning(f"发现 {outlier_mask.sum()} 条记录的Cl-.tot值与计算值存在显著差异(>10%)")
                    df['Cl-.tot_consistency'] = ~outlier_mask

            logger.info(f"数据验证完成,有效数据 {len(df)} 行")
            return df

        except Exception as e:
            logger.error(f"数据验证失败: {str(e)}")
            raise

    def _invalid_fields(self, row):
        """Return the required columns whose `<col>_invalid` flag is set on this row."""
        return [
            col for col in self.required_columns
            if f'{col}_invalid' in row and row[f'{col}_invalid']
        ]

    @staticmethod
    def _to_db_value(value):
        """
        Convert a pandas/numpy cell value into a plain Python value for the ORM.

        Missing values (NaN/NaT/None) become None so they are stored as NULL
        rather than NaN. numpy scalars are unwrapped to native Python scalars.
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        if isinstance(value, np.generic):
            return value.item()
        return value

    def create_msm_input_object(self, row):
        """
        Build an MSMInputData object from one validated row.

        @param {pd.Series} row - One row of the validated DataFrame
        @returns: MSMInputData object, or None if the row is flagged invalid or
                  the object cannot be built (the reason is logged)
        """
        try:
            invalid_fields = self._invalid_fields(row)
            if invalid_fields:
                logger.warning(
                    f"跳过无效行: Farmland_ID={row['Farmland_ID']}, Sample_ID={row['Sample_ID']}, 无效字段: {', '.join(invalid_fields)}")
                return None

            numeric_values = {
                attr: self._to_db_value(row[col])
                for col, attr in self.NUMERIC_FIELD_MAP.items()
            }
            return MSMInputData(
                farmland_id=int(row['Farmland_ID']),
                sample_id=int(row['Sample_ID']),
                var=row['Var:'],
                **numeric_values,
            )

        except KeyError as e:
            logger.warning(f"创建对象时缺少必要字段: {str(e)}")
            return None
        except Exception as e:
            logger.warning(f"创建MSMInputData对象失败: {str(e)}")
            return None

    def _commit_batch(self, db, batch):
        """
        Add and commit one batch of objects in its own transaction.

        On failure the session is rolled back and the failure is logged, so the
        caller can continue with the next batch.

        @param db - Open database session
        @param {list} batch - Objects to insert
        @returns: int number of objects committed (0 if the batch failed or was empty)
        """
        if not batch:
            return 0
        try:
            db.add_all(batch)
            db.commit()
        except Exception as e:
            db.rollback()
            logger.warning(f"批量提交 {len(batch)} 条数据失败,已回滚该批次: {str(e)}")
            return 0
        return len(batch)

    def import_data(self, df):
        """
        Import the validated data into the database in batches of 100.

        Rows flagged invalid are counted and skipped. A failed batch is rolled
        back, counted as failed and discarded, so it cannot affect later batches.

        @param {DataFrame} df - Validated data to import
        """
        try:
            logger.info("开始导入数据到数据库...")

            db = SessionLocal()

            try:
                existing_count = db.query(MSMInputData).count()
                logger.info(f"数据库中现有MSM输入数据记录: {existing_count} 条")

                total_rows = len(df)
                imported_count = 0   # rows actually committed
                skipped_count = 0    # rows that could not be built or whose batch failed
                invalid_count = 0    # rows flagged invalid during validation
                batch_size = 100
                objects_to_insert = []

                for _, row in df.iterrows():
                    if self._invalid_fields(row):
                        invalid_count += 1
                        continue

                    obj = self.create_msm_input_object(row)
                    if obj is None:
                        skipped_count += 1
                        continue
                    objects_to_insert.append(obj)

                    if len(objects_to_insert) >= batch_size:
                        committed = self._commit_batch(db, objects_to_insert)
                        imported_count += committed
                        skipped_count += len(objects_to_insert) - committed
                        if committed:
                            logger.info(f"已批量导入 {imported_count}/{total_rows} 条数据")
                        # Always start a fresh batch: re-submitting objects from a
                        # failed commit would make every later batch fail too.
                        objects_to_insert = []

                # Commit whatever is left over
                committed = self._commit_batch(db, objects_to_insert)
                imported_count += committed
                skipped_count += len(objects_to_insert) - committed

                new_count = db.query(MSMInputData).count()
                added_count = new_count - existing_count

                logger.info("MSM输入数据导入完成!")
                logger.info(f"尝试导入行数: {total_rows}")
                logger.info(f"成功导入: {imported_count} 条")
                logger.info(f"跳过无效数据: {invalid_count} 条")
                logger.info(f"处理失败: {skipped_count} 条")
                logger.info(f"数据库中新增加: {added_count} 条记录")
                logger.info(f"数据库总记录: {new_count} 条")

            except Exception as e:
                db.rollback()
                logger.error(f"数据导入失败,已回滚: {str(e)}")
                raise
            finally:
                db.close()

        except Exception as e:
            logger.error(f"数据导入过程失败: {str(e)}")
            raise

    def run_import(self):
        """
        Run the full pipeline: read the Excel data, validate it, import it.
        """
        try:
            logger.info("=" * 60)
            logger.info("开始MSM输入数据导入流程")
            logger.info("=" * 60)

            df = self.read_excel_data()
            df = self.validate_data(df)
            self.import_data(df)

            logger.info("=" * 60)
            logger.info("MSM输入数据导入流程完成!")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"导入流程失败: {str(e)}")
            raise


def main():
    """
    Script entry point.

    Usage: ``python <script> [excel_path] [sheet_name]``. Without arguments the
    built-in defaults below are used. Exits with status 1 if the import fails.
    """
    # Excel file path (edit the default to match the actual location)
    excel_path = sys.argv[1] if len(sys.argv) > 1 else r"D:\destkop\数据库对应数据.xlsx"
    # The workbook must contain this sheet
    sheet_name = sys.argv[2] if len(sys.argv) > 2 else "MSM_input"

    try:
        importer = MSMInputDataImporter(excel_path, sheet_name)
        importer.run_import()
    except Exception as e:
        logger.error(f"程序执行失败: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()