""" 络合模型输出数据导入脚本 @description: 从Excel文件读取MSM_output数据并导入到MSM_output_data表 """ import os import sys import pandas as pd import logging from datetime import datetime # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from app.database import engine, SessionLocal from app.models.MSM_output import MSMOutputData # 确保已创建MSMOutputData模型 # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class MSMOutputDataImporter: """ 络合模型输出数据导入器 @description: 从Excel文件读取MSM输出数据并导入到MSM_output_data表 """ def __init__(self, excel_path, sheet_name='MSM_output'): """ 初始化导入器 @param {str} excel_path - Excel文件路径 @param {str} sheet_name - Sheet名称,默认为'MSM_output' """ self.excel_path = excel_path self.sheet_name = sheet_name # 定义必需列 self.required_columns = [ 'Farmland_ID', 'Sample_ID', 'Var:', 'Cd.solution' ] def read_excel_data(self): """ 读取Excel文件数据 @returns: DataFrame 读取的数据 """ try: logger.info(f"开始读取Excel文件: {self.excel_path}") logger.info(f"Sheet名称: {self.sheet_name}") # 检查文件是否存在 if not os.path.exists(self.excel_path): raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}") # 读取Excel文件 df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name) logger.info(f"成功读取数据,共 {len(df)} 行") logger.info(f"数据列: {list(df.columns)}") # 显示前几行数据供确认 logger.info("前5行数据预览:") logger.info(df.head().to_string()) return df except Exception as e: logger.error(f"读取Excel文件失败: {str(e)}") raise def validate_data(self, df): """ 验证数据格式和完整性 @param {DataFrame} df - 要验证的数据 @returns: DataFrame 验证后的数据 """ try: logger.info("开始验证数据...") # 检查必需的列是否存在 missing_columns = [col for col in self.required_columns if col not in df.columns] if missing_columns: raise ValueError(f"缺少必需的列: {missing_columns}") # 检查Farmland_ID、Sample_ID和Var:是否重复 duplicates = df.duplicated(subset=['Farmland_ID', 'Sample_ID', 'Var:']) if duplicates.any(): dup_rows = df[duplicates] logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID, Sample_ID和Var:)") logger.info("重复记录示例:\n" + dup_rows.head().to_string()) # 删除重复行,保留第一个出现的 df = df.drop_duplicates(subset=['Farmland_ID', 'Sample_ID', 'Var:'], keep='first') logger.info(f"删除重复记录后剩余 {len(df)} 行数据") # 处理字符串列 string_columns = ['Var:'] for col in string_columns: if col in df.columns: df[col] = df[col].astype(str).fillna('') # 处理数值列 numeric_columns = ['Cd.solution'] for col in numeric_columns: if col in df.columns: # 尝试转换为数值类型 df[col] = pd.to_numeric(df[col], errors='coerce') # 检查空值 null_count = df[col].isnull().sum() if null_count > 0: logger.warning(f"列 {col} 中有 {null_count} 个空值或无效值") # 标记为无效 df[f'{col}_invalid'] = df[col].isnull() # 处理Farmland_ID和Sample_ID for col in ['Farmland_ID', 'Sample_ID']: if col in df.columns: # 尝试转换为整数 df[col] = pd.to_numeric(df[col], errors='coerce').fillna(-1) # 检查无效值 invalid_ids = df[df[col] < 0] if not invalid_ids.empty: logger.warning(f"列 {col} 中有 {len(invalid_ids)} 条无效值") logger.info("问题行:\n" + invalid_ids.head().to_string()) # 标记为无效 df[f'{col}_invalid'] = df[col] < 0 logger.info(f"数据验证完成,有效数据 {len(df)} 行") return df except Exception as e: logger.error(f"数据验证失败: {str(e)}") raise def create_msm_output_object(self, row): """ 创建MSM输出数据对象 @param {pd.Series} row - 数据行 @returns: MSMOutputData 对象 """ try: # 处理无效数据 invalid_fields = [] for col in self.required_columns: if f'{col}_invalid' in row and row[f'{col}_invalid']: invalid_fields.append(col) if invalid_fields: logger.warning( f"跳过无效行: Farmland_ID={row['Farmland_ID']}, Sample_ID={row['Sample_ID']}, 无效字段: {', '.join(invalid_fields)}") return None # 创建对象 return MSMOutputData( farmland_id=int(row['Farmland_ID']), sample_id=int(row['Sample_ID']), var=row['Var:'], cd_solution=row['Cd.solution'] ) except KeyError as e: logger.warning(f"创建对象时缺少必要字段: {str(e)}") return None except Exception as e: logger.warning(f"创建MSMOutputData对象失败: {str(e)}") return None def import_data(self, df): """ 将数据导入到数据库 @param {DataFrame} df - 要导入的数据 """ try: logger.info("开始导入数据到数据库...") # 创建数据库会话 db = SessionLocal() try: # 检查现有数据量 existing_count = db.query(MSMOutputData).count() logger.info(f"数据库中现有MSM输出数据记录: {existing_count} 条") # 批量创建对象并导入 total_rows = len(df) imported_count = 0 skipped_count = 0 invalid_count = 0 batch_size = 100 objects_to_insert = [] # 准备批量处理 for i, row in df.iterrows(): # 跳过前处理无效数据 invalid = False for col in self.required_columns: if f'{col}_invalid' in row and row[f'{col}_invalid']: invalid = True break if invalid: invalid_count += 1 continue try: obj = self.create_msm_output_object(row) if not obj: skipped_count += 1 continue objects_to_insert.append(obj) imported_count += 1 # 每100条提交一次 if len(objects_to_insert) >= batch_size: db.add_all(objects_to_insert) db.commit() logger.info(f"已批量导入 {imported_count}/{total_rows} 条数据") objects_to_insert = [] except Exception as e: logger.warning(f"处理行 {i} 时出错: {str(e)}") skipped_count += 1 db.rollback() # 提交剩余数据 if objects_to_insert: db.add_all(objects_to_insert) db.commit() # 更新统计信息 new_count = db.query(MSMOutputData).count() added_count = new_count - existing_count logger.info(f"MSM输出数据导入完成!") logger.info(f"尝试导入行数: {total_rows}") logger.info(f"成功导入: {imported_count} 条") logger.info(f"跳过无效数据: {invalid_count} 条") logger.info(f"处理失败: {skipped_count} 条") logger.info(f"数据库中新增加: {added_count} 条记录") logger.info(f"数据库总记录: {new_count} 条") except Exception as e: db.rollback() logger.error(f"数据导入失败,已回滚: {str(e)}") raise finally: db.close() except Exception as e: logger.error(f"数据导入过程失败: {str(e)}") raise def run_import(self): """ 执行完整的导入流程 """ try: logger.info("=" * 60) logger.info("开始MSM输出数据导入流程") logger.info("=" * 60) # 1. 读取Excel数据 df = self.read_excel_data() # 2. 验证数据 df = self.validate_data(df) # 3. 导入数据 self.import_data(df) logger.info("=" * 60) logger.info("MSM输出数据导入流程完成!") logger.info("=" * 60) except Exception as e: logger.error(f"导入流程失败: {str(e)}") raise def main(): """ 主函数 """ # Excel文件路径 excel_path = r"D:\destkop\数据库对应数据.xlsx" # 根据实际路径修改 sheet_name = "MSM_output" # 确保Excel中有这个sheet try: # 创建导入器并执行导入 importer = MSMOutputDataImporter(excel_path, sheet_name) importer.run_import() except Exception as e: logger.error(f"程序执行失败: {str(e)}") sys.exit(1) if __name__ == "__main__": main()