import_atmo_company.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """
  2. Atmo_company数据导入脚本
  3. @description: 从Excel文件读取涉重企业数据并导入到atmo_company表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from sqlalchemy.orm import sessionmaker
  10. # 添加项目根目录到Python路径
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. from app.database import engine, SessionLocal
  13. from app.models.atmo_company import AtmoCompany # 需创建对应的ORM模型
  14. # 设置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  18. )
  19. logger = logging.getLogger(__name__)
  20. class AtmoCompanyDataImporter:
  21. """
  22. 涉重企业数据导入器
  23. @description: 从Excel文件读取涉重企业数据并导入到数据库
  24. """
  25. def __init__(self, excel_path, sheet_name='Atmo_company'):
  26. """
  27. 初始化导入器
  28. @param {str} excel_path - Excel文件路径
  29. @param {str} sheet_name - Sheet名称,默认为'Atmo_company'
  30. """
  31. self.excel_path = excel_path
  32. self.sheet_name = sheet_name
  33. # 定义必需字段列表(根据数据库设计文档,除了ID)
  34. self.required_columns = [
  35. 'longitude', 'latitude', 'company_name', 'company_type', 'county', 'particulate_emission'
  36. ]
  37. # 数值型字段列表
  38. self.numeric_columns = ['longitude', 'latitude', 'particulate_emission']
  39. def read_excel_data(self):
  40. """
  41. 读取Excel文件数据
  42. @returns: DataFrame 读取的数据
  43. """
  44. try:
  45. logger.info(f"开始读取Excel文件: {self.excel_path}")
  46. logger.info(f"Sheet名称: {self.sheet_name}")
  47. # 检查文件是否存在
  48. if not os.path.exists(self.excel_path):
  49. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  50. # 读取Excel文件
  51. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  52. logger.info(f"成功读取数据,共 {len(df)} 行")
  53. logger.info(f"数据列: {list(df.columns)}")
  54. # 显示前几行数据供确认
  55. logger.info("前5行数据预览:")
  56. logger.info(df.head().to_string())
  57. return df
  58. except Exception as e:
  59. logger.error(f"读取Excel文件失败: {str(e)}")
  60. raise
  61. def validate_data(self, df):
  62. """
  63. 验证数据格式和完整性
  64. @param {DataFrame} df - 要验证的数据
  65. @returns: DataFrame 验证后的数据
  66. """
  67. try:
  68. logger.info("开始验证数据...")
  69. # 检查必需的列是否存在
  70. missing_columns = [col for col in self.required_columns if col not in df.columns]
  71. if missing_columns:
  72. raise ValueError(f"缺少必需的列: {missing_columns}")
  73. # 将列名转换为小写(带下划线)
  74. df.columns = [col.lower() for col in df.columns]
  75. # 检查数据类型
  76. logger.info("检查数据类型...")
  77. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  78. return df
  79. except Exception as e:
  80. logger.error(f"数据验证失败: {str(e)}")
  81. raise
  82. def import_data(self, df):
  83. """
  84. 将数据导入到数据库
  85. @param {DataFrame} df - 要导入的数据
  86. """
  87. try:
  88. logger.info("开始导入数据到数据库...")
  89. # 创建数据库会话
  90. db = SessionLocal()
  91. try:
  92. # 检查是否有重复数据
  93. existing_count = db.query(AtmoCompany).count()
  94. logger.info(f"数据库中现有数据: {existing_count} 条")
  95. # 批量创建对象
  96. batch_size = 1000
  97. total_rows = len(df)
  98. imported_count = 0
  99. for i in range(0, total_rows, batch_size):
  100. batch_df = df.iloc[i:i + batch_size]
  101. batch_objects = []
  102. for _, row in batch_df.iterrows():
  103. try:
  104. # 创建AtmoCompanyData对象
  105. atmo_company = AtmoCompany(
  106. longitude=float(row['longitude']),
  107. latitude=float(row['latitude']),
  108. company_name=str(row['company_name']),
  109. company_type=str(row['company_type']),
  110. county=str(row['county']),
  111. particulate_emission=float(row['particulate_emission'])
  112. )
  113. batch_objects.append(atmo_company)
  114. except Exception as e:
  115. logger.warning(f"跳过行 {i + _}: {str(e)}")
  116. continue
  117. if batch_objects:
  118. # 批量插入
  119. db.add_all(batch_objects)
  120. db.commit()
  121. imported_count += len(batch_objects)
  122. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  123. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  124. # 验证导入结果
  125. final_count = db.query(AtmoCompany).count()
  126. logger.info(f"导入后数据库总数据: {final_count} 条")
  127. except Exception as e:
  128. db.rollback()
  129. logger.error(f"数据导入失败,已回滚: {str(e)}")
  130. raise
  131. finally:
  132. db.close()
  133. except Exception as e:
  134. logger.error(f"数据导入过程失败: {str(e)}")
  135. raise
  136. def run_import(self):
  137. """
  138. 执行完整的导入流程
  139. """
  140. try:
  141. logger.info("=" * 60)
  142. logger.info("开始涉重企业数据导入流程")
  143. logger.info("=" * 60)
  144. # 1. 读取Excel数据
  145. df = self.read_excel_data()
  146. # 2. 验证数据
  147. df = self.validate_data(df)
  148. # 3. 导入数据
  149. self.import_data(df)
  150. logger.info("=" * 60)
  151. logger.info("涉重企业数据导入流程完成!")
  152. logger.info("=" * 60)
  153. except Exception as e:
  154. logger.error(f"导入流程失败: {str(e)}")
  155. raise
  156. def main():
  157. """
  158. 主函数
  159. """
  160. # Excel文件路径
  161. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  162. sheet_name = "Atmo_company" # 指定对应的sheet名称
  163. try:
  164. # 创建导入器并执行导入
  165. importer = AtmoCompanyDataImporter(excel_path, sheet_name)
  166. importer.run_import()
  167. except Exception as e:
  168. logger.error(f"程序执行失败: {str(e)}")
  169. sys.exit(1)
  170. if __name__ == "__main__":
  171. main()