import_cross_section.py 8.2 KB


  1. """
  2. Cross_section数据导入脚本
  3. @description: 从Excel文件读取河流断面数据并导入到cross_section表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, SessionLocal
  14. from app.models.cross_section import CrossSection # 需创建对应的ORM模型
  15. # 设置日志
  16. logging.basicConfig(
  17. level=logging.INFO,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  19. )
  20. logger = logging.getLogger(__name__)
  21. class CrossSectionDataImporter:
  22. """
  23. 河流断面数据导入器
  24. @description: 从Excel文件读取河流断面数据并导入到数据库
  25. """
  26. def __init__(self, excel_path, sheet_name='Cross_section'):
  27. """
  28. 初始化导入器
  29. @param {str} excel_path - Excel文件路径
  30. @param {str} sheet_name - Sheet名称,默认为'Cross_section'
  31. """
  32. self.excel_path = excel_path
  33. self.sheet_name = sheet_name
  34. # 定义必需字段列表(根据数据库设计文档,除ID外)
  35. self.required_columns = ['River', 'Position', 'County', 'Lon', 'Lan', 'Cd']
  36. def read_excel_data(self):
  37. """
  38. 读取Excel文件数据
  39. @returns: DataFrame 读取的数据
  40. """
  41. try:
  42. logger.info(f"开始读取Excel文件: {self.excel_path}")
  43. logger.info(f"Sheet名称: {self.sheet_name}")
  44. # 检查文件是否存在
  45. if not os.path.exists(self.excel_path):
  46. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  47. # 读取Excel文件
  48. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  49. logger.info(f"成功读取数据,共 {len(df)} 行")
  50. logger.info(f"数据列: {list(df.columns)}")
  51. # 显示前几行数据供确认
  52. logger.info("前5行数据预览:")
  53. logger.info(df.head().to_string())
  54. return df
  55. except Exception as e:
  56. logger.error(f"读取Excel文件失败: {str(e)}")
  57. raise
  58. def validate_data(self, df):
  59. """
  60. 验证数据格式和完整性
  61. @param {DataFrame} df - 要验证的数据
  62. @returns: DataFrame 验证后的数据
  63. """
  64. try:
  65. logger.info("开始验证数据...")
  66. # 检查必需的列是否存在
  67. missing_columns = [col for col in self.required_columns if col not in df.columns]
  68. if missing_columns:
  69. raise ValueError(f"缺少必需的列: {missing_columns}")
  70. # 将列名转换为小写(带下划线)以匹配数据库字段
  71. df.columns = [col.lower() for col in df.columns]
  72. required_columns_lower = [col.lower() for col in self.required_columns]
  73. # 检查数据类型
  74. logger.info("检查数据类型...")
  75. # 转换数值类型
  76. numeric_columns = ['lon', 'lan', 'cd']
  77. for col in numeric_columns:
  78. if col in df.columns:
  79. # 对于数值列,转换为浮点数
  80. df[col] = pd.to_numeric(df[col], errors='coerce')
  81. # 处理空值 - 所有字段必须非空
  82. original_count = len(df)
  83. df = df.dropna(subset=required_columns_lower)
  84. new_count = len(df)
  85. if new_count < original_count:
  86. logger.warning(f"删除空值行 {original_count - new_count} 行")
  87. # 验证经纬度范围
  88. longitude_errors = df[(df['lon'] < -180) | (df['lon'] > 180)]
  89. latitude_errors = df[(df['lan'] < -90) | (df['lan'] > 90)]
  90. if not longitude_errors.empty or not latitude_errors.empty:
  91. logger.warning("发现经纬度无效值,将删除这些行")
  92. # 保留有效经纬度行
  93. valid_mask = (df['lon'].between(-180, 180)) & (df['lan'].between(-90, 90))
  94. invalid_df = df[~valid_mask]
  95. df = df[valid_mask]
  96. logger.warning(f"删除无效经纬度数据 {len(invalid_df)} 行")
  97. if not invalid_df.empty:
  98. logger.warning("部分无效行示例:")
  99. logger.warning(invalid_df[['lon', 'lan', 'position']].head().to_string())
  100. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  101. return df
  102. except Exception as e:
  103. logger.error(f"数据验证失败: {str(e)}")
  104. raise
  105. def import_data(self, df):
  106. """
  107. 将数据导入到数据库
  108. @param {DataFrame} df - 要导入的数据
  109. """
  110. try:
  111. logger.info("开始导入数据到数据库...")
  112. # 创建数据库会话
  113. db = SessionLocal()
  114. try:
  115. # 检查是否有重复数据
  116. existing_count = db.query(CrossSection).count()
  117. logger.info(f"数据库中现有数据: {existing_count} 条")
  118. # 批量创建对象
  119. batch_size = 1000
  120. total_rows = len(df)
  121. imported_count = 0
  122. for i in range(0, total_rows, batch_size):
  123. batch_df = df.iloc[i:i + batch_size]
  124. batch_objects = []
  125. for _, row in batch_df.iterrows():
  126. try:
  127. # 创建CrossSectionData对象
  128. cross_section = CrossSection(
  129. river_name=str(row['river']),
  130. position=str(row['position']),
  131. county=str(row['county']),
  132. longitude=float(row['lon']),
  133. latitude=float(row['lan']),
  134. cd_concentration=float(row['cd'])
  135. )
  136. batch_objects.append(cross_section)
  137. except Exception as e:
  138. logger.warning(f"跳过行 {i + _}: {str(e)}")
  139. continue
  140. if batch_objects:
  141. # 批量插入
  142. db.add_all(batch_objects)
  143. db.commit()
  144. imported_count += len(batch_objects)
  145. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  146. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  147. # 验证导入结果
  148. final_count = db.query(CrossSection).count()
  149. logger.info(f"导入后数据库总数据: {final_count} 条")
  150. except Exception as e:
  151. db.rollback()
  152. logger.error(f"数据导入失败,已回滚: {str(e)}")
  153. raise
  154. finally:
  155. db.close()
  156. except Exception as e:
  157. logger.error(f"数据导入过程失败: {str(e)}")
  158. raise
  159. def run_import(self):
  160. """
  161. 执行完整的导入流程
  162. """
  163. try:
  164. logger.info("=" * 60)
  165. logger.info("开始河流断面数据导入流程")
  166. logger.info("=" * 60)
  167. # 1. 读取Excel数据
  168. df = self.read_excel_data()
  169. # 2. 验证数据
  170. df = self.validate_data(df)
  171. # 3. 导入数据
  172. self.import_data(df)
  173. logger.info("=" * 60)
  174. logger.info("河流断面数据导入流程完成!")
  175. logger.info = "=" * 60
  176. except Exception as e:
  177. logger.error(f"导入流程失败: {str(e)}")
  178. raise
  179. def main():
  180. """
  181. 主函数
  182. """
  183. # Excel文件路径
  184. excel_path = r"D:\destkop\数据库对应数据.xlsx"
  185. sheet_name = "Cross_section"
  186. try:
  187. # 创建导入器并执行导入
  188. importer = CrossSectionDataImporter(excel_path, sheet_name)
  189. importer.run_import()
  190. except Exception as e:
  191. logger.error(f"程序执行失败: {str(e)}")
  192. sys.exit(1)
  193. if __name__ == "__main__":
  194. main()