|
@@ -0,0 +1,299 @@
|
|
|
+"""
|
|
|
+Atmo_sample数据导入脚本
|
|
|
+@description: 从Excel文件读取大气颗粒物采样数据并导入到atmo_sample_data表
|
|
|
+"""
|
|
|
+
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import pandas as pd
|
|
|
+import logging
|
|
|
+from sqlalchemy.orm import sessionmaker
|
|
|
+
|
|
|
# Put the project root (the parent of this script's directory) at the front of
# the import path so that the `app` package can be imported when this file is
# executed directly as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
+
|
|
|
+from app.database import engine, SessionLocal
|
|
|
+from app.models.atmo_sample import AtmoSampleData # 需创建对应的ORM模型
|
|
|
+
|
|
|
# Configure logging at import time: INFO level, each record formatted as
# "timestamp - logger name - level - message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
class AtmoSampleDataImporter:
    """
    Importer for atmospheric particulate sampling data.

    @description: Reads sampling records from one worksheet of an Excel
        workbook, validates them, and inserts them into the database through
        the AtmoSampleData ORM model.
    """

    def __init__(self, excel_path, sheet_name='Atmo_sample'):
        """
        Store the workbook location and the column definitions.

        @param {str} excel_path - Path of the Excel workbook
        @param {str} sheet_name - Worksheet name, defaults to 'Atmo_sample'
        """
        self.excel_path = excel_path
        self.sheet_name = sheet_name

        # Columns the worksheet must contain (per the original author's note,
        # taken from the database design document).
        self.required_columns = [
            'ID', 'longitude', 'latitude', 'sampling_location',
            'start_time', 'end_time', 'cumulative_time',
            'average_flow_rate', 'cumulative_true_volume',
            'cumulative_standard_volume', 'sample_type',
            'sample_name', 'Cr_particulate', 'As_particulate',
            'Cd_particulate', 'Hg_particulate', 'Pb_particulate',
            'particle_weight', 'standard_volume',
            'particle_concentration', 'sample_code',
            'temperature', 'pressure', 'humidity',
            'wind_speed', 'wind_direction'
        ]

        # Subset of required_columns converted with float() on import; every
        # other required column is converted with str().
        self.numeric_columns = [
            'longitude', 'latitude', 'average_flow_rate',
            'cumulative_true_volume', 'cumulative_standard_volume',
            'Cr_particulate', 'As_particulate', 'Cd_particulate',
            'Hg_particulate', 'Pb_particulate', 'particle_weight',
            'standard_volume', 'particle_concentration',
            'temperature', 'pressure', 'humidity', 'wind_speed'
        ]

    def read_excel_data(self):
        """
        Read the configured worksheet into a DataFrame.

        Logs the row count, the column names and a preview of the first rows.

        @returns: DataFrame with the worksheet contents
        @raises FileNotFoundError: if excel_path does not exist
        """
        try:
            logger.info(f"开始读取Excel文件: {self.excel_path}")
            logger.info(f"Sheet名称: {self.sheet_name}")

            # Fail early with a clear message instead of a pandas/engine error.
            if not os.path.exists(self.excel_path):
                raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")

            df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)

            logger.info(f"成功读取数据,共 {len(df)} 行")
            logger.info(f"数据列: {list(df.columns)}")

            # Preview so the operator can confirm the right sheet was read.
            logger.info("前5行数据预览:")
            logger.info(df.head().to_string())

            return df

        except Exception as e:
            logger.error(f"读取Excel文件失败: {e}")
            raise

    @staticmethod
    def _reconcile_concentration(df, tolerance=1e-6):
        """
        Compare recorded particle concentrations with the value derived from
        particle weight and standard volume.

        The derived value is particle_weight * 1000 / standard_volume (the
        source comments give weight in mg and concentration in ug/m3). A row
        is flagged only when the derived value is finite and differs from the
        recorded one by more than `tolerance`, so rows with a missing weight
        or volume, or a zero volume, keep their recorded value.

        @param {DataFrame} df - Frame with the three particle columns
        @param {float} tolerance - Absolute difference treated as equal
        @returns: tuple (corrected Series, list of flagged index labels,
            derived Series)
        """
        recorded = df['particle_concentration']
        calculated = df['particle_weight'] * 1000 / df['standard_volume']

        # NaN (missing input) and +/-inf (zero volume) are not usable corrections.
        usable = calculated.notna() & (calculated.abs() != float('inf'))
        mismatched = ((recorded - calculated).abs() > tolerance) & usable

        corrected = recorded.where(~mismatched, calculated)
        return corrected, mismatched[mismatched].index.tolist(), calculated

    def validate_data(self, df):
        """
        Check the frame against the required columns and normalise it.

        Steps: verify that all required columns exist, replace particle
        concentrations that disagree with the derived value, and fill empty
        text cells with '未知'. The passed frame is modified in place.

        @param {DataFrame} df - Data to validate
        @returns: DataFrame the same frame after validation
        @raises ValueError: if any required column is missing
        """
        try:
            logger.info("开始验证数据...")

            missing_columns = [col for col in self.required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"缺少必需的列: {missing_columns}")

            # NOTE(review): no dtype check is actually performed here; the
            # float()/str() conversions happen later, per row, in import_data.
            logger.info("检查数据类型...")

            particle_cols = ['particle_weight', 'standard_volume', 'particle_concentration']
            if all(col in df.columns for col in particle_cols):
                corrected, mismatched_indices, calculated = self._reconcile_concentration(df)

                if mismatched_indices:
                    logger.warning(f"发现 {len(mismatched_indices)} 行颗粒物浓度值不符合逻辑:")

                    # Show at most five examples to keep the log readable.
                    for i in mismatched_indices[:5]:
                        logger.warning(
                            f"行 {i}: 计算值={calculated[i]}, 实际值={df.at[i, 'particle_concentration']}")

                    # Only the flagged rows differ from the recorded values.
                    df['particle_concentration'] = corrected
                    logger.info("已自动修正颗粒物浓度值")

            # Report every column that contains at least one empty cell.
            null_flags = df.isnull().any()
            empty_cols = [col for col in null_flags.index if null_flags[col]]
            if empty_cols:
                logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")

            # NOTE(review): 'id' does not match the required worksheet column
            # 'ID', so empty IDs are not filled here — confirm whether that is
            # intended before changing it (filling a key column with a
            # constant would create duplicates).
            text_columns = ['id', 'sampling_location', 'start_time', 'end_time', 'cumulative_time',
                            'sample_type', 'sample_name', 'sample_code', 'wind_direction']

            for col in text_columns:
                if col in df.columns and df[col].isnull().any():
                    logger.warning(f"{col}列存在空值,填充为'未知'")
                    df[col] = df[col].fillna('未知')

            logger.info(f"数据验证完成,有效数据 {len(df)} 行")

            return df

        except Exception as e:
            logger.error(f"数据验证失败: {e}")
            raise

    def _row_to_kwargs(self, row):
        """
        Convert one worksheet row into keyword arguments for AtmoSampleData.

        Columns listed in numeric_columns are converted with float(), all
        other required columns with str(). The worksheet column 'ID' maps to
        the model attribute 'id'; every other column keeps its name.

        @param {Series|dict} row - Mapping from column name to cell value
        @returns: dict of model attribute name to converted value
        @raises KeyError/ValueError/TypeError: if a column is missing or a
            value cannot be converted
        """
        kwargs = {}
        for col in self.required_columns:
            attr = 'id' if col == 'ID' else col
            value = row[col]
            kwargs[attr] = float(value) if col in self.numeric_columns else str(value)
        return kwargs

    def import_data(self, df):
        """
        Insert the rows of the frame into the database in batches.

        Rows are converted to AtmoSampleData objects and committed 100 at a
        time. A row that cannot be converted is logged and skipped. If a
        database operation fails the session is rolled back and the error is
        re-raised; batches committed before the failure remain committed.

        @param {DataFrame} df - Data to import
        """
        try:
            logger.info("开始导入数据到数据库...")

            db = SessionLocal()

            try:
                # Informational only: this counts existing rows, it does not
                # detect duplicates.
                existing_count = db.query(AtmoSampleData).count()
                logger.info(f"数据库中现有数据: {existing_count} 条")

                batch_size = 100
                total_rows = len(df)
                imported_count = 0

                for start in range(0, total_rows, batch_size):
                    batch_df = df.iloc[start:start + batch_size]
                    batch_objects = []

                    for offset, (_, row) in enumerate(batch_df.iterrows()):
                        try:
                            batch_objects.append(AtmoSampleData(**self._row_to_kwargs(row)))
                        except Exception as e:
                            # Report the zero-based position in the frame. The
                            # index label must not be added to the batch start:
                            # it may already be absolute, or not a number.
                            logger.warning(f"跳过行 {start + offset}: {e}")
                            continue

                    if batch_objects:
                        db.add_all(batch_objects)
                        db.commit()
                        imported_count += len(batch_objects)
                        logger.info(f"已导入 {imported_count}/{total_rows} 条数据")

                logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")

                # Re-count so the log shows the table size after the import.
                final_count = db.query(AtmoSampleData).count()
                logger.info(f"导入后数据库总数据: {final_count} 条")

            except Exception as e:
                db.rollback()
                logger.error(f"数据导入失败,已回滚: {e}")
                raise
            finally:
                db.close()

        except Exception as e:
            logger.error(f"数据导入过程失败: {e}")
            raise

    def run_import(self):
        """
        Run the complete workflow: read, validate, then import.

        Any exception from a step is logged and re-raised.
        """
        try:
            logger.info("=" * 60)
            logger.info("开始大气颗粒物采样数据导入流程")
            logger.info("=" * 60)

            df = self.read_excel_data()
            df = self.validate_data(df)
            self.import_data(df)

            logger.info("=" * 60)
            logger.info("大气颗粒物采样数据导入流程完成!")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"导入流程失败: {e}")
            raise
|
|
|
+
|
|
|
+
|
|
|
def main(excel_path=r"D:\destkop\数据库对应数据.xlsx", sheet_name="Atmo_sample"):
    """
    Script entry point: run the full import for one workbook.

    The defaults are the values that used to be hard-coded, so calling
    main() without arguments behaves as before.

    @param {str} excel_path - Path of the Excel workbook to import
    @param {str} sheet_name - Name of the worksheet to read

    Exits the process with status 1 if the import raises.
    """
    try:
        importer = AtmoSampleDataImporter(excel_path, sheet_name)
        importer.run_import()

    except Exception as e:
        logger.error(f"程序执行失败: {e}")
        sys.exit(1)
|
|
|
+
|
|
|
+
|
|
|
# Run the import only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|