# import_agricultural.py
  1. """
  2. Agricultural数据导入脚本
  3. @description: 从Excel文件读取agricultural_data数据并导入到agricultural_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.agricultural import AgriculturalData # 需创建对应的ORM模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class AgriculturalDataImporter:
  22. """
  23. 农业投入品数据导入器
  24. @description: 从Excel文件读取农业投入品数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='Agricultural'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'agricultural_data'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义必需字段列表(根据数据库设计文档)
  35. self.required_columns = [
  36. 'county_name', 'crop_sowing_area', 'nitrogen_usage',
  37. 'phosphorus_usage', 'potassium_usage', 'compound_usage',
  38. 'organic_usage', 'pesticide_usage', 'farmyard_usage',
  39. 'plastic_film_usage', 'nitrogen_cd_flux', 'phosphorus_cd_flux',
  40. 'potassium_cd_flux', 'compound_cd_flux', 'organic_cd_flux',
  41. 'pesticide_cd_flux', 'farmyard_cd_flux', 'plastic_film_cd_flux',
  42. 'total_cd_flux', 'data_year'
  43. ]
  44. # 数值型字段列表
  45. self.numeric_columns = [
  46. 'crop_sowing_area', 'nitrogen_usage', 'phosphorus_usage',
  47. 'potassium_usage', 'compound_usage', 'organic_usage',
  48. 'pesticide_usage', 'farmyard_usage', 'plastic_film_usage',
  49. 'nitrogen_cd_flux', 'phosphorus_cd_flux', 'potassium_cd_flux',
  50. 'compound_cd_flux', 'organic_cd_flux', 'pesticide_cd_flux',
  51. 'farmyard_cd_flux', 'plastic_film_cd_flux', 'total_cd_flux',
  52. 'data_year'
  53. ]
  54. def read_excel_data(self):
  55. """
  56. 读取Excel文件数据
  57. @returns: DataFrame 读取的数据
  58. """
  59. try:
  60. logger.info(f"开始读取Excel文件: {self.excel_path}")
  61. logger.info(f"Sheet名称: {self.sheet_name}")
  62. # 检查文件是否存在
  63. if not os.path.exists(self.excel_path):
  64. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  65. # 读取Excel文件
  66. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  67. logger.info(f"成功读取数据,共 {len(df)} 行")
  68. logger.info(f"数据列: {list(df.columns)}")
  69. # 显示前几行数据供确认
  70. logger.info("前5行数据预览:")
  71. logger.info(df.head().to_string())
  72. return df
  73. except Exception as e:
  74. logger.error(f"读取Excel文件失败: {str(e)}")
  75. raise
  76. def validate_data(self, df):
  77. """
  78. 验证数据格式和完整性
  79. @param {DataFrame} df - 要验证的数据
  80. @returns: DataFrame 验证后的数据
  81. """
  82. try:
  83. logger.info("开始验证数据...")
  84. # 检查必需的列是否存在
  85. missing_columns = [col for col in self.required_columns if col not in df.columns]
  86. if missing_columns:
  87. raise ValueError(f"缺少必需的列: {missing_columns}")
  88. # 将列名转换为小写(带下划线)
  89. df.columns = [col.lower() for col in df.columns]
  90. required_columns_lower = [col.lower() for col in self.required_columns]
  91. numeric_columns_lower = [col.lower() for col in self.numeric_columns]
  92. # 检查数据类型
  93. logger.info("检查数据类型...")
  94. # 转换数值类型
  95. for col in numeric_columns_lower:
  96. if col in df.columns:
  97. # 对于数值列,转换为浮点数
  98. df[col] = pd.to_numeric(df[col], errors='coerce')
  99. # 处理特殊字段data_year(转换为整数)
  100. if col == 'data_year':
  101. df[col] = df[col].astype(pd.Int64Dtype(), errors='ignore')
  102. # 处理空值 - 所有字段必须非空(除了县市名称可能是文本)
  103. empty_columns = df.isnull().any()
  104. empty_cols = [col for col in empty_columns.index if empty_columns[col]]
  105. if empty_cols:
  106. logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")
  107. # 对于数值列,如果有空值,填充为0
  108. for col in numeric_columns_lower:
  109. if col in df.columns and df[col].isnull().any():
  110. df[col] = df[col].fillna(0)
  111. logger.info(f"已将 {col} 的空值替换为0")
  112. # 再次检查县市名称
  113. if 'county_name' in df.columns and df['county_name'].isnull().any():
  114. logger.warning("县市名称存在空值,填充为'未知区域'")
  115. df['county_name'] = df['county_name'].fillna('未知区域')
  116. # 验证逻辑关系:总镉输入通量是否等于各分项之和
  117. tolerance = 1e-6
  118. total_calculated = (
  119. df['nitrogen_cd_flux'] + df['phosphorus_cd_flux'] +
  120. df['potassium_cd_flux'] + df['compound_cd_flux'] +
  121. df['organic_cd_flux'] + df['pesticide_cd_flux'] +
  122. df['farmyard_cd_flux'] + df['plastic_film_cd_flux']
  123. )
  124. mismatches = abs(df['total_cd_flux'] - total_calculated) > tolerance
  125. if mismatches.any():
  126. mismatched_indices = mismatches[mismatches].index.tolist()
  127. logger.warning(f"发现 {len(mismatched_indices)} 行 total_cd_flux 值与各分项之和不一致:")
  128. for i in mismatched_indices[:5]: # 只显示前5个示例
  129. logger.warning(f"行 {i}: total_cd_flux={df.at[i, 'total_cd_flux']}, 计算值={total_calculated[i]}")
  130. # 用计算值覆盖原始值
  131. df['total_cd_flux'] = total_calculated
  132. logger.info("已自动修正 total_cd_flux 值为各分项之和")
  133. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  134. return df
  135. except Exception as e:
  136. logger.error(f"数据验证失败: {str(e)}")
  137. raise
  138. def import_data(self, df):
  139. """
  140. 将数据导入到数据库
  141. @param {DataFrame} df - 要导入的数据
  142. """
  143. try:
  144. logger.info("开始导入数据到数据库...")
  145. # 创建数据库会话
  146. db = SessionLocal()
  147. try:
  148. # 检查是否有重复数据
  149. existing_count = db.query(AgriculturalData).count()
  150. logger.info(f"数据库中现有数据: {existing_count} 条")
  151. # 批量创建对象
  152. batch_size = 1000
  153. total_rows = len(df)
  154. imported_count = 0
  155. for i in range(0, total_rows, batch_size):
  156. batch_df = df.iloc[i:i + batch_size]
  157. batch_objects = []
  158. for _, row in batch_df.iterrows():
  159. try:
  160. # 创建AgriculturalData对象
  161. agricultural_data = AgriculturalData(
  162. county_name=str(row['county_name']),
  163. crop_sowing_area=float(row['crop_sowing_area']),
  164. nitrogen_usage=float(row['nitrogen_usage']),
  165. phosphorus_usage=float(row['phosphorus_usage']),
  166. potassium_usage=float(row['potassium_usage']),
  167. compound_usage=float(row['compound_usage']),
  168. organic_usage=float(row['organic_usage']),
  169. pesticide_usage=float(row['pesticide_usage']),
  170. farmyard_usage=float(row['farmyard_usage']),
  171. plastic_film_usage=float(row['plastic_film_usage']),
  172. nitrogen_cd_flux=float(row['nitrogen_cd_flux']),
  173. phosphorus_cd_flux=float(row['phosphorus_cd_flux']),
  174. potassium_cd_flux=float(row['potassium_cd_flux']),
  175. compound_cd_flux=float(row['compound_cd_flux']),
  176. organic_cd_flux=float(row['organic_cd_flux']),
  177. pesticide_cd_flux=float(row['pesticide_cd_flux']),
  178. farmyard_cd_flux=float(row['farmyard_cd_flux']),
  179. plastic_film_cd_flux=float(row['plastic_film_cd_flux']),
  180. total_cd_flux=float(row['total_cd_flux']),
  181. data_year=int(row['data_year'])
  182. )
  183. batch_objects.append(agricultural_data)
  184. except Exception as e:
  185. logger.warning(f"跳过行 {i + _}: {str(e)}")
  186. continue
  187. if batch_objects:
  188. # 批量插入
  189. db.add_all(batch_objects)
  190. db.commit()
  191. imported_count += len(batch_objects)
  192. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  193. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  194. # 验证导入结果
  195. final_count = db.query(AgriculturalData).count()
  196. logger.info(f"导入后数据库总数据: {final_count} 条")
  197. except Exception as e:
  198. db.rollback()
  199. logger.error(f"数据导入失败,已回滚: {str(e)}")
  200. raise
  201. finally:
  202. db.close()
  203. except Exception as e:
  204. logger.error(f"数据导入过程失败: {str(e)}")
  205. raise
  206. def run_import(self):
  207. """
  208. 执行完整的导入流程
  209. """
  210. try:
  211. logger.info("=" * 60)
  212. logger.info("开始农业投入品数据导入流程")
  213. logger.info("=" * 60)
  214. # 1. 读取Excel数据
  215. df = self.read_excel_data()
  216. # 2. 验证数据
  217. df = self.validate_data(df)
  218. # 3. 导入数据
  219. self.import_data(df)
  220. logger.info("=" * 60)
  221. logger.info("农业投入品数据导入流程完成!")
  222. logger.info("=" * 60)
  223. except Exception as e:
  224. logger.error(f"导入流程失败: {str(e)}")
  225. raise
  226. def main():
  227. """
  228. 主函数
  229. """
  230. # Excel文件路径
  231. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  232. sheet_name = "Agricultural" # 指定对应的sheet名称
  233. try:
  234. # 创建导入器并执行导入
  235. importer = AgriculturalDataImporter(excel_path, sheet_name)
  236. importer.run_import()
  237. except Exception as e:
  238. logger.error(f"程序执行失败: {str(e)}")
  239. sys.exit(1)
  240. if __name__ == "__main__":
  241. main()