123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- """
- 土壤数据导入脚本
- @description: 从Excel文件读取Soil数据并导入到Soil_data表
- """
- import os
- import sys
- import pandas as pd
- import logging
- from datetime import datetime
- from sqlalchemy.orm import sessionmaker
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from app.database import engine, SessionLocal
- from app.models.soil import SoilData # 假设已创建SoilData模型
# Configure logging once at import time: INFO level, with timestamp, logger
# name and level on every record. All code in this module logs through `logger`.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class SoilDataImporter:
    """
    Soil data importer
    @description: Reads soil records from one sheet of an Excel workbook,
                  validates them and inserts them as SoilData rows.
                  run_import() chains read_excel_data() -> validate_data()
                  -> import_data().
    """

    # Columns that must exist in the sheet and be non-empty on every row.
    _REQUIRED_COLUMNS = ('Farmland_ID', 'Sample_ID')

    # (spreadsheet column, SoilData keyword argument) for every optional
    # measurement. This single table drives both the numeric coercion in
    # validate_data() and the object construction in import_data().
    _OPTIONAL_FIELDS = (
        ('0002IDW', 'clay_0002IDW'),
        ('bd020_90', 'bd020_90'),
        ('POR_Layer', 'por_layer'),
        ('ExAl_IDW', 'exal_idw'),
        ('ExCa_IDW', 'exca_idw'),
        ('ExK_IDW', 'exk_idw'),
        ('ExMg_IDW', 'exmg_idw'),
        ('ExNa_IDW', 'exna_idw'),
        ('Fed_IDW', 'fed_idw'),
        ('SOM_IDW', 'som_idw'),
        ('IDW_2013PC_Cd', 'idw_2013pc_cd'),
        ('IDW_2018XC_Cd', 'idw_2018xc_cd'),
        ('IDW_2023SP_Cd', 'idw_2023sp_cd'),
        ('IDW_2013PC_pH', 'idw_2013pc_ph'),
        ('IDW_2018XC_pH', 'idw_2018xc_ph'),
        ('IDW_2023SP_pH', 'idw_2023sp_ph'),
        ('002_0002IDW', 'silt_002_0002IDW'),
        ('02_002IDW', 'sand_02_002IDW'),
        ('2_02IDW', 'gravel_2_02IDW'),
        ('AvaK_IDW', 'avak_idw'),
        ('AvaP_IDW', 'avap_idw'),
        ('CEC_IDW', 'cec_idw'),
        ('EC_IDW', 'ec_idw'),
        ('OC-Fe_0-30', 'oc_fe_0_30'),
        ('SAvaK_IDW', 'savak_idw'),
        ('TAl_IDW', 'tal_idw'),
        ('TCa_IDW', 'tca_idw'),
        ('TCd_IDW', 'tcd_idw'),
        ('TEB_IDW', 'teb_idw'),
        ('TExH_IDW', 'texh_idw'),
        ('TFe_IDW', 'tfe_idw'),
        ('TK_IDW', 'tk_idw'),
        ('TMg_IDW', 'tmg_idw'),
        ('TMn_IDW', 'tmn_idw'),
        ('TN_IDW', 'tn_idw'),
        ('TP_IDW', 'tp_idw'),
        ('TS_IDW', 'ts_idw'),
        ('DQCJ_Cd', 'dqcj_cd'),
        ('GGS_Cd', 'ggs_cd'),
        ('DX_Cd', 'dx_cd'),
        ('DB_Cd', 'db_cd'),
    )

    # Number of rows added to the session per commit.
    _BATCH_SIZE = 1000

    def __init__(self, excel_path, sheet_name='Soil'):
        """
        Initialise the importer
        @param {str} excel_path - Path of the Excel workbook
        @param {str} sheet_name - Sheet to read, defaults to 'Soil'
        """
        self.excel_path = excel_path
        self.sheet_name = sheet_name
        # Fallback values used when these columns are absent or empty.
        self.default_values = {
            'DX_Cd': 0.023,
            'DB_Cd': 0.368
        }

    def read_excel_data(self):
        """
        Read the configured sheet from the Excel workbook
        @returns: DataFrame with the raw sheet contents
        @raises: FileNotFoundError if excel_path does not exist; any error
                 raised by pandas.read_excel is logged and re-raised
        """
        try:
            logger.info(f"开始读取Excel文件: {self.excel_path}")
            logger.info(f"Sheet名称: {self.sheet_name}")
            # Fail early with a clear message instead of a pandas/engine error.
            if not os.path.exists(self.excel_path):
                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
            logger.info(f"成功读取数据,共 {len(df)} 行")
            logger.info(f"数据列: {list(df.columns)}")
            # Log a short preview so the operator can confirm the right sheet.
            logger.info("前5行数据预览:")
            logger.info(df.head().to_string())
            return df
        except Exception as e:
            logger.error(f"读取Excel文件失败: {str(e)}")
            raise

    def validate_data(self, df):
        """
        Validate and normalise the raw sheet data
        @param {DataFrame} df - Raw data; it is not modified
        @returns: New DataFrame with known columns coerced to numeric,
                  defaults filled in, and rows lacking Farmland_ID or
                  Sample_ID removed
        @raises: ValueError if a required column is missing
        """
        try:
            logger.info("开始验证数据...")
            required_columns = list(self._REQUIRED_COLUMNS)
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"缺少必需的列: {missing_columns}")

            logger.info("检查数据类型...")
            # Work on a copy so the caller's frame (possibly a slice of a
            # larger one) is never mutated behind its back.
            df = df.copy()

            # Coerce every known column that is present; unparseable cells
            # become NaN rather than aborting the whole import.
            numeric_cols = required_columns + [column for column, _ in self._OPTIONAL_FIELDS]
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')

            # Assign the filled column back: `df[col].fillna(..., inplace=True)`
            # acts on a temporary and does not update `df` under pandas
            # Copy-on-Write.
            for col, default_val in self.default_values.items():
                if col in df.columns:
                    df[col] = df[col].fillna(default_val)

            # Rows without both identifiers cannot be stored.
            initial_count = len(df)
            df = df.dropna(subset=required_columns)
            if len(df) < initial_count:
                logger.warning(f"删除 {initial_count - len(df)} 行缺少必需值的数据")
            logger.info(f"数据验证完成,有效数据 {len(df)} 行")
            return df
        except Exception as e:
            logger.error(f"数据验证失败: {str(e)}")
            raise

    def _row_to_kwargs(self, row):
        """
        Build the SoilData keyword arguments for one spreadsheet row
        @param {Series} row - One row as yielded by DataFrame.iterrows()
        @returns: dict mapping SoilData field names to values; missing or
                  NaN cells become the configured default, else None
        @raises: KeyError / ValueError / TypeError if an identifier is
                 missing or cannot be converted to int
        """
        kwargs = {
            'farmland_id': int(row['Farmland_ID']),
            'sample_id': int(row['Sample_ID']),
        }
        for column, field in self._OPTIONAL_FIELDS:
            fallback = self.default_values.get(column)
            value = row.get(column, fallback)
            # NaN is pandas' "missing" marker; store it as NULL (or the
            # default) instead of sending a float NaN to the database.
            if pd.isna(value):
                value = fallback
            kwargs[field] = value
        return kwargs

    def import_data(self, df):
        """
        Insert the validated rows into the database
        @param {DataFrame} df - Validated data to import
        @raises: Re-raises any database error after rolling back the
                 pending batch. Rows that cannot be converted are skipped
                 with a warning rather than aborting the import.
        """
        try:
            logger.info("开始导入数据到数据库...")
            db = SessionLocal()
            try:
                # Informational only: report how many rows already exist.
                existing_count = db.query(SoilData).count()
                logger.info(f"数据库中现有数据: {existing_count} 条")

                total_rows = len(df)
                imported_count = 0
                for start in range(0, total_rows, self._BATCH_SIZE):
                    batch_df = df.iloc[start:start + self._BATCH_SIZE]
                    batch_objects = []
                    for label, row in batch_df.iterrows():
                        try:
                            batch_objects.append(SoilData(**self._row_to_kwargs(row)))
                        except Exception as e:
                            # `label` is the row's own index label; adding the
                            # batch offset to it would report the wrong row.
                            logger.warning(f"跳过行 {label}: {str(e)}")
                    if batch_objects:
                        # One commit per batch keeps transactions bounded.
                        db.add_all(batch_objects)
                        db.commit()
                        imported_count += len(batch_objects)
                        logger.info(f"已导入 {imported_count}/{total_rows} 条数据")

                logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
                final_count = db.query(SoilData).count()
                logger.info(f"导入后数据库总数据: {final_count} 条")
            except Exception as e:
                # Only the uncommitted batch is undone; batches committed
                # earlier in this run stay in the table.
                db.rollback()
                logger.error(f"数据导入失败,已回滚: {str(e)}")
                raise
            finally:
                db.close()
        except Exception as e:
            logger.error(f"数据导入过程失败: {str(e)}")
            raise

    def run_import(self):
        """
        Run the complete import: read, validate, then insert
        @raises: Re-raises the first error from any step after logging it
        """
        try:
            logger.info("=" * 60)
            logger.info("开始土壤数据导入流程")
            logger.info("=" * 60)
            df = self.read_excel_data()
            df = self.validate_data(df)
            self.import_data(df)
            logger.info("=" * 60)
            logger.info("土壤数据导入流程完成!")
            logger.info("=" * 60)
        except Exception as e:
            logger.error(f"导入流程失败: {str(e)}")
            raise
def main():
    """
    Script entry point
    @description: Imports the 'Soil' sheet of the fixed workbook path and
                  exits with status 1 if any step fails.
    """
    workbook = r"D:\destkop\数据库对应数据.xlsx"
    sheet = "Soil"
    try:
        # Build the importer and run read -> validate -> import in one go.
        SoilDataImporter(workbook, sheet).run_import()
    except Exception as exc:
        logger.error(f"程序执行失败: {str(exc)}")
        sys.exit(1)


# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|