import_water_sample.py 12 KB


  1. """
  2. Water_sample数据导入脚本
  3. @description: 从Excel文件读取灌溉水采样数据并导入到water_sampling_data表
  4. """
  5. import os
  6. import sys
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. from sqlalchemy.orm import sessionmaker
  11. import re
  12. # 添加项目根目录到Python路径
  13. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. from app.database import engine, SessionLocal
  15. from app.models.water_sample import WaterSampleData # 需创建对应的ORM模型
  16. # 设置日志
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  20. )
  21. logger = logging.getLogger(__name__)
  22. class WaterSampleDataImporter:
  23. """
  24. 灌溉水采样数据导入器
  25. @description: 从Excel文件读取灌溉水采样数据并导入到数据库
  26. """
  27. def __init__(self, excel_path, sheet_name='Water_sample'):
  28. """
  29. 初始化导入器
  30. @param {str} excel_path - Excel文件路径
  31. @param {str} sheet_name - Sheet名称,默认为'Water_sample'
  32. """
  33. self.excel_path = excel_path
  34. self.sheet_name = sheet_name
  35. # 定义必需字段列表
  36. self.required_columns = [
  37. 'sample_code', 'lon', 'lat', 'sampling_location',
  38. 'sample_time', 'Cr', 'As', 'Cd', 'Hg', 'Pb', 'pH'
  39. ]
  40. # 数值型字段列表
  41. self.numeric_columns = [
  42. 'lon', 'lat', 'sampling_volume', 'Cr', 'As', 'Cd', 'Hg', 'Pb', 'pH'
  43. ]
  44. def read_excel_data(self):
  45. """
  46. 读取Excel文件数据
  47. @returns: DataFrame 读取的数据
  48. """
  49. try:
  50. logger.info(f"开始读取Excel文件: {self.excel_path}")
  51. logger.info(f"Sheet名称: {self.sheet_name}")
  52. # 检查文件是否存在
  53. if not os.path.exists(self.excel_path):
  54. raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")
  55. # 读取Excel文件
  56. df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
  57. logger.info(f"成功读取数据,共 {len(df)} 行")
  58. logger.info(f"数据列: {list(df.columns)}")
  59. # 显示前几行数据供确认
  60. logger.info("前5行数据预览:")
  61. logger.info(df.head().to_string())
  62. return df
  63. except Exception as e:
  64. logger.error(f"读取Excel文件失败: {str(e)}")
  65. raise
  66. def clean_sample_time(self, time_str):
  67. """
  68. 清理和标准化样本时间格式
  69. @param {str} time_str - 时间字符串
  70. @returns: datetime 标准化的时间对象
  71. """
  72. try:
  73. # 尝试转换常见格式
  74. if isinstance(time_str, str):
  75. # 处理可能的格式:2024.5.20 16:37
  76. if '.' in time_str and ':' in time_str:
  77. return datetime.strptime(time_str, "%Y.%m.%d %H:%M")
  78. # 处理其他格式
  79. elif '/' in time_str:
  80. return datetime.strptime(time_str, "%Y/%m/%d %H:%M")
  81. elif '-' in time_str:
  82. return datetime.strptime(time_str, "%Y-%m-%d %H:%M")
  83. return time_str
  84. except Exception as e:
  85. logger.warning(f"无法解析时间字符串: {time_str}, 错误: {str(e)}")
  86. return None
  87. def validate_data(self, df):
  88. """
  89. 验证数据格式和完整性
  90. @param {DataFrame} df - 要验证的数据
  91. @returns: DataFrame 验证后的数据
  92. """
  93. try:
  94. logger.info("开始验证数据...")
  95. # 检查必需的列是否存在
  96. missing_columns = [col for col in self.required_columns if col not in df.columns]
  97. if missing_columns:
  98. raise ValueError(f"缺少必需的列: {missing_columns}")
  99. # 将列名转换为小写(带下划线)
  100. df.columns = [col.lower().replace(' ', '_') for col in df.columns]
  101. required_columns_lower = [col.lower().replace(' ', '_') for col in self.required_columns]
  102. numeric_columns_lower = [col.lower().replace(' ', '_') for col in self.numeric_columns]
  103. # 检查数据类型
  104. logger.info("检查数据类型...")
  105. # 转换数值类型
  106. for col in numeric_columns_lower:
  107. if col in df.columns:
  108. # 对于数值列,转换为浮点数
  109. df[col] = pd.to_numeric(df[col], errors='coerce')
  110. # 特殊处理时间字段
  111. if 'sample_time' in df.columns:
  112. # 清理和标准化时间格式
  113. df['sample_time'] = df['sample_time'].apply(self.clean_sample_time)
  114. # 处理空值 - 所有字段必须非空(除了文本描述)
  115. empty_columns = df.isnull().any()
  116. empty_cols = [col for col in empty_columns.index if empty_columns[col]]
  117. if empty_cols:
  118. logger.warning(f"发现以下列存在空值: {', '.join(empty_cols)}")
  119. # 对于数值列,如果有空值,填充为0
  120. for col in numeric_columns_lower:
  121. if col in df.columns and df[col].isnull().any():
  122. df[col] = df[col].fillna(0)
  123. logger.info(f"已将 {col} 的空值替换为0")
  124. # 文本列填充空值为'未知'
  125. text_columns = [
  126. 'sample_number', 'weather', 'container_material',
  127. 'container_color', 'sample_description',
  128. 'water_quality', 'water_environment',
  129. 'storage_method'
  130. ]
  131. for col in text_columns:
  132. if col in df.columns and df[col].isnull().any():
  133. logger.warning(f"{col}列存在空值,填充为'未知'")
  134. df[col] = df[col].fillna('未知')
  135. # 验证经纬度范围
  136. longitude_errors = df[(df['lon'] < -180) | (df['lon'] > 180) | (df['lon'].isna())]
  137. latitude_errors = df[(df['lat'] < -90) | (df['lat'] > 90) | (df['lat'].isna())]
  138. if not longitude_errors.empty or not latitude_errors.empty:
  139. logger.warning("发现经纬度无效值或空值,将删除这些行")
  140. # 保留有效经纬度行
  141. valid_mask = ~df['lon'].isna() & ~df['lat'].isna() & (df['lon'].between(-180, 180)) & (df['lat'].between(-90, 90))
  142. invalid_df = df[~valid_mask]
  143. df = df[valid_mask]
  144. logger.warning(f"删除无效经纬度数据 {len(invalid_df)} 行")
  145. if not invalid_df.empty:
  146. logger.warning("部分无效行示例:")
  147. logger.warning(invalid_df[['lon', 'lat', 'sampling_location']].head().to_string())
  148. logger.info(f"数据验证完成,有效数据 {len(df)} 行")
  149. return df
  150. except Exception as e:
  151. logger.error(f"数据验证失败: {str(e)}")
  152. raise
  153. def import_data(self, df):
  154. """
  155. 将数据导入到数据库
  156. @param {DataFrame} df - 要导入的数据
  157. """
  158. try:
  159. logger.info("开始导入数据到数据库...")
  160. # 创建数据库会话
  161. db = SessionLocal()
  162. try:
  163. # 检查是否有重复数据
  164. existing_count = db.query(WaterSampleData).count()
  165. logger.info(f"数据库中现有数据: {existing_count} 条")
  166. # 批量创建对象
  167. batch_size = 500 # 由于字段较多,适当减小批量大小
  168. total_rows = len(df)
  169. imported_count = 0
  170. for i in range(0, total_rows, batch_size):
  171. batch_df = df.iloc[i:i+batch_size]
  172. batch_objects = []
  173. for _, row in batch_df.iterrows():
  174. try:
  175. # 创建WaterSampleData对象
  176. water_sample = WaterSampleData(
  177. sample_code=str(row['sample_code']),
  178. sample_number=str(row['sample_number']),
  179. longitude=float(row['lon']),
  180. latitude=float(row['lat']),
  181. sampling_location=str(row['sampling_location']),
  182. sample_time=row['sample_time'], # 作为datetime对象存储
  183. weather=str(row['weather']),
  184. container_material=str(row['storage_container_material']),
  185. container_color=str(row['storage_container_color']),
  186. container_capacity=int(row['storage_container_capacity']),
  187. sampling_volume=float(row['sampling_volume']),
  188. sample_description=str(row['sample_description']),
  189. water_quality=str(row['water_quality']),
  190. water_environment=str(row['water_environment']),
  191. storage_method=str(row['storage_method']),
  192. cr_concentration=float(row['cr']),
  193. as_concentration=float(row['as']),
  194. cd_concentration=float(row['cd']),
  195. hg_concentration=float(row['hg']),
  196. pb_concentration=float(row['pb']),
  197. ph_value=float(row['ph'])
  198. )
  199. batch_objects.append(water_sample)
  200. except Exception as e:
  201. logger.warning(f"跳过行 {i + _}: {str(e)}")
  202. continue
  203. if batch_objects:
  204. # 批量插入
  205. db.add_all(batch_objects)
  206. db.commit()
  207. imported_count += len(batch_objects)
  208. logger.info(f"已导入 {imported_count}/{total_rows} 条数据")
  209. logger.info(f"数据导入完成! 成功导入 {imported_count} 条数据")
  210. # 验证导入结果
  211. final_count = db.query(WaterSampleData).count()
  212. logger.info(f"导入后数据库总数据: {final_count} 条")
  213. except Exception as e:
  214. db.rollback()
  215. logger.error(f"数据导入失败,已回滚: {str(e)}")
  216. raise
  217. finally:
  218. db.close()
  219. except Exception as e:
  220. logger.error(f"数据导入过程失败: {str(e)}")
  221. raise
  222. def run_import(self):
  223. """
  224. 执行完整的导入流程
  225. """
  226. try:
  227. logger.info("=" * 60)
  228. logger.info("开始灌溉水采样数据导入流程")
  229. logger.info("=" * 60)
  230. # 1. 读取Excel数据
  231. df = self.read_excel_data()
  232. # 2. 验证数据
  233. df = self.validate_data(df)
  234. # 3. 导入数据
  235. self.import_data(df)
  236. logger.info("=" * 60)
  237. logger.info("灌溉水采样数据导入流程完成!")
  238. logger.info("=" * 60)
  239. except Exception as e:
  240. logger.error(f"导入流程失败: {str(e)}")
  241. raise
  242. def main():
  243. """
  244. 主函数
  245. """
  246. # Excel文件路径
  247. excel_path = r"D:\destkop\数据库对应数据.xlsx"
  248. sheet_name = "Water_sample" # 指定对应的sheet名称
  249. try:
  250. # 创建导入器并执行导入
  251. importer = WaterSampleDataImporter(excel_path, sheet_name)
  252. importer.run_import()
  253. except Exception as e:
  254. logger.error(f"程序执行失败: {str(e)}")
  255. sys.exit(1)
  256. if __name__ == "__main__":
  257. main()