import_MSM_output.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. """
  2. 络合模型输出数据导入脚本
  3. @description: 从Excel文件读取MSM_output数据并导入到MSM_output_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. # 添加项目根目录到Python路径
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. from app.database import engine, SessionLocal
  13. from app.models.MSM_output import MSMOutputData # 确保已创建MSMOutputData模型
  14. # 设置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  18. )
  19. logger = logging.getLogger(__name__)
  20. class MSMOutputDataImporter:
  21. """
  22. 络合模型输出数据导入器
  23. @description: 从Excel文件读取MSM输出数据并导入到MSM_output_data表
  24. """
  25. def __init__(self, excel_path, sheet_name='MSM_output'):
  26. """
  27. 初始化导入器
  28. @param {str} excel_path - Excel文件路径
  29. @param {str} sheet_name - Sheet名称,默认为'MSM_output'
  30. """
  31. self.excel_path = excel_path
  32. self.sheet_name = sheet_name
  33. # 定义必需列
  34. self.required_columns = [
  35. 'Farmland_ID',
  36. 'Sample_ID',
  37. 'Var:',
  38. 'Cd.solution'
  39. ]
  40. def read_excel_data(self):
  41. """
  42. 读取Excel文件数据
  43. @returns: DataFrame 读取的数据
  44. """
  45. try:
  46. logger.info(f"开始读取Excel文件: {self.excel_path}")
  47. logger.info(f"Sheet名称: {self.sheet_name}")
  48. # 检查文件是否存在
  49. if not os.path.exists(self.excel_path):
  50. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  51. # 读取Excel文件
  52. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  53. logger.info(f"成功读取数据,共 {len(df)} 行")
  54. logger.info(f"数据列: {list(df.columns)}")
  55. # 显示前几行数据供确认
  56. logger.info("前5行数据预览:")
  57. logger.info(df.head().to_string())
  58. return df
  59. except Exception as e:
  60. logger.error(f"读取Excel文件失败: {str(e)}")
  61. raise
  62. def validate_data(self, df):
  63. """
  64. 验证数据格式和完整性
  65. @param {DataFrame} df - 要验证的数据
  66. @returns: DataFrame 验证后的数据
  67. """
  68. try:
  69. logger.info("开始验证数据...")
  70. # 检查必需的列是否存在
  71. missing_columns = [col for col in self.required_columns if col not in df.columns]
  72. if missing_columns:
  73. raise ValueError(f"缺少必需的列: {missing_columns}")
  74. # 检查Farmland_ID、Sample_ID和Var:是否重复
  75. duplicates = df.duplicated(subset=['Farmland_ID', 'Sample_ID', 'Var:'])
  76. if duplicates.any():
  77. dup_rows = df[duplicates]
  78. logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID, Sample_ID和Var:)")
  79. logger.info("重复记录示例:\n" + dup_rows.head().to_string())
  80. # 删除重复行,保留第一个出现的
  81. df = df.drop_duplicates(subset=['Farmland_ID', 'Sample_ID', 'Var:'], keep='first')
  82. logger.info(f"删除重复记录后剩余 {len(df)} 行数据")
  83. # 处理字符串列
  84. string_columns = ['Var:']
  85. for col in string_columns:
  86. if col in df.columns:
  87. df[col] = df[col].astype(str).fillna('')
  88. # 处理数值列
  89. numeric_columns = ['Cd.solution']
  90. for col in numeric_columns:
  91. if col in df.columns:
  92. # 尝试转换为数值类型
  93. df[col] = pd.to_numeric(df[col], errors='coerce')
  94. # 检查空值
  95. null_count = df[col].isnull().sum()
  96. if null_count > 0:
  97. logger.warning(f"列 {col} 中有 {null_count} 个空值或无效值")
  98. # 标记为无效
  99. df[f'{col}_invalid'] = df[col].isnull()
  100. # 处理Farmland_ID和Sample_ID
  101. for col in ['Farmland_ID', 'Sample_ID']:
  102. if col in df.columns:
  103. # 尝试转换为整数
  104. df[col] = pd.to_numeric(df[col], errors='coerce').fillna(-1)
  105. # 检查无效值
  106. invalid_ids = df[df[col] < 0]
  107. if not invalid_ids.empty:
  108. logger.warning(f"列 {col} 中有 {len(invalid_ids)} 条无效值")
  109. logger.info("问题行:\n" + invalid_ids.head().to_string())
  110. # 标记为无效
  111. df[f'{col}_invalid'] = df[col] < 0
  112. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  113. return df
  114. except Exception as e:
  115. logger.error(f"数据验证失败: {str(e)}")
  116. raise
  117. def create_msm_output_object(self, row):
  118. """
  119. 创建MSM输出数据对象
  120. @param {pd.Series} row - 数据行
  121. @returns: MSMOutputData 对象
  122. """
  123. try:
  124. # 处理无效数据
  125. invalid_fields = []
  126. for col in self.required_columns:
  127. if f'{col}_invalid' in row and row[f'{col}_invalid']:
  128. invalid_fields.append(col)
  129. if invalid_fields:
  130. logger.warning(
  131. f"跳过无效行: Farmland_ID={row['Farmland_ID']}, Sample_ID={row['Sample_ID']}, 无效字段: {', '.join(invalid_fields)}")
  132. return None
  133. # 创建对象
  134. return MSMOutputData(
  135. farmland_id=int(row['Farmland_ID']),
  136. sample_id=int(row['Sample_ID']),
  137. var=row['Var:'],
  138. cd_solution=row['Cd.solution']
  139. )
  140. except KeyError as e:
  141. logger.warning(f"创建对象时缺少必要字段: {str(e)}")
  142. return None
  143. except Exception as e:
  144. logger.warning(f"创建MSMOutputData对象失败: {str(e)}")
  145. return None
  146. def import_data(self, df):
  147. """
  148. 将数据导入到数据库
  149. @param {DataFrame} df - 要导入的数据
  150. """
  151. try:
  152. logger.info("开始导入数据到数据库...")
  153. # 创建数据库会话
  154. db = SessionLocal()
  155. try:
  156. # 检查现有数据量
  157. existing_count = db.query(MSMOutputData).count()
  158. logger.info(f"数据库中现有MSM输出数据记录: {existing_count} 条")
  159. # 批量创建对象并导入
  160. total_rows = len(df)
  161. imported_count = 0
  162. skipped_count = 0
  163. invalid_count = 0
  164. batch_size = 100
  165. objects_to_insert = []
  166. # 准备批量处理
  167. for i, row in df.iterrows():
  168. # 跳过前处理无效数据
  169. invalid = False
  170. for col in self.required_columns:
  171. if f'{col}_invalid' in row and row[f'{col}_invalid']:
  172. invalid = True
  173. break
  174. if invalid:
  175. invalid_count += 1
  176. continue
  177. try:
  178. obj = self.create_msm_output_object(row)
  179. if not obj:
  180. skipped_count += 1
  181. continue
  182. objects_to_insert.append(obj)
  183. imported_count += 1
  184. # 每100条提交一次
  185. if len(objects_to_insert) >= batch_size:
  186. db.add_all(objects_to_insert)
  187. db.commit()
  188. logger.info(f"已批量导入 {imported_count}/{total_rows} 条数据")
  189. objects_to_insert = []
  190. except Exception as e:
  191. logger.warning(f"处理行 {i} 时出错: {str(e)}")
  192. skipped_count += 1
  193. db.rollback()
  194. # 提交剩余数据
  195. if objects_to_insert:
  196. db.add_all(objects_to_insert)
  197. db.commit()
  198. # 更新统计信息
  199. new_count = db.query(MSMOutputData).count()
  200. added_count = new_count - existing_count
  201. logger.info(f"MSM输出数据导入完成!")
  202. logger.info(f"尝试导入行数: {total_rows}")
  203. logger.info(f"成功导入: {imported_count} 条")
  204. logger.info(f"跳过无效数据: {invalid_count} 条")
  205. logger.info(f"处理失败: {skipped_count} 条")
  206. logger.info(f"数据库中新增加: {added_count} 条记录")
  207. logger.info(f"数据库总记录: {new_count} 条")
  208. except Exception as e:
  209. db.rollback()
  210. logger.error(f"数据导入失败,已回滚: {str(e)}")
  211. raise
  212. finally:
  213. db.close()
  214. except Exception as e:
  215. logger.error(f"数据导入过程失败: {str(e)}")
  216. raise
  217. def run_import(self):
  218. """
  219. 执行完整的导入流程
  220. """
  221. try:
  222. logger.info("=" * 60)
  223. logger.info("开始MSM输出数据导入流程")
  224. logger.info("=" * 60)
  225. # 1. 读取Excel数据
  226. df = self.read_excel_data()
  227. # 2. 验证数据
  228. df = self.validate_data(df)
  229. # 3. 导入数据
  230. self.import_data(df)
  231. logger.info("=" * 60)
  232. logger.info("MSM输出数据导入流程完成!")
  233. logger.info("=" * 60)
  234. except Exception as e:
  235. logger.error(f"导入流程失败: {str(e)}")
  236. raise
  237. def main():
  238. """
  239. 主函数
  240. """
  241. # Excel文件路径
  242. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 根据实际路径修改
  243. sheet_name = "MSM_output" # 确保Excel中有这个sheet
  244. try:
  245. # 创建导入器并执行导入
  246. importer = MSMOutputDataImporter(excel_path, sheet_name)
  247. importer.run_import()
  248. except Exception as e:
  249. logger.error(f"程序执行失败: {str(e)}")
  250. sys.exit(1)
  251. if __name__ == "__main__":
  252. main()