import_soil_data.py 10 KB


  1. """
  2. 土壤数据导入脚本
  3. @description: 从Excel文件读取Soil数据并导入到Soil_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.soil import SoilData # 假设已创建SoilData模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class SoilDataImporter:
  22. """
  23. 土壤数据导入器
  24. @description: 从Excel文件读取土壤数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='Soil'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'Soil'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义默认值
  35. self.default_values = {
  36. 'DX_Cd': 0.023,
  37. 'DB_Cd': 0.368
  38. }
  39. def read_excel_data(self):
  40. """
  41. 读取Excel文件数据
  42. @returns: DataFrame 读取的数据
  43. """
  44. try:
  45. logger.info(f"开始读取Excel文件: {self.excel_path}")
  46. logger.info(f"Sheet名称: {self.sheet_name}")
  47. # 检查文件是否存在
  48. if not os.path.exists(self.excel_path):
  49. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  50. # 读取Excel文件
  51. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  52. logger.info(f"成功读取数据,共 {len(df)} 行")
  53. logger.info(f"数据列: {list(df.columns)}")
  54. # 显示前几行数据供确认
  55. logger.info("前5行数据预览:")
  56. logger.info(df.head().to_string())
  57. return df
  58. except Exception as e:
  59. logger.error(f"读取Excel文件失败: {str(e)}")
  60. raise
  61. def validate_data(self, df):
  62. """
  63. 验证数据格式和完整性
  64. @param {DataFrame} df - 要验证的数据
  65. @returns: DataFrame 验证后的数据
  66. """
  67. try:
  68. logger.info("开始验证数据...")
  69. # 检查必需的列是否存在
  70. required_columns = ['Farmland_ID', 'Sample_ID']
  71. missing_columns = [col for col in required_columns if col not in df.columns]
  72. if missing_columns:
  73. raise ValueError(f"缺少必需的列: {missing_columns}")
  74. # 检查数据类型
  75. logger.info("检查数据类型...")
  76. # 转换数值类型
  77. numeric_cols = [
  78. 'Farmland_ID', 'Sample_ID', '0002IDW', 'bd020_90', 'POR_Layer',
  79. 'ExAl_IDW', 'ExCa_IDW', 'ExK_IDW', 'ExMg_IDW', 'ExNa_IDW',
  80. 'Fed_IDW', 'SOM_IDW', 'IDW_2013PC_Cd', 'IDW_2018XC_Cd', 'IDW_2023SP_Cd',
  81. 'IDW_2013PC_pH', 'IDW_2018XC_pH', 'IDW_2023SP_pH', '002_0002IDW',
  82. '02_002IDW', '2_02IDW', 'AvaK_IDW', 'AvaP_IDW', 'CEC_IDW', 'EC_IDW',
  83. 'OC-Fe_0-30', 'SAvaK_IDW', 'TAl_IDW', 'TCa_IDW', 'TCd_IDW', 'TEB_IDW',
  84. 'TExH_IDW', 'TFe_IDW', 'TK_IDW', 'TMg_IDW', 'TMn_IDW', 'TN_IDW',
  85. 'TP_IDW', 'TS_IDW', 'DQCJ_Cd', 'GGS_Cd', 'DX_Cd', 'DB_Cd'
  86. ]
  87. # 只处理存在的列
  88. existing_numeric_cols = [col for col in numeric_cols if col in df.columns]
  89. # 转换数值类型
  90. for col in existing_numeric_cols:
  91. df[col] = pd.to_numeric(df[col], errors='coerce')
  92. # 应用默认值
  93. for col, default_val in self.default_values.items():
  94. if col in df.columns:
  95. df[col].fillna(default_val, inplace=True)
  96. # 删除Farmland_ID或Sample_ID为空的行
  97. initial_count = len(df)
  98. df = df.dropna(subset=['Farmland_ID', 'Sample_ID'])
  99. if len(df) < initial_count:
  100. logger.warning(f"删除 {initial_count - len(df)} 行缺少必需值的数据")
  101. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  102. return df
  103. except Exception as e:
  104. logger.error(f"数据验证失败: {str(e)}")
  105. raise
  106. def import_data(self, df):
  107. """
  108. 将数据导入到数据库
  109. @param {DataFrame} df - 要导入的数据
  110. """
  111. try:
  112. logger.info("开始导入数据到数据库...")
  113. # 创建数据库会话
  114. db = SessionLocal()
  115. try:
  116. # 检查是否有重复数据
  117. existing_count = db.query(SoilData).count()
  118. logger.info(f"数据库中现有数据: {existing_count} 条")
  119. # 批量创建对象
  120. batch_size = 1000
  121. total_rows = len(df)
  122. imported_count = 0
  123. for i in range(0, total_rows, batch_size):
  124. batch_df = df.iloc[i:i + batch_size]
  125. batch_objects = []
  126. for _, row in batch_df.iterrows():
  127. try:
  128. # 创建SoilData对象
  129. soil_data = SoilData(
  130. farmland_id=int(row['Farmland_ID']),
  131. sample_id=int(row['Sample_ID']),
  132. clay_0002IDW=row.get('0002IDW'),
  133. bd020_90=row.get('bd020_90'),
  134. por_layer=row.get('POR_Layer'),
  135. exal_idw=row.get('ExAl_IDW'),
  136. exca_idw=row.get('ExCa_IDW'),
  137. exk_idw=row.get('ExK_IDW'),
  138. exmg_idw=row.get('ExMg_IDW'),
  139. exna_idw=row.get('ExNa_IDW'),
  140. fed_idw=row.get('Fed_IDW'),
  141. som_idw=row.get('SOM_IDW'),
  142. idw_2013pc_cd=row.get('IDW_2013PC_Cd'),
  143. idw_2018xc_cd=row.get('IDW_2018XC_Cd'),
  144. idw_2023sp_cd=row.get('IDW_2023SP_Cd'),
  145. idw_2013pc_ph=row.get('IDW_2013PC_pH'),
  146. idw_2018xc_ph=row.get('IDW_2018XC_pH'),
  147. idw_2023sp_ph=row.get('IDW_2023SP_pH'),
  148. silt_002_0002IDW=row.get('002_0002IDW'),
  149. sand_02_002IDW=row.get('02_002IDW'),
  150. gravel_2_02IDW=row.get('2_02IDW'),
  151. avak_idw=row.get('AvaK_IDW'),
  152. avap_idw=row.get('AvaP_IDW'),
  153. cec_idw=row.get('CEC_IDW'),
  154. ec_idw=row.get('EC_IDW'),
  155. oc_fe_0_30=row.get('OC-Fe_0-30'),
  156. savak_idw=row.get('SAvaK_IDW'),
  157. tal_idw=row.get('TAl_IDW'),
  158. tca_idw=row.get('TCa_IDW'),
  159. tcd_idw=row.get('TCd_IDW'),
  160. teb_idw=row.get('TEB_IDW'),
  161. texh_idw=row.get('TExH_IDW'),
  162. tfe_idw=row.get('TFe_IDW'),
  163. tk_idw=row.get('TK_IDW'),
  164. tmg_idw=row.get('TMg_IDW'),
  165. tmn_idw=row.get('TMn_IDW'),
  166. tn_idw=row.get('TN_IDW'),
  167. tp_idw=row.get('TP_IDW'),
  168. ts_idw=row.get('TS_IDW'),
  169. dqcj_cd=row.get('DQCJ_Cd'),
  170. ggs_cd=row.get('GGS_Cd'),
  171. dx_cd=row.get('DX_Cd', self.default_values['DX_Cd']),
  172. db_cd=row.get('DB_Cd', self.default_values['DB_Cd'])
  173. )
  174. batch_objects.append(soil_data)
  175. except Exception as e:
  176. logger.warning(f"跳过行 {i + _}: {str(e)}")
  177. continue
  178. if batch_objects:
  179. # 批量插入
  180. db.add_all(batch_objects)
  181. db.commit()
  182. imported_count += len(batch_objects)
  183. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  184. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  185. # 验证导入结果
  186. final_count = db.query(SoilData).count()
  187. logger.info(f"导入后数据库总数据: {final_count} 条")
  188. except Exception as e:
  189. db.rollback()
  190. logger.error(f"数据导入失败,已回滚: {str(e)}")
  191. raise
  192. finally:
  193. db.close()
  194. except Exception as e:
  195. logger.error(f"数据导入过程失败: {str(e)}")
  196. raise
  197. def run_import(self):
  198. """
  199. 执行完整的导入流程
  200. """
  201. try:
  202. logger.info("=" * 60)
  203. logger.info("开始土壤数据导入流程")
  204. logger.info("=" * 60)
  205. # 1. 读取Excel数据
  206. df = self.read_excel_data()
  207. # 2. 验证数据
  208. df = self.validate_data(df)
  209. # 3. 导入数据
  210. self.import_data(df)
  211. logger.info("=" * 60)
  212. logger.info("土壤数据导入流程完成!")
  213. logger.info("=" * 60)
  214. except Exception as e:
  215. logger.error(f"导入流程失败: {str(e)}")
  216. raise
  217. def main():
  218. """
  219. 主函数
  220. """
  221. # Excel文件路径
  222. excel_path = r"D:\destkop\数据库对应数据.xlsx"
  223. sheet_name = "Soil"
  224. try:
  225. # 创建导入器并执行导入
  226. importer = SoilDataImporter(excel_path, sheet_name)
  227. importer.run_import()
  228. except Exception as e:
  229. logger.error(f"程序执行失败: {str(e)}")
  230. sys.exit(1)
  231. if __name__ == "__main__":
  232. main()