|
- """
- Water_sample数据导入脚本
- @description: 从Excel文件读取灌溉水采样数据并导入到water_sampling_data表
- """
- import os
- import sys
- import pandas as pd
- import logging
- from datetime import datetime
- from sqlalchemy.orm import sessionmaker
- import re
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from app.database import engine, SessionLocal
- from app.models.water_sample import WaterSampleData # 需创建对应的ORM模型
- # 设置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class WaterSampleDataImporter:
- """
- 灌溉水采样数据导入器
- @description: 从Excel文件读取灌溉水采样数据并导入到数据库
- """
- def __init__(self, excel_path, sheet_name='Water_sample'):
- """
- 初始化导入器
- @param {str} excel_path - Excel文件路径
- @param {str} sheet_name - Sheet名称,默认为'Water_sample'
- """
- self.excel_path = excel_path
- self.sheet_name = sheet_name
- # 定义必需字段列表
- self.required_columns = [
- 'sample_code', 'lon', 'lat', 'sampling_location',
- 'sample_time', 'Cr', 'As', 'Cd', 'Hg', 'Pb', 'pH'
- ]
- # 数值型字段列表
- self.numeric_columns = [
- 'lon', 'lat', 'sampling_volume', 'Cr', 'As', 'Cd', 'Hg', 'Pb', 'pH'
- ]
- def read_excel_data(self):
- """
- 读取Excel文件数据
- @returns: DataFrame 读取的数据
- """
- try:
- logger.info(f"开始读取Excel文件: {self.excel_path}")
- logger.info(f"Sheet名称: {self.sheet_name}")
- # 检查文件是否存在
- if not os.path.exists(self.excel_path):
- raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
- # 读取Excel文件
- df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
- logger.info(f"成功读取数据,共 {len(df)} 行")
- logger.info(f"数据列: {list(df.columns)}")
- # 显示前几行数据供确认
- logger.info("前5行数据预览:")
- logger.info(df.head().to_string())
- return df
- except Exception as e:
- logger.error(f"读取Excel文件失败: {str(e)}")
- raise
- def clean_sample_time(self, time_str):
- """
- 清理和标准化样本时间格式
- @param {str} time_str - 时间字符串
- @returns: datetime 标准化的时间对象
- """
- try:
- # 尝试转换常见格式
- if isinstance(time_str, str):
- # 处理可能的格式:2024.5.20 16:37
- if '.' in time_str and ':' in time_str:
- return datetime.strptime(time_str, "%Y.%m.%d %H:%M")
- # 处理其他格式
- elif '/' in time_str:
- return datetime.strptime(time_str, "%Y/%m/%d %H:%M")
- elif '-' in time_str:
- return datetime.strptime(time_str, "%Y-%m-%d %H:%M")
- return time_str
- except Exception as e:
- logger.warning(f"无法解析时间字符串: {time_str}, 错误: {str(e)}")
- return None
- def validate_data(self, df):
- """
- 验证数据格式和完整性
- @param {DataFrame} df - 要验证的数据
- @returns: DataFrame 验证后的数据
- """
- try:
- logger.info("开始验证数据...")
- # 检查必需的列是否存在
- missing_columns = [col for col in self.required_columns if col not in df.columns]
- if missing_columns:
- raise ValueError(f"缺少必需的列: {missing_columns}")
- # 将列名转换为小写(带下划线)
- df.columns = [col.lower().replace(' ', '_') for col in df.columns]
- required_columns_lower = [col.lower().replace(' ', '_') for col in self.required_columns]
- numeric_columns_lower = [col.lower().replace(' ', '_') for col in self.numeric_columns]
- # 检查数据类型
- logger.info("检查数据类型...")
- # 转换数值类型
- for col in numeric_columns_lower:
- if col in df.columns:
- # 对于数值列,转换为浮点数
- df[col] = pd.to_numeric(df[col], errors='coerce')
- # 特殊处理时间字段
- if 'sample_time' in df.columns:
- # 清理和标准化时间格式
- df['sample_time'] = df['sample_time'].apply(self.clean_sample_time)
- # 处理空值 - 所有字段必须非空(除了文本描述)
- empty_columns = df.isnull().any()
- empty_cols = [col for col in empty_columns.index if empty_columns[col]]
- if empty_cols:
- logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")
- # 对于数值列,如果有空值,填充为0
- for col in numeric_columns_lower:
- if col in df.columns and df[col].isnull().any():
- df[col] = df[col].fillna(0)
- logger.info(f"已将 {col} 的空值替换为0")
- # 文本列填充空值为'未知'
- text_columns = [
- 'sample_number', 'weather', 'container_material',
- 'container_color', 'sample_description',
- 'water_quality', 'water_environment',
- 'storage_method'
- ]
- for col in text_columns:
- if col in df.columns and df[col].isnull().any():
- logger.warning(f"{col}列存在空值,填充为'未知'")
- df[col] = df[col].fillna('未知')
- # 验证经纬度范围
- longitude_errors = df[(df['lon'] < -180) | (df['lon'] > 180) | (df['lon'].isna())]
- latitude_errors = df[(df['lat'] < -90) | (df['lat'] > 90) | (df['lat'].isna())]
- if not longitude_errors.empty or not latitude_errors.empty:
- logger.warning("发现经纬度无效值或空值,将删除这些行")
- # 保留有效经纬度行
- valid_mask = ~df['lon'].isna() & ~df['lat'].isna() & (df['lon'].between(-180, 180)) & (df['lat'].between(-90, 90))
- invalid_df = df[~valid_mask]
- df = df[valid_mask]
- logger.warning(f"删除无效经纬度数据 {len(invalid_df)} 行")
- if not invalid_df.empty:
- logger.warning("部分无效行示例:")
- logger.warning(invalid_df[['lon', 'lat', 'sampling_location']].head().to_string())
- logger.info(f"数据验证完成,有效数据 {len(df)} 行")
- return df
- except Exception as e:
- logger.error(f"数据验证失败: {str(e)}")
- raise
- def import_data(self, df):
- """
- 将数据导入到数据库
- @param {DataFrame} df - 要导入的数据
- """
- try:
- logger.info("开始导入数据到数据库...")
- # 创建数据库会话
- db = SessionLocal()
- try:
- # 检查是否有重复数据
- existing_count = db.query(WaterSampleData).count()
- logger.info(f"数据库中现有数据: {existing_count} 条")
- # 批量创建对象
- batch_size = 500 # 由于字段较多,适当减小批量大小
- total_rows = len(df)
- imported_count = 0
- for i in range(0, total_rows, batch_size):
- batch_df = df.iloc[i:i+batch_size]
- batch_objects = []
- for _, row in batch_df.iterrows():
- try:
- # 创建WaterSampleData对象
- water_sample = WaterSampleData(
- sample_code=str(row['sample_code']),
- sample_number=str(row['sample_number']),
- longitude=float(row['lon']),
- latitude=float(row['lat']),
- sampling_location=str(row['sampling_location']),
- sample_time=row['sample_time'], # 作为datetime对象存储
- weather=str(row['weather']),
- container_material=str(row['storage_container_material']),
- container_color=str(row['storage_container_color']),
- container_capacity=int(row['storage_container_capacity']),
- sampling_volume=float(row['sampling_volume']),
- sample_description=str(row['sample_description']),
- water_quality=str(row['water_quality']),
- water_environment=str(row['water_environment']),
- storage_method=str(row['storage_method']),
- cr_concentration=float(row['cr']),
- as_concentration=float(row['as']),
- cd_concentration=float(row['cd']),
- hg_concentration=float(row['hg']),
- pb_concentration=float(row['pb']),
- ph_value=float(row['ph'])
- )
- batch_objects.append(water_sample)
- except Exception as e:
- logger.warning(f"跳过行 {i + _}: {str(e)}")
- continue
- if batch_objects:
- # 批量插入
- db.add_all(batch_objects)
- db.commit()
- imported_count += len(batch_objects)
- logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
- logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
- # 验证导入结果
- final_count = db.query(WaterSampleData).count()
- logger.info(f"导入后数据库总数据: {final_count} 条")
- except Exception as e:
- db.rollback()
- logger.error(f"数据导入失败,已回滚: {str(e)}")
- raise
- finally:
- db.close()
- except Exception as e:
- logger.error(f"数据导入过程失败: {str(e)}")
- raise
- def run_import(self):
- """
- 执行完整的导入流程
- """
- try:
- logger.info("=" * 60)
- logger.info("开始灌溉水采样数据导入流程")
- logger.info("=" * 60)
- # 1. 读取Excel数据
- df = self.read_excel_data()
- # 2. 验证数据
- df = self.validate_data(df)
- # 3. 导入数据
- self.import_data(df)
- logger.info("=" * 60)
- logger.info("灌溉水采样数据导入流程完成!")
- logger.info("=" * 60)
- except Exception as e:
- logger.error(f"导入流程失败: {str(e)}")
- raise
- def main():
- """
- 主函数
- """
- # Excel文件路径
- excel_path = r"D:\destkop\数据库对应数据.xlsx"
- sheet_name = "Water_sample" # 指定对应的sheet名称
- try:
- # 创建导入器并执行导入
- importer = WaterSampleDataImporter(excel_path, sheet_name)
- importer.run_import()
- except Exception as e:
- logger.error(f"程序执行失败: {str(e)}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|