import_EffCd_output.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. """
  2. EffCd_output数据导入脚本
  3. @description: 从Excel文件读取EffCd_output数据并导入到EffCd_output_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.EffCd_output import EffCdOutputData # 需创建对应的ORM模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class EffCdOutputDataImporter:
  22. """
  23. EffCd输出数据导入器
  24. @description: 从Excel文件读取EffCd输出数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='EffCd_output'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'EffCd_output'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义必需字段列表
  35. self.required_columns = ['Farmland_ID', 'Sample_ID', 'LnEffCd']
  36. def read_excel_data(self):
  37. """
  38. 读取Excel文件数据
  39. @returns: DataFrame 读取的数据
  40. """
  41. try:
  42. logger.info(f"开始读取Excel文件: {self.excel_path}")
  43. logger.info(f"Sheet名称: {self.sheet_name}")
  44. # 检查文件是否存在
  45. if not os.path.exists(self.excel_path):
  46. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  47. # 读取Excel文件
  48. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  49. logger.info(f"成功读取数据,共 {len(df)} 行")
  50. logger.info(f"数据列: {list(df.columns)}")
  51. # 显示前几行数据供确认
  52. logger.info("前5行数据预览:")
  53. logger.info(df.head().to_string())
  54. return df
  55. except Exception as e:
  56. logger.error(f"读取Excel文件失败: {str(e)}")
  57. raise
  58. def validate_data(self, df):
  59. """
  60. 验证数据格式和完整性
  61. @param {DataFrame} df - 要验证的数据
  62. @returns: DataFrame 验证后的数据
  63. """
  64. try:
  65. logger.info("开始验证数据...")
  66. # 检查必需的列是否存在
  67. missing_columns = [col for col in self.required_columns if col not in df.columns]
  68. if missing_columns:
  69. raise ValueError(f"缺少必需的列: {missing_columns}")
  70. # 检查数据类型
  71. logger.info("检查数据类型...")
  72. # 转换数值类型
  73. for col in self.required_columns:
  74. df[col] = pd.to_numeric(df[col], errors='coerce')
  75. # 检查是否有无效的数值
  76. if df[self.required_columns].isnull().any().any():
  77. logger.warning("发现无效的数值,将跳过这些行")
  78. invalid_rows = df[df[self.required_columns].isnull().any(axis=1)]
  79. logger.warning(f"无效行数: {len(invalid_rows)}")
  80. df = df.dropna(subset=self.required_columns)
  81. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  82. return df
  83. except Exception as e:
  84. logger.error(f"数据验证失败: {str(e)}")
  85. raise
  86. def import_data(self, df):
  87. """
  88. 将数据导入到数据库
  89. @param {DataFrame} df - 要导入的数据
  90. """
  91. try:
  92. logger.info("开始导入数据到数据库...")
  93. # 创建数据库会话
  94. db = SessionLocal()
  95. try:
  96. # 检查是否有重复数据
  97. existing_count = db.query(EffCdOutputData).count()
  98. logger.info(f"数据库中现有数据: {existing_count} 条")
  99. # 批量创建对象
  100. batch_size = 1000
  101. total_rows = len(df)
  102. imported_count = 0
  103. for i in range(0, total_rows, batch_size):
  104. batch_df = df.iloc[i:i + batch_size]
  105. batch_objects = []
  106. for _, row in batch_df.iterrows():
  107. try:
  108. # 创建EffCdOutputData对象
  109. effcd_output = EffCdOutputData(
  110. farmland_id=int(row['Farmland_ID']),
  111. sample_id=int(row['Sample_ID']),
  112. ln_eff_cd=float(row['LnEffCd'])
  113. )
  114. batch_objects.append(effcd_output)
  115. except Exception as e:
  116. logger.warning(f"跳过行 {i + _}: {str(e)}")
  117. continue
  118. if batch_objects:
  119. # 批量插入
  120. db.add_all(batch_objects)
  121. db.commit()
  122. imported_count += len(batch_objects)
  123. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  124. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  125. # 验证导入结果
  126. final_count = db.query(EffCdOutputData).count()
  127. logger.info(f"导入后数据库总数据: {final_count} 条")
  128. except Exception as e:
  129. db.rollback()
  130. logger.error(f"数据导入失败,已回滚: {str(e)}")
  131. raise
  132. finally:
  133. db.close()
  134. except Exception as e:
  135. logger.error(f"数据导入过程失败: {str(e)}")
  136. raise
  137. def run_import(self):
  138. """
  139. 执行完整的导入流程
  140. """
  141. try:
  142. logger.info("=" * 60)
  143. logger.info("开始EffCd输出数据导入流程")
  144. logger.info("=" * 60)
  145. # 1. 读取Excel数据
  146. df = self.read_excel_data()
  147. # 2. 验证数据
  148. df = self.validate_data(df)
  149. # 3. 导入数据
  150. self.import_data(df)
  151. logger.info("=" * 60)
  152. logger.info("EffCd输出数据导入流程完成!")
  153. logger.info("=" * 60)
  154. except Exception as e:
  155. logger.error(f"导入流程失败: {str(e)}")
  156. raise
  157. def main():
  158. """
  159. 主函数
  160. """
  161. # Excel文件路径
  162. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  163. sheet_name = "EffCd_output" # 指定对应的sheet名称
  164. try:
  165. # 创建导入器并执行导入
  166. importer = EffCdOutputDataImporter(excel_path, sheet_name)
  167. importer.run_import()
  168. except Exception as e:
  169. logger.error(f"程序执行失败: {str(e)}")
  170. sys.exit(1)
  171. if __name__ == "__main__":
  172. main()