""" 评价数据导入脚本 @description: 从Excel文件读取Assessment评价数据并导入到Assessment表 """ import os import sys import pandas as pd import logging from datetime import datetime # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from app.database import engine, SessionLocal from app.models.assessment import Assessment # 确保已创建Assessment模型 # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class AssessmentDataImporter: """ 评价数据导入器 @description: 从Excel文件读取评价数据并导入到Assessment表 """ def __init__(self, excel_path, sheet_name='Assessment'): """ 初始化导入器 @param {str} excel_path - Excel文件路径 @param {str} sheet_name - Sheet名称,默认为'Assessment' """ self.excel_path = excel_path self.sheet_name = sheet_name # 用地类型映射 self.land_use_mapping = { '旱地': 0.0, '水田': 1.0, '水浇地': 2.0 } # 定义必需列 self.required_columns = [ 'Farmland_ID', 'Sample_ID', 'Type', 'IDW_2023SP_Cd', 'IDW_2023SP_pH', 'SOM_IDW', 'safety_production_threshold', 'pollution_risk_screening_value' ] def read_excel_data(self): """ 读取Excel文件数据 @returns: DataFrame 读取的数据 """ try: logger.info(f"开始读取Excel文件: {self.excel_path}") logger.info(f"Sheet名称: {self.sheet_name}") # 检查文件是否存在 if not os.path.exists(self.excel_path): raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}") # 读取Excel文件 df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name) logger.info(f"成功读取数据,共 {len(df)} 行") logger.info(f"数据列: {list(df.columns)}") # 显示前几行数据供确认 logger.info("前5行数据预览:") logger.info(df.head().to_string()) return df except Exception as e: logger.error(f"读取Excel文件失败: {str(e)}") raise def validate_data(self, df): """ 验证数据格式和完整性 @param {DataFrame} df - 要验证的数据 @returns: DataFrame 验证后的数据 """ try: logger.info("开始验证数据...") # 检查必需的列是否存在 missing_columns = [col for col in self.required_columns if col not in df.columns] if missing_columns: raise ValueError(f"缺少必需的列: {missing_columns}") # 检查Farmland_ID和Sample_ID是否重复 duplicates = df.duplicated(subset=['Farmland_ID', 'Sample_ID']) if duplicates.any(): dup_rows = df[duplicates] logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID和Sample_ID)") logger.info("重复记录示例:\n" + dup_rows.head().to_string()) # 删除重复行,保留第一个出现的 df = df.drop_duplicates(subset=['Farmland_ID', 'Sample_ID'], keep='first') logger.info(f"删除重复记录后剩余 {len(df)} 行数据") # 转换数值类型 numeric_columns = [ 'IDW_2023SP_Cd', 'IDW_2023SP_pH', 'SOM_IDW', 'safety_production_threshold', 'pollution_risk_screening_value' ] for col in numeric_columns: if col in df.columns: # 尝试转换为数值类型 df[col] = pd.to_numeric(df[col], errors='coerce') # 检查空值 if df[col].isnull().any(): invalid_rows = df[df[col].isnull()] logger.warning(f"列 {col} 中有无效值,行号: {list(invalid_rows.index)}") # 标记为无效但保留行,稍后处理 df[f'{col}_valid'] = ~df[col].isnull() # 转换Farmland_ID和Sample_ID为整数 for col in ['Farmland_ID', 'Sample_ID']: if col in df.columns: # 首先转换为浮点类型,再尝试转整数 df[col] = pd.to_numeric(df[col], errors='coerce').fillna(-1) df[col] = df[col].astype(int) # 检查无效值 if (df[col] < 0).any(): invalid_rows = df[df[col] < 0] logger.warning(f"列 {col} 中有无效值,行号: {list(invalid_rows.index)}") df[f'{col}_valid'] = (df[col] >= 0) # 用地类型转换 if 'Type' in df.columns: # 尝试直接转换为数值 df['Type_Numeric'] = pd.to_numeric(df['Type'], errors='coerce') # 处理无法转换的类型 unknown_types = df[df['Type_Numeric'].isnull()]['Type'].unique() if len(unknown_types) > 0: logger.info(f"发现未知用地类型: {unknown_types}, 尝试映射...") # 使用映射转换 df['Type_Mapped'] = df['Type'].map(self.land_use_mapping) # 合并两种转换方式 df['Final_Type'] = df['Type_Numeric'].fillna(df['Type_Mapped']) else: df['Final_Type'] = df['Type_Numeric'] # 检查是否还有无效值 if df['Final_Type'].isnull().any(): invalid_rows = df[df['Final_Type'].isnull()] logger.warning(f"列 Type 中有无法识别的值,行号: {list(invalid_rows.index)}") logger.info("为无效值设置默认值0.0(旱地)") df['Final_Type'] = df['Final_Type'].fillna(0.0) logger.info(f"数据验证完成,有效数据 {len(df)} 行") return df except Exception as e: logger.error(f"数据验证失败: {str(e)}") raise def create_assessment_object(self, row): """ 创建评价数据对象 @param {pd.Series} row - 数据行 @returns: Assessment 对象 """ try: return Assessment( farmland_id=row['Farmland_ID'], sample_id=row['Sample_ID'], type=row['Final_Type'], idw_2023sp_cd=row['IDW_2023SP_Cd'], idw_2023sp_ph=row['IDW_2023SP_pH'], som_idw=row['SOM_IDW'], safety_production_threshold=row['safety_production_threshold'], pollution_risk_screening_value=row['pollution_risk_screening_value'] ) except KeyError as e: logger.warning(f"创建对象时缺少必要字段: {str(e)}") return None except Exception as e: logger.warning(f"创建Assessment对象失败: {str(e)}") return None def import_data(self, df): """ 将数据导入到数据库 @param {DataFrame} df - 要导入的数据 """ try: logger.info("开始导入数据到数据库...") # 创建数据库会话 db = SessionLocal() try: # 检查现有数据量 existing_count = db.query(Assessment).count() logger.info(f"数据库中现有评价数据记录: {existing_count} 条") # 批量创建对象并导入 total_rows = len(df) imported_count = 0 skipped_count = 0 invalid_count = 0 # 分批处理数据 for i, row in df.iterrows(): try: # 检查是否有效行(所有关键字段都有效) is_valid = True for col in self.required_columns: if f'{col}_valid' in row and not row[f'{col}_valid']: is_valid = False break if not is_valid: invalid_count += 1 logger.debug(f"跳过无效行 {i}: 存在无效值") continue # 创建Assessment对象 assessment = self.create_assessment_object(row) if not assessment: skipped_count += 1 continue # 添加到会话 db.add(assessment) imported_count += 1 # 每50条提交一次 if imported_count % 50 == 0: db.commit() logger.info(f"已导入 {imported_count}/{total_rows} 条数据") except Exception as e: logger.warning(f"导入行 {i} 时出错: {str(e)}") skipped_count += 1 db.rollback() # 提交剩余数据 db.commit() # 更新统计信息 new_count = db.query(Assessment).count() added_count = new_count - existing_count logger.info(f"评价数据导入完成!") logger.info(f"成功导入: {imported_count} 条") logger.info(f"跳过无效数据: {invalid_count} 条") logger.info(f"处理失败: {skipped_count} 条") logger.info(f"数据库中新增加: {added_count} 条记录") logger.info(f"数据库总记录: {new_count} 条") except Exception as e: db.rollback() logger.error(f"数据导入失败,已回滚: {str(e)}") raise finally: db.close() except Exception as e: logger.error(f"数据导入过程失败: {str(e)}") raise def run_import(self): """ 执行完整的导入流程 """ try: logger.info("=" * 60) logger.info("开始评价数据导入流程") logger.info("=" * 60) # 1. 读取Excel数据 df = self.read_excel_data() # 2. 验证数据 df = self.validate_data(df) # 3. 导入数据 self.import_data(df) logger.info("=" * 60) logger.info("评价数据导入流程完成!") logger.info("=" * 60) except Exception as e: logger.error(f"导入流程失败: {str(e)}") raise def main(): """ 主函数 """ # Excel文件路径 excel_path = r"D:\destkop\数据库对应数据.xlsx" # 根据实际路径修改 sheet_name = "Assessment" # 确保Excel中有这个sheet try: # 创建导入器并执行导入 importer = AssessmentDataImporter(excel_path, sheet_name) importer.run_import() except Exception as e: logger.error(f"程序执行失败: {str(e)}") sys.exit(1) if __name__ == "__main__": main()