import_atmo_sample.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. """
  2. Atmo_sample数据导入脚本
  3. @description: 从Excel文件读取大气颗粒物采样数据并导入到atmo_sample_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from sqlalchemy.orm import sessionmaker
  10. # 添加项目根目录到Python路径
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. from app.database import engine, SessionLocal
  13. from app.models.atmo_sample import AtmoSampleData # 需创建对应的ORM模型
  14. # 设置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  18. )
  19. logger = logging.getLogger(__name__)
  20. class AtmoSampleDataImporter:
  21. """
  22. 大气颗粒物采样数据导入器
  23. @description: 从Excel文件读取大气颗粒物采样数据并导入到数据库
  24. """
  25. def __init__(self, excel_path, sheet_name='Atmo_sample'):
  26. """
  27. 初始化导入器
  28. @param {str} excel_path - Excel文件路径
  29. @param {str} sheet_name - Sheet名称,默认为'Atmo_sample'
  30. """
  31. self.excel_path = excel_path
  32. self.sheet_name = sheet_name
  33. # 定义必需字段列表(根据数据库设计文档)
  34. self.required_columns = [
  35. 'ID', 'longitude', 'latitude', 'sampling_location',
  36. 'start_time', 'end_time', 'cumulative_time',
  37. 'average_flow_rate', 'cumulative_true_volume',
  38. 'cumulative_standard_volume', 'sample_type',
  39. 'sample_name', 'Cr_particulate', 'As_particulate',
  40. 'Cd_particulate', 'Hg_particulate', 'Pb_particulate',
  41. 'particle_weight', 'standard_volume',
  42. 'particle_concentration', 'sample_code',
  43. 'temperature', 'pressure', 'humidity',
  44. 'wind_speed', 'wind_direction'
  45. ]
  46. # 数值型字段列表
  47. self.numeric_columns = [
  48. 'longitude', 'latitude', 'average_flow_rate',
  49. 'cumulative_true_volume', 'cumulative_standard_volume',
  50. 'Cr_particulate', 'As_particulate', 'Cd_particulate',
  51. 'Hg_particulate', 'Pb_particulate', 'particle_weight',
  52. 'standard_volume', 'particle_concentration',
  53. 'temperature', 'pressure', 'humidity', 'wind_speed'
  54. ]
  55. def read_excel_data(self):
  56. """
  57. 读取Excel文件数据
  58. @returns: DataFrame 读取的数据
  59. """
  60. try:
  61. logger.info(f"开始读取Excel文件: {self.excel_path}")
  62. logger.info(f"Sheet名称: {self.sheet_name}")
  63. # 检查文件是否存在
  64. if not os.path.exists(self.excel_path):
  65. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  66. # 读取Excel文件
  67. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  68. logger.info(f"成功读取数据,共 {len(df)} 行")
  69. logger.info(f"数据列: {list(df.columns)}")
  70. # 显示前几行数据供确认
  71. logger.info("前5行数据预览:")
  72. logger.info(df.head().to_string())
  73. return df
  74. except Exception as e:
  75. logger.error(f"读取Excel文件失败: {str(e)}")
  76. raise
  77. def validate_data(self, df):
  78. """
  79. 验证数据格式和完整性
  80. @param {DataFrame} df - 要验证的数据
  81. @returns: DataFrame 验证后的数据
  82. """
  83. try:
  84. logger.info("开始验证数据...")
  85. # 检查必需的列是否存在
  86. missing_columns = [col for col in self.required_columns if col not in df.columns]
  87. if missing_columns:
  88. raise ValueError(f"缺少必需的列: {missing_columns}")
  89. # 检查数据类型
  90. logger.info("检查数据类型...")
  91. # 验证颗粒物浓度逻辑关系
  92. particle_cols = ['particle_weight', 'standard_volume', 'particle_concentration']
  93. if all(col in df.columns for col in particle_cols):
  94. # 计算颗粒物浓度 = 颗粒物质量(mg) * 1000 / 标准体积(m³)
  95. # 因为浓度单位是ug/m³,而颗粒物质量单位是mg(1mg = 1000ug)
  96. calculated_concentration = df['particle_weight'] * 1000 / df['standard_volume']
  97. tolerance = 1e-6
  98. mismatches = abs(df['particle_concentration'] - calculated_concentration) > tolerance
  99. if mismatches.any():
  100. mismatched_indices = mismatches[mismatches].index.tolist()
  101. logger.warning(f"发现 {len(mismatched_indices)} 行颗粒物浓度值不符合逻辑:")
  102. for i in mismatched_indices[:5]:
  103. logger.warning(
  104. f"行 {i}: 计算值={calculated_concentration[i]}, 实际值={df.at[i, 'particle_concentration']}")
  105. # 用计算值覆盖原始值
  106. df['particle_concentration'] = calculated_concentration
  107. logger.info("已自动修正颗粒物浓度值")
  108. # 处理空值
  109. empty_columns = df.isnull().any()
  110. empty_cols = [col for col in empty_columns.index if empty_columns[col]]
  111. if empty_cols:
  112. logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")
  113. # 文本列填充空值
  114. text_columns = ['id', 'sampling_location', 'start_time', 'end_time', 'cumulative_time',
  115. 'sample_type', 'sample_name', 'sample_code', 'wind_direction']
  116. for col in text_columns:
  117. if col in df.columns and df[col].isnull().any():
  118. logger.warning(f"{col}列存在空值,填充为'未知'")
  119. df[col] = df[col].fillna('未知')
  120. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  121. return df
  122. except Exception as e:
  123. logger.error(f"数据验证失败: {str(e)}")
  124. raise
  125. def import_data(self, df):
  126. """
  127. 将数据导入到数据库
  128. @param {DataFrame} df - 要导入的数据
  129. """
  130. try:
  131. logger.info("开始导入数据到数据库...")
  132. # 创建数据库会话
  133. db = SessionLocal()
  134. try:
  135. # 检查是否有重复数据
  136. existing_count = db.query(AtmoSampleData).count()
  137. logger.info(f"数据库中现有数据: {existing_count} 条")
  138. # 批量创建对象
  139. batch_size = 100
  140. total_rows = len(df)
  141. imported_count = 0
  142. for i in range(0, total_rows, batch_size):
  143. batch_df = df.iloc[i:i + batch_size]
  144. batch_objects = []
  145. for _, row in batch_df.iterrows():
  146. try:
  147. # 创建AtmoSampleData对象
  148. atmo_sample = AtmoSampleData(
  149. id=str(row['ID']),
  150. longitude=float(row['longitude']),
  151. latitude=float(row['latitude']),
  152. sampling_location=str(row['sampling_location']),
  153. start_time=str(row['start_time']),
  154. end_time=str(row['end_time']),
  155. cumulative_time=str(row['cumulative_time']),
  156. average_flow_rate=float(row['average_flow_rate']),
  157. cumulative_true_volume=float(row['cumulative_true_volume']),
  158. cumulative_standard_volume=float(row['cumulative_standard_volume']),
  159. sample_type=str(row['sample_type']),
  160. sample_name=str(row['sample_name']),
  161. Cr_particulate=float(row['Cr_particulate']),
  162. As_particulate=float(row['As_particulate']),
  163. Cd_particulate=float(row['Cd_particulate']),
  164. Hg_particulate=float(row['Hg_particulate']),
  165. Pb_particulate=float(row['Pb_particulate']),
  166. particle_weight=float(row['particle_weight']),
  167. standard_volume=float(row['standard_volume']),
  168. particle_concentration=float(row['particle_concentration']),
  169. sample_code=str(row['sample_code']),
  170. temperature=float(row['temperature']),
  171. pressure=float(row['pressure']),
  172. humidity=float(row['humidity']),
  173. wind_speed=float(row['wind_speed']),
  174. wind_direction=str(row['wind_direction'])
  175. )
  176. batch_objects.append(atmo_sample)
  177. except Exception as e:
  178. logger.warning(f"跳过行 {i + _}: {str(e)}")
  179. continue
  180. if batch_objects:
  181. # 批量插入
  182. db.add_all(batch_objects)
  183. db.commit()
  184. imported_count += len(batch_objects)
  185. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  186. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  187. # 验证导入结果
  188. final_count = db.query(AtmoSampleData).count()
  189. logger.info(f"导入后数据库总数据: {final_count} 条")
  190. except Exception as e:
  191. db.rollback()
  192. logger.error(f"数据导入失败,已回滚: {str(e)}")
  193. raise
  194. finally:
  195. db.close()
  196. except Exception as e:
  197. logger.error(f"数据导入过程失败: {str(e)}")
  198. raise
  199. def run_import(self):
  200. """
  201. 执行完整的导入流程
  202. """
  203. try:
  204. logger.info("=" * 60)
  205. logger.info("开始大气颗粒物采样数据导入流程")
  206. logger.info("=" * 60)
  207. # 1. 读取Excel数据
  208. df = self.read_excel_data()
  209. # 2. 验证数据
  210. df = self.validate_data(df)
  211. # 3. 导入数据
  212. self.import_data(df)
  213. logger.info("=" * 60)
  214. logger.info("大气颗粒物采样数据导入流程完成!")
  215. logger.info("=" * 60)
  216. except Exception as e:
  217. logger.error(f"导入流程失败: {str(e)}")
  218. raise
  219. def main():
  220. """
  221. 主函数
  222. """
  223. # Excel文件路径
  224. excel_path = r"D:\destkop\数据库对应数据.xlsx" # 与原始文件相同
  225. sheet_name = "Atmo_sample" # 指定对应的sheet名称
  226. try:
  227. # 创建导入器并执行导入
  228. importer = AtmoSampleDataImporter(excel_path, sheet_name)
  229. importer.run_import()
  230. except Exception as e:
  231. logger.error(f"程序执行失败: {str(e)}")
  232. sys.exit(1)
  233. if __name__ == "__main__":
  234. main()