# import_assessment.py (12 KB)

  1. """
  2. 评价数据导入脚本
  3. @description: 从Excel文件读取Assessment评价数据并导入到Assessment表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. # 添加项目根目录到Python路径
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. from app.database import engine, SessionLocal
  13. from app.models.assessment import Assessment # 确保已创建Assessment模型
  14. # 设置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  18. )
  19. logger = logging.getLogger(__name__)
  20. class AssessmentDataImporter:
  21. """
  22. 评价数据导入器
  23. @description: 从Excel文件读取评价数据并导入到Assessment表
  24. """
  25. def __init__(self, excel_path, sheet_name='Assessment'):
  26. """
  27. 初始化导入器
  28. @param {str} excel_path - Excel文件路径
  29. @param {str} sheet_name - Sheet名称,默认为'Assessment'
  30. """
  31. self.excel_path = excel_path
  32. self.sheet_name = sheet_name
  33. # 用地类型映射
  34. self.land_use_mapping = {
  35. '旱地': 0.0,
  36. '水田': 1.0,
  37. '水浇地': 2.0
  38. }
  39. # 定义必需列
  40. self.required_columns = [
  41. 'Farmland_ID',
  42. 'Sample_ID',
  43. 'Type',
  44. 'IDW_2023SP_Cd',
  45. 'IDW_2023SP_pH',
  46. 'SOM_IDW',
  47. 'safety_production_threshold',
  48. 'pollution_risk_screening_value'
  49. ]
  50. def read_excel_data(self):
  51. """
  52. 读取Excel文件数据
  53. @returns: DataFrame 读取的数据
  54. """
  55. try:
  56. logger.info(f"开始读取Excel文件: {self.excel_path}")
  57. logger.info(f"Sheet名称: {self.sheet_name}")
  58. # 检查文件是否存在
  59. if not os.path.exists(self.excel_path):
  60. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  61. # 读取Excel文件
  62. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  63. logger.info(f"成功读取数据,共 {len(df)} 行")
  64. logger.info(f"数据列: {list(df.columns)}")
  65. # 显示前几行数据供确认
  66. logger.info("前5行数据预览:")
  67. logger.info(df.head().to_string())
  68. return df
  69. except Exception as e:
  70. logger.error(f"读取Excel文件失败: {str(e)}")
  71. raise
  72. def validate_data(self, df):
  73. """
  74. 验证数据格式和完整性
  75. @param {DataFrame} df - 要验证的数据
  76. @returns: DataFrame 验证后的数据
  77. """
  78. try:
  79. logger.info("开始验证数据...")
  80. # 检查必需的列是否存在
  81. missing_columns = [col for col in self.required_columns if col not in df.columns]
  82. if missing_columns:
  83. raise ValueError(f"缺少必需的列: {missing_columns}")
  84. # 检查Farmland_ID和Sample_ID是否重复
  85. duplicates = df.duplicated(subset=['Farmland_ID', 'Sample_ID'])
  86. if duplicates.any():
  87. dup_rows = df[duplicates]
  88. logger.warning(f"发现 {len(dup_rows)} 条重复记录(基于Farmland_ID和Sample_ID)")
  89. logger.info("重复记录示例:\n" + dup_rows.head().to_string())
  90. # 删除重复行,保留第一个出现的
  91. df = df.drop_duplicates(subset=['Farmland_ID', 'Sample_ID'], keep='first')
  92. logger.info(f"删除重复记录后剩余 {len(df)} 行数据")
  93. # 转换数值类型
  94. numeric_columns = [
  95. 'IDW_2023SP_Cd',
  96. 'IDW_2023SP_pH',
  97. 'SOM_IDW',
  98. 'safety_production_threshold',
  99. 'pollution_risk_screening_value'
  100. ]
  101. for col in numeric_columns:
  102. if col in df.columns:
  103. # 尝试转换为数值类型
  104. df[col] = pd.to_numeric(df[col], errors='coerce')
  105. # 检查空值
  106. if df[col].isnull().any():
  107. invalid_rows = df[df[col].isnull()]
  108. logger.warning(f"列 {col} 中有无效值,行号: {list(invalid_rows.index)}")
  109. # 标记为无效但保留行,稍后处理
  110. df[f'{col}_valid'] = ~df[col].isnull()
  111. # 转换Farmland_ID和Sample_ID为整数
  112. for col in ['Farmland_ID', 'Sample_ID']:
  113. if col in df.columns:
  114. # 首先转换为浮点类型,再尝试转整数
  115. df[col] = pd.to_numeric(df[col], errors='coerce').fillna(-1)
  116. df[col] = df[col].astype(int)
  117. # 检查无效值
  118. if (df[col] < 0).any():
  119. invalid_rows = df[df[col] < 0]
  120. logger.warning(f"列 {col} 中有无效值,行号: {list(invalid_rows.index)}")
  121. df[f'{col}_valid'] = (df[col] >= 0)
  122. # 用地类型转换
  123. if 'Type' in df.columns:
  124. # 尝试直接转换为数值
  125. df['Type_Numeric'] = pd.to_numeric(df['Type'], errors='coerce')
  126. # 处理无法转换的类型
  127. unknown_types = df[df['Type_Numeric'].isnull()]['Type'].unique()
  128. if len(unknown_types) > 0:
  129. logger.info(f"发现未知用地类型: {unknown_types}, 尝试映射...")
  130. # 使用映射转换
  131. df['Type_Mapped'] = df['Type'].map(self.land_use_mapping)
  132. # 合并两种转换方式
  133. df['Final_Type'] = df['Type_Numeric'].fillna(df['Type_Mapped'])
  134. else:
  135. df['Final_Type'] = df['Type_Numeric']
  136. # 检查是否还有无效值
  137. if df['Final_Type'].isnull().any():
  138. invalid_rows = df[df['Final_Type'].isnull()]
  139. logger.warning(f"列 Type 中有无法识别的值,行号: {list(invalid_rows.index)}")
  140. logger.info("为无效值设置默认值0.0(旱地)")
  141. df['Final_Type'] = df['Final_Type'].fillna(0.0)
  142. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  143. return df
  144. except Exception as e:
  145. logger.error(f"数据验证失败: {str(e)}")
  146. raise
  147. def create_assessment_object(self, row):
  148. """
  149. 创建评价数据对象
  150. @param {pd.Series} row - 数据行
  151. @returns: Assessment 对象
  152. """
  153. try:
  154. return Assessment(
  155. farmland_id=row['Farmland_ID'],
  156. sample_id=row['Sample_ID'],
  157. type=row['Final_Type'],
  158. idw_2023sp_cd=row['IDW_2023SP_Cd'],
  159. idw_2023sp_ph=row['IDW_2023SP_pH'],
  160. som_idw=row['SOM_IDW'],
  161. safety_production_threshold=row['safety_production_threshold'],
  162. pollution_risk_screening_value=row['pollution_risk_screening_value']
  163. )
  164. except KeyError as e:
  165. logger.warning(f"创建对象时缺少必要字段: {str(e)}")
  166. return None
  167. except Exception as e:
  168. logger.warning(f"创建Assessment对象失败: {str(e)}")
  169. return None
  170. def import_data(self, df):
  171. """
  172. 将数据导入到数据库
  173. @param {DataFrame} df - 要导入的数据
  174. """
  175. try:
  176. logger.info("开始导入数据到数据库...")
  177. # 创建数据库会话
  178. db = SessionLocal()
  179. try:
  180. # 检查现有数据量
  181. existing_count = db.query(Assessment).count()
  182. logger.info(f"数据库中现有评价数据记录: {existing_count} 条")
  183. # 批量创建对象并导入
  184. total_rows = len(df)
  185. imported_count = 0
  186. skipped_count = 0
  187. invalid_count = 0
  188. # 分批处理数据
  189. for i, row in df.iterrows():
  190. try:
  191. # 检查是否有效行(所有关键字段都有效)
  192. is_valid = True
  193. for col in self.required_columns:
  194. if f'{col}_valid' in row and not row[f'{col}_valid']:
  195. is_valid = False
  196. break
  197. if not is_valid:
  198. invalid_count += 1
  199. logger.debug(f"跳过无效行 {i}: 存在无效值")
  200. continue
  201. # 创建Assessment对象
  202. assessment = self.create_assessment_object(row)
  203. if not assessment:
  204. skipped_count += 1
  205. continue
  206. # 添加到会话
  207. db.add(assessment)
  208. imported_count += 1
  209. # 每50条提交一次
  210. if imported_count % 50 == 0:
  211. db.commit()
  212. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  213. except Exception as e:
  214. logger.warning(f"导入行 {i} 时出错: {str(e)}")
  215. skipped_count += 1
  216. db.rollback()
  217. # 提交剩余数据
  218. db.commit()
  219. # 更新统计信息
  220. new_count = db.query(Assessment).count()
  221. added_count = new_count - existing_count
  222. logger.info(f"评价数据导入完成!")
  223. logger.info(f"成功导入: {imported_count} 条")
  224. logger.info(f"跳过无效数据: {invalid_count} 条")
  225. logger.info(f"处理失败: {skipped_count} 条")
  226. logger.info(f"数据库中新增加: {added_count} 条记录")
  227. logger.info(f"数据库总记录: {new_count} 条")
  228. except Exception as e:
  229. db.rollback()
  230. logger.error(f"数据导入失败,已回滚: {str(e)}")
  231. raise
  232. finally:
  233. db.close()
  234. except Exception as e:
  235. logger.error(f"数据导入过程失败: {str(e)}")
  236. raise
  237. def run_import(self):
  238. """
  239. 执行完整的导入流程
  240. """
  241. try:
  242. logger.info("=" * 60)
  243. logger.info("开始评价数据导入流程")
  244. logger.info("=" * 60)
  245. # 1. 读取Excel数据
  246. df = self.read_excel_data()
  247. # 2. 验证数据
  248. df = self.validate_data(df)
  249. # 3. 导入数据
  250. self.import_data(df)
  251. logger.info("=" * 60)
  252. logger.info("评价数据导入流程完成!")
  253. logger.info("=" * 60)
  254. except Exception as e:
  255. logger.error(f"导入流程失败: {str(e)}")
  256. raise
  257. def main():
  258. """
  259. 主函数
  260. """
  261. # Excel文件路径
  262. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 根据实际路径修改
  263. sheet_name = "Assessment" # 确保Excel中有这个sheet
  264. try:
  265. # 创建导入器并执行导入
  266. importer = AssessmentDataImporter(excel_path, sheet_name)
  267. importer.run_import()
  268. except Exception as e:
  269. logger.error(f"程序执行失败: {str(e)}")
  270. sys.exit(1)
  271. if __name__ == "__main__":
  272. main()