# import_farmland_data.py (8.6 KB)
  1. """
  2. 农田数据导入脚本
  3. @description: 从Excel文件读取Farmland数据并导入到Farmland_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. from geoalchemy2 import WKTElement
  12. # 添加项目根目录到Python路径
  13. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. from app.database import engine, SessionLocal
  15. from app.models.farmland import FarmlandData
  16. # 设置日志
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  20. )
  21. logger = logging.getLogger(__name__)
  22. class FarmlandDataImporter:
  23. """
  24. 农田数据导入器
  25. @description: 从Excel文件读取农田数据并导入到数据库
  26. """
  27. def __init__(self, excel_path, sheet_name='Farmland'):
  28. """
  29. 初始化导入器
  30. @param {str} excel_path - Excel文件路径
  31. @param {str} sheet_name - Sheet名称,默认为'Farmland'
  32. """
  33. self.excel_path = excel_path
  34. self.sheet_name = sheet_name
  35. self.type_mapping = {
  36. '旱': 0.0,
  37. '水田': 1.0,
  38. '水浇地': 2.0
  39. }
  40. def read_excel_data(self):
  41. """
  42. 读取Excel文件数据
  43. @returns: DataFrame 读取的数据
  44. """
  45. try:
  46. logger.info(f"开始读取Excel文件: {self.excel_path}")
  47. logger.info(f"Sheet名称: {self.sheet_name}")
  48. # 检查文件是否存在
  49. if not os.path.exists(self.excel_path):
  50. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  51. # 读取Excel文件
  52. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  53. logger.info(f"成功读取数据,共 {len(df)} 行")
  54. logger.info(f"数据列: {list(df.columns)}")
  55. # 显示前几行数据供确认
  56. logger.info("前5行数据预览:")
  57. logger.info(df.head().to_string())
  58. return df
  59. except Exception as e:
  60. logger.error(f"读取Excel文件失败: {str(e)}")
  61. raise
  62. def validate_data(self, df):
  63. """
  64. 验证数据格式和完整性
  65. @param {DataFrame} df - 要验证的数据
  66. @returns: DataFrame 验证后的数据
  67. """
  68. try:
  69. logger.info("开始验证数据...")
  70. # 检查必需的列是否存在
  71. required_columns = ['Farmland_ID', 'Sample_ID', 'lon', 'lan', 'Type']
  72. missing_columns = [col for col in required_columns if col not in df.columns]
  73. if missing_columns:
  74. raise ValueError(f"缺少必需的列: {missing_columns}")
  75. # 检查数据类型
  76. logger.info("检查数据类型...")
  77. # 转换数值类型
  78. df['Farmland_ID'] = pd.to_numeric(df['Farmland_ID'], errors='coerce')
  79. df['Sample_ID'] = pd.to_numeric(df['Sample_ID'], errors='coerce')
  80. df['lon'] = pd.to_numeric(df['lon'], errors='coerce')
  81. df['lan'] = pd.to_numeric(df['lan'], errors='coerce')
  82. # 检查是否有无效的数值
  83. if df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any().any():
  84. logger.warning("发现无效的数值,将跳过这些行")
  85. invalid_rows = df[df[['Farmland_ID', 'Sample_ID', 'lon', 'lan']].isnull().any(axis=1)]
  86. logger.warning(f"无效行数: {len(invalid_rows)}")
  87. df = df.dropna(subset=['Farmland_ID', 'Sample_ID', 'lon', 'lan'])
  88. # 转换Type字段
  89. logger.info("转换Type字段...")
  90. df['Type_Numeric'] = df['Type'].map(self.type_mapping)
  91. # 检查未知的Type值
  92. unknown_types = df[df['Type_Numeric'].isnull()]['Type'].unique()
  93. if len(unknown_types) > 0:
  94. logger.warning(f"发现未知的Type值: {unknown_types}")
  95. logger.warning("将为未知Type设置默认值0.0(旱地)")
  96. df['Type_Numeric'] = df['Type_Numeric'].fillna(0.0)
  97. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  98. return df
  99. except Exception as e:
  100. logger.error(f"数据验证失败: {str(e)}")
  101. raise
  102. def create_geometry(self, lon, lat):
  103. """
  104. 创建PostGIS Point几何对象
  105. @param {float} lon - 经度
  106. @param {float} lat - 纬度
  107. @returns: WKTElement 几何对象
  108. """
  109. return WKTElement(f'POINT({lon} {lat})', srid=4326)
  110. def import_data(self, df):
  111. """
  112. 将数据导入到数据库
  113. @param {DataFrame} df - 要导入的数据
  114. """
  115. try:
  116. logger.info("开始导入数据到数据库...")
  117. # 创建数据库会话
  118. db = SessionLocal()
  119. try:
  120. # 检查是否有重复数据
  121. existing_count = db.query(FarmlandData).count()
  122. logger.info(f"数据库中现有数据: {existing_count} 条")
  123. # 批量创建对象
  124. batch_size = 1000
  125. total_rows = len(df)
  126. imported_count = 0
  127. for i in range(0, total_rows, batch_size):
  128. batch_df = df.iloc[i:i+batch_size]
  129. batch_objects = []
  130. for _, row in batch_df.iterrows():
  131. try:
  132. # 创建FarmlandData对象
  133. farmland_data = FarmlandData(
  134. farmland_id=int(row['Farmland_ID']),
  135. sample_id=int(row['Sample_ID']),
  136. lon=float(row['lon']),
  137. lan=float(row['lan']),
  138. type=float(row['Type_Numeric']),
  139. geom=self.create_geometry(row['lon'], row['lan'])
  140. )
  141. batch_objects.append(farmland_data)
  142. except Exception as e:
  143. logger.warning(f"跳过行 {i+_}: {str(e)}")
  144. continue
  145. if batch_objects:
  146. # 批量插入
  147. db.add_all(batch_objects)
  148. db.commit()
  149. imported_count += len(batch_objects)
  150. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  151. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  152. # 验证导入结果
  153. final_count = db.query(FarmlandData).count()
  154. logger.info(f"导入后数据库总数据: {final_count} 条")
  155. except Exception as e:
  156. db.rollback()
  157. logger.error(f"数据导入失败,已回滚: {str(e)}")
  158. raise
  159. finally:
  160. db.close()
  161. except Exception as e:
  162. logger.error(f"数据导入过程失败: {str(e)}")
  163. raise
  164. def run_import(self):
  165. """
  166. 执行完整的导入流程
  167. """
  168. try:
  169. logger.info("=" * 60)
  170. logger.info("开始农田数据导入流程")
  171. logger.info("=" * 60)
  172. # 1. 读取Excel数据
  173. df = self.read_excel_data()
  174. # 2. 验证数据
  175. df = self.validate_data(df)
  176. # 3. 导入数据
  177. self.import_data(df)
  178. logger.info("=" * 60)
  179. logger.info("农田数据导入流程完成!")
  180. logger.info("=" * 60)
  181. except Exception as e:
  182. logger.error(f"导入流程失败: {str(e)}")
  183. raise
  184. def main():
  185. """
  186. 主函数
  187. """
  188. # Excel文件路径
  189. excel_path = r"D:\destkop\数据库对应数据.xlsx"
  190. sheet_name = "Farmland"
  191. try:
  192. # 创建导入器并执行导入
  193. importer = FarmlandDataImporter(excel_path, sheet_name)
  194. importer.run_import()
  195. except Exception as e:
  196. logger.error(f"程序执行失败: {str(e)}")
  197. sys.exit(1)
  198. if __name__ == "__main__":
  199. main()