import_FluxCd_input.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
"""
FluxCd_input data import script
@description: Reads FluxCd_input data from an Excel file and imports it into the fluxcd_input_data table
"""
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.FluxCd_input import FluxCdInputData # 需创建对应的ORM模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class FluxCdInputDataImporter:
  22. """
  23. FluxCd输入数据导入器
  24. @description: 从Excel文件读取FluxCd输入数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='FluxCd_input'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'FluxCd_input'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义必需字段列表(设计文档中的原始列名)
  35. self.required_columns = [
  36. 'Farmland_ID', 'Sample_ID', 'Initial_Cd',
  37. 'DQCJ_Cd', 'GGS_Cd', 'NCP_Cd',
  38. 'DX_Cd', 'DB_Cd', 'ZL_Cd', 'JG_Cd'
  39. ]
  40. # 默认值设置(针对允许空的字段)
  41. self.default_values = {
  42. 'DX_Cd': 0.023,
  43. 'DB_Cd': 0.368
  44. }
  45. def read_excel_data(self):
  46. """
  47. 读取Excel文件数据
  48. @returns: DataFrame 读取的数据
  49. """
  50. try:
  51. logger.info(f"开始读取Excel文件: {self.excel_path}")
  52. logger.info(f"Sheet名称: {self.sheet_name}")
  53. # 检查文件是否存在
  54. if not os.path.exists(self.excel_path):
  55. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  56. # 读取Excel文件
  57. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  58. logger.info(f"成功读取数据,共 {len(df)} 行")
  59. logger.info(f"数据列: {list(df.columns)}")
  60. # 显示前几行数据供确认
  61. logger.info("前5行数据预览:")
  62. logger.info(df.head().to_string())
  63. return df
  64. except Exception as e:
  65. logger.error(f"读取Excel文件失败: {str(e)}")
  66. raise
  67. def validate_data(self, df):
  68. """
  69. 验证数据格式和完整性
  70. @param {DataFrame} df - 要验证的数据
  71. @returns: DataFrame 验证后的数据
  72. """
  73. try:
  74. logger.info("开始验证数据...")
  75. # 检查必需的列是否存在
  76. missing_columns = [col for col in self.required_columns if col not in df.columns]
  77. if missing_columns:
  78. raise ValueError(f"缺少必需的列: {missing_columns}")
  79. # 将列名转换为小写(带下划线)
  80. df.columns = [col.lower() for col in df.columns]
  81. required_columns_lower = [col.lower() for col in self.required_columns]
  82. default_values_lower = {k.lower(): v for k, v in self.default_values.items()}
  83. # 检查数据类型
  84. logger.info("检查数据类型...")
  85. # 转换数值类型
  86. for col in required_columns_lower:
  87. df[col] = pd.to_numeric(df[col], errors='coerce')
  88. # 处理空值:对于有默认值的列,用默认值填充;其他列必须非空
  89. # 对于允许空且有默认值的列
  90. for col in ['dx_cd', 'db_cd']:
  91. if col in required_columns_lower:
  92. # 用默认值填充空值
  93. default_val = default_values_lower.get(col, None)
  94. if default_val is not None:
  95. df[col] = df[col].fillna(default_val)
  96. # 同时,也要确保其他非空字段没有空值(除了这两个字段,其他字段不能为空)
  97. # 其他字段如果有空值,则删除行
  98. # 先找出没有默认值的必需字段
  99. non_default_columns = [col for col in required_columns_lower if col not in ['dx_cd', 'db_cd']]
  100. if df[non_default_columns].isnull().any().any():
  101. logger.warning("发现非默认值列有无效的数值,将跳过这些行")
  102. # 找出这些行
  103. invalid_rows = df[df[non_default_columns].isnull().any(axis=1)]
  104. logger.warning(f"无效行数: {len(invalid_rows)}")
  105. # 删除这些行
  106. df = df.dropna(subset=non_default_columns)
  107. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  108. return df
  109. except Exception as e:
  110. logger.error(f"数据验证失败: {str(e)}")
  111. raise
  112. def import_data(self, df):
  113. """
  114. 将数据导入到数据库
  115. @param {DataFrame} df - 要导入的数据
  116. """
  117. try:
  118. logger.info("开始导入数据到数据库...")
  119. # 创建数据库会话
  120. db = SessionLocal()
  121. try:
  122. # 检查是否有重复数据
  123. existing_count = db.query(FluxCdInputData).count()
  124. logger.info(f"数据库中现有数据: {existing_count} 条")
  125. # 批量创建对象
  126. batch_size = 1000
  127. total_rows = len(df)
  128. imported_count = 0
  129. for i in range(0, total_rows, batch_size):
  130. batch_df = df.iloc[i:i + batch_size]
  131. batch_objects = []
  132. for _, row in batch_df.iterrows():
  133. try:
  134. # 创建FluxCdInputData对象
  135. fluxcd_input = FluxCdInputData(
  136. farmland_id=int(row['farmland_id']),
  137. sample_id=int(row['sample_id']),
  138. initial_cd=float(row['initial_cd']),
  139. atmospheric_deposition=float(row['dqcj_cd']),
  140. irrigation_input=float(row['ggs_cd']),
  141. agro_chemicals_input=float(row['ncp_cd']),
  142. groundwater_leaching=float(row['dx_cd']),
  143. surface_runoff=float(row['db_cd']),
  144. grain_removal=float(row['zl_cd']),
  145. straw_removal=float(row['jg_cd'])
  146. )
  147. batch_objects.append(fluxcd_input)
  148. except Exception as e:
  149. logger.warning(f"跳过行 {i + _}: {str(e)}")
  150. continue
  151. if batch_objects:
  152. # 批量插入
  153. db.add_all(batch_objects)
  154. db.commit()
  155. imported_count += len(batch_objects)
  156. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  157. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  158. # 验证导入结果
  159. final_count = db.query(FluxCdInputData).count()
  160. logger.info(f"导入后数据库总数据: {final_count} 条")
  161. except Exception as e:
  162. db.rollback()
  163. logger.error(f"数据导入失败,已回滚: {str(e)}")
  164. raise
  165. finally:
  166. db.close()
  167. except Exception as e:
  168. logger.error(f"数据导入过程失败: {str(e)}")
  169. raise
  170. def run_import(self):
  171. """
  172. 执行完整的导入流程
  173. """
  174. try:
  175. logger.info("=" * 60)
  176. logger.info("开始FluxCd输入数据导入流程")
  177. logger.info("=" * 60)
  178. # 1. 读取Excel数据
  179. df = self.read_excel_data()
  180. # 2. 验证数据
  181. df = self.validate_data(df)
  182. # 3. 导入数据
  183. self.import_data(df)
  184. logger.info("=" * 60)
  185. logger.info("FluxCd输入数据导入流程完成!")
  186. logger.info("=" * 60)
  187. except Exception as e:
  188. logger.error(f"导入流程失败: {str(e)}")
  189. raise
  190. def main():
  191. """
  192. 主函数
  193. """
  194. # Excel文件路径
  195. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  196. sheet_name = "FluxCd_input" # 指定对应的sheet名称
  197. try:
  198. # 创建导入器并执行导入
  199. importer = FluxCdInputDataImporter(excel_path, sheet_name)
  200. importer.run_import()
  201. except Exception as e:
  202. logger.error(f"程序执行失败: {str(e)}")
  203. sys.exit(1)
  204. if __name__ == "__main__":
  205. main()