import_FluxCd_output.py 8.2 KB


  1. """
  2. FluxCd_output数据导入脚本
  3. @description: 从Excel文件读取FluxCd_output数据并导入到fluxcd_output_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.FluxCd_output import FluxCdOutputData # 需创建对应的ORM模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class FluxCdOutputDataImporter:
  22. """
  23. FluxCd输出数据导入器
  24. @description: 从Excel文件读取FluxCd输出数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='FluxCd_output'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'FluxCd_output'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义必需字段列表(设计文档中的原始列名)
  35. self.required_columns = [
  36. 'Farmland_ID', 'Sample_ID', 'In_Cd',
  37. 'Out_Cd', 'Net_Cd', 'End_Cd'
  38. ]
  39. def read_excel_data(self):
  40. """
  41. 读取Excel文件数据
  42. @returns: DataFrame 读取的数据
  43. """
  44. try:
  45. logger.info(f"开始读取Excel文件: {self.excel_path}")
  46. logger.info(f"Sheet名称: {self.sheet_name}")
  47. # 检查文件是否存在
  48. if not os.path.exists(self.excel_path):
  49. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  50. # 读取Excel文件
  51. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  52. logger.info(f"成功读取数据,共 {len(df)} 行")
  53. logger.info(f"数据列: {list(df.columns)}")
  54. # 显示前几行数据供确认
  55. logger.info("前5行数据预览:")
  56. logger.info(df.head().to_string())
  57. return df
  58. except Exception as e:
  59. logger.error(f"读取Excel文件失败: {str(e)}")
  60. raise
  61. def validate_data(self, df):
  62. """
  63. 验证数据格式和完整性
  64. @param {DataFrame} df - 要验证的数据
  65. @returns: DataFrame 验证后的数据
  66. """
  67. try:
  68. logger.info("开始验证数据...")
  69. # 检查必需的列是否存在
  70. missing_columns = [col for col in self.required_columns if col not in df.columns]
  71. if missing_columns:
  72. raise ValueError(f"缺少必需的列: {missing_columns}")
  73. # 将列名转换为小写(带下划线)
  74. df.columns = [col.lower() for col in df.columns]
  75. required_columns_lower = [col.lower() for col in self.required_columns]
  76. # 检查数据类型
  77. logger.info("检查数据类型...")
  78. # 转换数值类型
  79. for col in required_columns_lower:
  80. df[col] = pd.to_numeric(df[col], errors='coerce')
  81. # 处理空值 - 所有字段必须非空
  82. if df.isnull().any().any():
  83. logger.warning("发现空值,将删除包含空值的行")
  84. # 找出空值行
  85. invalid_rows = df[df.isnull().any(axis=1)]
  86. logger.warning(f"无效行数: {len(invalid_rows)}")
  87. # 删除空值行
  88. df = df.dropna()
  89. # 验证逻辑关系:Net_Cd = In_Cd - Out_Cd
  90. tolerance = 1e-6
  91. net_cd_calculated = df['in_cd'] - df['out_cd']
  92. mismatches = abs(df['net_cd'] - net_cd_calculated) > tolerance
  93. if mismatches.any():
  94. mismatched_indices = mismatches[mismatches].index.tolist()
  95. logger.warning(f"发现 {len(mismatched_indices)} 行 Net_Cd 值与计算值不一致:")
  96. for i in mismatched_indices[:5]: # 只显示前5个示例
  97. logger.warning(f"行 {i}: Net_Cd={df.at[i, 'net_cd']}, 计算值={net_cd_calculated[i]}")
  98. # 用计算值覆盖原始值
  99. df['net_cd'] = net_cd_calculated
  100. logger.info("已自动修正 Net_Cd 值为 In_Cd - Out_Cd")
  101. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  102. return df
  103. except Exception as e:
  104. logger.error(f"数据验证失败: {str(e)}")
  105. raise
  106. def import_data(self, df):
  107. """
  108. 将数据导入到数据库
  109. @param {DataFrame} df - 要导入的数据
  110. """
  111. try:
  112. logger.info("开始导入数据到数据库...")
  113. # 创建数据库会话
  114. db = SessionLocal()
  115. try:
  116. # 检查是否有重复数据
  117. existing_count = db.query(FluxCdOutputData).count()
  118. logger.info(f"数据库中现有数据: {existing_count} 条")
  119. # 批量创建对象
  120. batch_size = 1000
  121. total_rows = len(df)
  122. imported_count = 0
  123. for i in range(0, total_rows, batch_size):
  124. batch_df = df.iloc[i:i + batch_size]
  125. batch_objects = []
  126. for _, row in batch_df.iterrows():
  127. try:
  128. # 创建FluxCdOutputData对象
  129. fluxcd_output = FluxCdOutputData(
  130. farmland_id=int(row['farmland_id']),
  131. sample_id=int(row['sample_id']),
  132. in_cd=float(row['in_cd']),
  133. out_cd=float(row['out_cd']),
  134. net_cd=float(row['net_cd']),
  135. end_cd=float(row['end_cd'])
  136. )
  137. batch_objects.append(fluxcd_output)
  138. except Exception as e:
  139. logger.warning(f"跳过行 {i + _}: {str(e)}")
  140. continue
  141. if batch_objects:
  142. # 批量插入
  143. db.add_all(batch_objects)
  144. db.commit()
  145. imported_count += len(batch_objects)
  146. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  147. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  148. # 验证导入结果
  149. final_count = db.query(FluxCdOutputData).count()
  150. logger.info(f"导入后数据库总数据: {final_count} 条")
  151. except Exception as e:
  152. db.rollback()
  153. logger.error(f"数据导入失败,已回滚: {str(e)}")
  154. raise
  155. finally:
  156. db.close()
  157. except Exception as e:
  158. logger.error(f"数据导入过程失败: {str(e)}")
  159. raise
  160. def run_import(self):
  161. """
  162. 执行完整的导入流程
  163. """
  164. try:
  165. logger.info("=" * 60)
  166. logger.info("开始FluxCd输出数据导入流程")
  167. logger.info("=" * 60)
  168. # 1. 读取Excel数据
  169. df = self.read_excel_data()
  170. # 2. 验证数据
  171. df = self.validate_data(df)
  172. # 3. 导入数据
  173. self.import_data(df)
  174. logger.info("=" * 60)
  175. logger.info("FluxCd输出数据导入流程完成!")
  176. logger.info("=" * 60)
  177. except Exception as e:
  178. logger.error(f"导入流程失败: {str(e)}")
  179. raise
  180. def main():
  181. """
  182. 主函数
  183. """
  184. # Excel文件路径
  185. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  186. sheet_name = "FluxCd_output" # 指定对应的sheet名称
  187. try:
  188. # 创建导入器并执行导入
  189. importer = FluxCdOutputDataImporter(excel_path, sheet_name)
  190. importer.run_import()
  191. except Exception as e:
  192. logger.error(f"程序执行失败: {str(e)}")
  193. sys.exit(1)
  194. if __name__ == "__main__":
  195. main()