import_MSM_input.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. """
  2. 络合模型输入数据导入脚本
  3. @description: 从Excel文件读取MSM_input数据并导入到MSM_input_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import numpy as np
  9. import logging
  10. from datetime import datetime
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.MSM_input import MSMInputData # 确保已创建MSMInputData模型
  15. from sqlalchemy.orm import sessionmaker
  # Script-wide logging configuration: timestamped records at INFO level and above.
  logging.basicConfig(
      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
      level=logging.INFO,
  )
  # Module-level logger used by the importer class and main().
  logger = logging.getLogger(name=__name__)
  class MSMInputDataImporter:
      """
      Importer for complexation-model (MSM) input data.

      Reads one sheet of an Excel workbook, validates and normalizes the rows,
      and inserts them in batches into the table mapped by ``MSMInputData``.
      Rows that fail validation are not removed; they are flagged with boolean
      ``<column>_invalid`` columns and skipped at import time.
      """

      # MSMInputData attribute name -> Excel column, for every numeric input.
      # Insertion order is the column order used for validation and reporting.
      _FIELD_MAP = {
          'co2_tot': 'CO2[g].tot',
          'water_volume': 'watervolume',
          'sl_ratio': 'SL',
          'ph_value': 'pH',
          'ca_tot': 'Ca+2.tot',
          'mg_tot': 'Mg+2.tot',
          'k_tot': 'K+.tot',
          'na_tot': 'Na+.tot',
          'cl_tot': 'Cl-.tot',
          'cd_tot': 'Cd.tot',
          'hfo_kgkg': 'HFO_kgkg',
          'clay_kgkg': 'CLAY_kgkg',
          'ha_kgkg': 'HA_kgkg',
          'fa_kgkg': 'FA_kgkg',
      }
      # Columns that together identify one record; used for de-duplication.
      _KEY_COLUMNS = ['Farmland_ID', 'Sample_ID', 'Var:']
      # Identifier columns; valid values are non-negative whole numbers.
      _ID_COLUMNS = ['Farmland_ID', 'Sample_ID']
      # Free-text columns.
      _STRING_COLUMNS = ['Var:']
      # Numeric input columns, in sheet order.
      _NUMERIC_COLUMNS = list(_FIELD_MAP.values())
      # Numeric columns where a missing/unparseable value invalidates the row.
      _CRITICAL_COLUMNS = ['pH', 'Cd.tot']
      # Maximum relative deviation tolerated by the Cl- consistency check.
      _CL_TOLERANCE = 0.1

      def __init__(self, excel_path, sheet_name='MSM_input', batch_size=100):
          """
          Initialize the importer.

          @param {str} excel_path - Path to the Excel workbook
          @param {str} sheet_name - Sheet to read, defaults to 'MSM_input'
          @param {int} batch_size - Rows committed per transaction, defaults to 100
          @raises ValueError: if batch_size is smaller than 1
          """
          if batch_size < 1:
              raise ValueError(f"batch_size必须大于0: {batch_size}")
          self.excel_path = excel_path
          self.sheet_name = sheet_name
          self.batch_size = batch_size
          # Columns that must be present in the sheet (a fresh list per instance).
          self.required_columns = self._KEY_COLUMNS + self._NUMERIC_COLUMNS

      def read_excel_data(self):
          """
          Read the configured sheet of the Excel workbook.

          @returns: DataFrame with the raw sheet contents
          @raises FileNotFoundError: if the workbook does not exist
          """
          try:
              logger.info(f"开始读取Excel文件: {self.excel_path}")
              logger.info(f"Sheet名称: {self.sheet_name}")
              # Fail early with a clear message instead of a pandas/openpyxl error.
              if not os.path.exists(self.excel_path):
                  raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
              df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
              logger.info(f"成功读取数据,共 {len(df)} 行")
              logger.info(f"数据列: {list(df.columns)}")
              # Preview the first rows so the operator can sanity-check the sheet.
              logger.info("前5行数据预览:")
              logger.info(df.head().to_string())
              return df
          except Exception as e:
              logger.error(f"读取Excel文件失败: {str(e)}")
              raise

      def validate_data(self, df):
          """
          Validate and normalize the raw sheet data.

          Steps: check that all required columns exist, normalize text and ID
          columns, drop duplicate (Farmland_ID, Sample_ID, Var:) rows keeping
          the first, coerce numeric columns, and compare Cl-.tot against the
          value derived from the cation totals. Problem rows are kept but
          flagged via boolean ``<column>_invalid`` columns (ID columns and
          critical numeric columns); ``Cl-.tot_consistency`` is added only
          when inconsistent rows are found.

          @param {DataFrame} df - Raw data; it is not modified
          @returns: DataFrame the normalized copy
          @raises ValueError: if a required column is missing
          """
          try:
              logger.info("开始验证数据...")
              missing_columns = [col for col in self.required_columns if col not in df.columns]
              if missing_columns:
                  raise ValueError(f"缺少必需的列: {missing_columns}")

              # Work on a copy so the caller's DataFrame is left untouched.
              df = df.copy()

              # Text columns: fill blanks *before* converting to str; otherwise
              # NaN would become the literal string 'nan'.
              for col in self._STRING_COLUMNS:
                  df[col] = df[col].fillna('').astype(str)

              # ID columns: coerce to numbers. Missing, negative or fractional
              # values are invalid; -1 remains the placeholder for unparseable IDs.
              for col in self._ID_COLUMNS:
                  ids = pd.to_numeric(df[col], errors='coerce')
                  invalid_mask = ids.isna() | (ids < 0) | (ids % 1 != 0)
                  df[col] = ids.fillna(-1)
                  if invalid_mask.any():
                      invalid_ids = df[invalid_mask]
                      logger.warning(f"列 {col} 中有 {len(invalid_ids)} 条无效值")
                      logger.info("问题行:\n" + invalid_ids.head().to_string())
                      df[f'{col}_invalid'] = invalid_mask

              # De-duplicate on the normalized keys so e.g. "1" and 1 match.
              duplicates = df.duplicated(subset=self._KEY_COLUMNS, keep='first')
              if duplicates.any():
                  dup_rows = df[duplicates]
                  logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID和Sample_ID和Var:)")
                  logger.info("重复记录示例:\n" + dup_rows.head().to_string())
                  # Explicit copy so later column assignments act on an independent frame.
                  df = df.loc[~duplicates].copy()
                  logger.info(f"删除重复记录后剩余 {len(df)} 行数据")

              # Numeric columns: unparseable cells become NaN; only the critical
              # columns turn a NaN into an invalid row.
              for col in self._NUMERIC_COLUMNS:
                  df[col] = pd.to_numeric(df[col], errors='coerce')
                  null_mask = df[col].isnull()
                  null_count = int(null_mask.sum())
                  if null_count > 0:
                      logger.warning(f"列 {col} 中有 {null_count} 个空值或无效值")
                      if col in self._CRITICAL_COLUMNS:
                          logger.error(f"关键列 {col} 存在空值,需要处理")
                          logger.info("问题行:\n" + df[null_mask].head().to_string())
                          df[f'{col}_invalid'] = null_mask

              # Theoretical Cl- total from the cation totals (divalent Ca/Mg count
              # twice): 2 * (Ca + Mg) + K + Na. Only a warning/flag, never a skip.
              theoretical_cl = (df['Ca+2.tot'] + df['Mg+2.tot']) * 2 + df['K+.tot'] + df['Na+.tot']
              # abs() on the denominator so a negative estimate cannot hide a deviation;
              # a zero estimate yields inf (flagged) or NaN for 0/0 (not flagged).
              cl_diff = (df['Cl-.tot'] - theoretical_cl).abs() / theoretical_cl.abs()
              outlier_mask = cl_diff > self._CL_TOLERANCE
              if outlier_mask.any():
                  logger.warning(
                      f"发现 {outlier_mask.sum()} 条记录的Cl-.tot值与计算值存在显著差异(>{self._CL_TOLERANCE:.0%})")
                  df['Cl-.tot_consistency'] = ~outlier_mask

              # Report rows that carry no invalid flag, not merely the row count.
              flag_columns = [f'{col}_invalid' for col in self.required_columns
                              if f'{col}_invalid' in df.columns]
              invalid_rows = int(df[flag_columns].any(axis=1).sum()) if flag_columns else 0
              logger.info(f"数据验证完成,有效数据 {len(df) - invalid_rows} 行")
              return df
          except Exception as e:
              logger.error(f"数据验证失败: {str(e)}")
              raise

      def _invalid_fields(self, row):
          """
          List the required columns that validate_data flagged as invalid for ``row``.

          @param {pd.Series} row - One row of the validated DataFrame
          @returns: list[str] column names whose ``<column>_invalid`` flag is set
          """
          return [
              col for col in self.required_columns
              if f'{col}_invalid' in row and bool(row[f'{col}_invalid'])
          ]

      @staticmethod
      def _to_db_value(value):
          """
          Convert one DataFrame cell into a value suitable for an ORM column.

          @param value - Cell value (Python or numpy scalar, possibly NaN/None)
          @returns: None for missing values, the builtin Python scalar for numpy
              scalars, otherwise ``value`` unchanged
          """
          if pd.isna(value):
              return None
          # numpy scalars expose .item(), which returns the builtin equivalent.
          return value.item() if hasattr(value, 'item') else value

      def create_msm_input_object(self, row):
          """
          Build an MSMInputData instance from one validated row.

          @param {pd.Series} row - One row produced by validate_data
          @returns: MSMInputData, or None if the row is flagged invalid, lacks a
              field, or construction fails (the reason is logged)
          """
          try:
              invalid_fields = self._invalid_fields(row)
              if invalid_fields:
                  logger.warning(
                      f"跳过无效行: Farmland_ID={row['Farmland_ID']}, Sample_ID={row['Sample_ID']}, 无效字段: {', '.join(invalid_fields)}")
                  return None
              # Missing measurements become None instead of float NaN, and numpy
              # scalars become builtin numbers before reaching the ORM.
              measurements = {
                  field: self._to_db_value(row[column])
                  for field, column in self._FIELD_MAP.items()
              }
              return MSMInputData(
                  farmland_id=int(row['Farmland_ID']),
                  sample_id=int(row['Sample_ID']),
                  var=row['Var:'],
                  **measurements,
              )
          except KeyError as e:
              logger.warning(f"创建对象时缺少必要字段: {str(e)}")
              return None
          except Exception as e:
              logger.warning(f"创建MSMInputData对象失败: {str(e)}")
              return None

      @staticmethod
      def _commit_batch(db, batch):
          """
          Add ``batch`` to the session and commit it as one transaction.

          @param db - Database session providing add_all/commit/rollback
          @param {list} batch - ORM objects to insert
          @returns: int len(batch) on success; 0 if the commit failed, in which
              case the session has been rolled back and the error logged
          """
          try:
              db.add_all(batch)
              db.commit()
          except Exception as e:
              db.rollback()
              logger.warning(f"批量提交 {len(batch)} 条数据失败,已回滚: {str(e)}")
              return 0
          return len(batch)

      def import_data(self, df):
          """
          Insert validated rows into the database in batches of ``self.batch_size``.

          Rows flagged invalid by validate_data are counted and skipped. Each
          batch is committed in its own transaction; a batch whose commit fails
          is rolled back, counted as failed, discarded, and the import continues.

          @param {DataFrame} df - Output of validate_data
          @raises Exception: re-raised if opening the session, counting rows or
              any other non-batch step fails (after rolling back)
          """
          try:
              logger.info("开始导入数据到数据库...")
              db = SessionLocal()
              try:
                  existing_count = db.query(MSMInputData).count()
                  logger.info(f"数据库中现有MSM输入数据记录: {existing_count} 条")

                  total_rows = len(df)
                  imported_count = 0
                  skipped_count = 0
                  invalid_count = 0
                  batch = []

                  for _, row in df.iterrows():
                      # Rows flagged during validation are counted, not imported.
                      if self._invalid_fields(row):
                          invalid_count += 1
                          continue
                      obj = self.create_msm_input_object(row)
                      if obj is None:
                          skipped_count += 1
                          continue
                      batch.append(obj)
                      if len(batch) >= self.batch_size:
                          committed = self._commit_batch(db, batch)
                          imported_count += committed
                          skipped_count += len(batch) - committed
                          if committed:
                              logger.info(f"已批量导入 {imported_count}/{total_rows} 条数据")
                          # Always start a fresh batch so a failed one is not re-submitted.
                          batch = []

                  # Commit the remaining partial batch.
                  if batch:
                      committed = self._commit_batch(db, batch)
                      imported_count += committed
                      skipped_count += len(batch) - committed

                  new_count = db.query(MSMInputData).count()
                  added_count = new_count - existing_count
                  logger.info(f"MSM输入数据导入完成!")
                  logger.info(f"尝试导入行数: {total_rows}")
                  logger.info(f"成功导入: {imported_count} 条")
                  logger.info(f"跳过无效数据: {invalid_count} 条")
                  logger.info(f"处理失败: {skipped_count} 条")
                  logger.info(f"数据库中新增加: {added_count} 条记录")
                  logger.info(f"数据库总记录: {new_count} 条")
              except Exception as e:
                  db.rollback()
                  logger.error(f"数据导入失败,已回滚: {str(e)}")
                  raise
              finally:
                  db.close()
          except Exception as e:
              logger.error(f"数据导入过程失败: {str(e)}")
              raise

      def run_import(self):
          """
          Run the full pipeline: read the sheet, validate it, import it.

          @raises Exception: whatever the failing step raised (logged first)
          """
          try:
              logger.info("=" * 60)
              logger.info("开始MSM输入数据导入流程")
              logger.info("=" * 60)
              df = self.read_excel_data()
              df = self.validate_data(df)
              self.import_data(df)
              logger.info("=" * 60)
              logger.info("MSM输入数据导入流程完成!")
              logger.info("=" * 60)
          except Exception as e:
              logger.error(f"导入流程失败: {str(e)}")
              raise
  def main(argv=None):
      """
      Command-line entry point: import MSM input data from an Excel workbook.

      Usage: python import_MSM_input.py [excel_path] [sheet_name]

      @param {list[str] | None} argv - Arguments without the program name;
          defaults to sys.argv[1:]. Omitted values fall back to the built-in
          workbook path and the 'MSM_input' sheet.
      Exits the process with status 1 if the import fails.
      """
      args = sys.argv[1:] if argv is None else list(argv)
      # Default workbook location; pass a path as the first argument to override it.
      excel_path = args[0] if len(args) > 0 else r"D:\destkop\数据库对应数据.xlsx"
      # The workbook must contain this sheet; override with the second argument.
      sheet_name = args[1] if len(args) > 1 else "MSM_input"
      try:
          importer = MSMInputDataImporter(excel_path, sheet_name)
          importer.run_import()
      except Exception as e:
          logger.error(f"程序执行失败: {str(e)}")
          sys.exit(1)


  if __name__ == "__main__":
      main()