from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Float, DateTime from sqlalchemy import create_engine import uuid from datetime import datetime, timezone Base = declarative_base() def create_dynamic_table(dataset_id, columns): """动态创建数据表""" # 动态构建列 dynamic_columns = { 'id': Column(Integer, primary_key=True, autoincrement=True) # 为每个表添加一个主键 } # 根据 columns 字典动态创建字段 for col_name, col_type in columns.items(): if col_type == 'str': dynamic_columns[col_name] = Column(String(255)) elif col_type == 'int': dynamic_columns[col_name] = Column(Integer) elif col_type == 'float': dynamic_columns[col_name] = Column(Float) elif col_type == 'datetime': dynamic_columns[col_name] = Column(DateTime) # 动态生成模型类,表名使用 dataset_{dataset_id} table_name = f"dataset_{dataset_id}" # 在生成的类中添加 `__tablename__` dynamic_columns['__tablename__'] = table_name # 动态创建类 dynamic_class = type(table_name, (Base,), dynamic_columns) # 打印调试信息 print("table_name:", table_name) print("dynamic_columns:", dynamic_columns) # 创建数据库引擎 engine = create_engine('sqlite:///SoilAcidification.db') # 这里需要替换为你的数据库引擎 Base.metadata.create_all(engine) # 创建所有表格 return dynamic_class # 判断文件类型是否允许 def allowed_file(filename): ALLOWED_EXTENSIONS = {'xlsx', 'xls'} return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS # 生成唯一文件名 def generate_unique_filename(filename): # 获取文件的扩展名 ext = filename.rsplit('.', 1)[1].lower() # 使用 UUID 和当前时间戳生成唯一文件名(使用 UTC 时区) unique_filename = f"{uuid.uuid4().hex}_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}.{ext}" return unique_filename