import uuid
from datetime import datetime, timezone

import pandas as pd
from sqlalchemy import Column, Integer, String, Float, DateTime, select, create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.schema import MetaData, Table

from .database_models import CurrentReduce, CurrentReflux

Base = declarative_base()
def create_dynamic_table(dataset_id, columns):
    """Dynamically create a table model for an uploaded dataset."""
    # Build the columns dynamically; every table gets an autoincrement primary key
    dynamic_columns = {
        'id': Column(Integer, primary_key=True, autoincrement=True)
    }

    # Map the inferred column types onto SQLAlchemy column types
    for col_name, col_type in columns.items():
        if col_type == 'str':
            dynamic_columns[col_name] = Column(String(255))
        elif col_type == 'int':
            dynamic_columns[col_name] = Column(Integer)
        elif col_type == 'float':
            dynamic_columns[col_name] = Column(Float)
        elif col_type == 'datetime':
            dynamic_columns[col_name] = Column(DateTime)

    # The generated model uses the table name dataset_{dataset_id}
    table_name = f"dataset_{dataset_id}"
    dynamic_columns['__tablename__'] = table_name
    # Allow redefining the same table name on the shared metadata without an error
    dynamic_columns['__table_args__'] = {'extend_existing': True}

    # Create the model class dynamically
    dynamic_class = type(table_name, (Base,), dynamic_columns)

    # Debug output
    print("table_name:", table_name)
    print("dynamic_columns:", dynamic_columns)

    # Create the database engine (replace with your own database URL if needed)
    engine = create_engine('sqlite:///SoilAcidification.db')
    Base.metadata.create_all(engine)  # create any tables that do not exist yet

    return dynamic_class
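
# Illustrative sketch (not part of the original module): how the dynamic table helper
# might be used. The dataset id and column mapping below are made up.
def _example_create_dynamic_table():
    columns = {'pH': 'float', 'OM': 'float', 'sample_name': 'str'}  # hypothetical columns
    DatasetModel = create_dynamic_table(1, columns)
    # The returned class maps to the table "dataset_1" and behaves like any other
    # declarative model, e.g. DatasetModel(pH=5.2, OM=13.1, sample_name='A1').
    return DatasetModel
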
# Check whether the uploaded file type is allowed
def allowed_file(filename):
    ALLOWED_EXTENSIONS = {'xlsx', 'xls'}
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

# Generate a unique file name
def generate_unique_filename(filename):
    # Get the file extension
    ext = filename.rsplit('.', 1)[1].lower()
    # Combine a UUID with the current UTC timestamp to build a unique name
    unique_filename = f"{uuid.uuid4().hex}_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}.{ext}"
    return unique_filename
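
# Illustrative sketch (not part of the original module): the intended upload flow,
# validating the extension before generating a storage name. The file name is made up.
def _example_handle_upload_name():
    filename = 'soil_samples.xlsx'  # hypothetical uploaded file name
    if not allowed_file(filename):
        raise ValueError('Only .xlsx and .xls files are accepted.')
    return generate_unique_filename(filename)  # e.g. '3f2a..._20240101120000.xlsx'
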
def infer_column_types(df):
    type_map = {
        'object': 'str',
        'int64': 'int',
        'float64': 'float',
        'datetime64[ns]': 'datetime'  # pandas datetime columns
    }
    # Map every column to a simplified type name, falling back to 'str'
    return {col: type_map.get(str(df[col].dtype), 'str') for col in df.columns}
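
# Illustrative sketch (not part of the original module): inferring column types from
# a small in-memory DataFrame; the data below is made up.
def _example_infer_column_types():
    df = pd.DataFrame({'pH': [5.2, 6.1], 'OM': [13, 15], 'site': ['A', 'B']})
    return infer_column_types(df)  # {'pH': 'float', 'OM': 'int', 'site': 'str'}
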
def clean_column_names(dataframe):
    # Strip surrounding whitespace and remove non-breaking spaces from the column names
    dataframe.columns = [col.strip().replace('\xa0', '') for col in dataframe.columns]
    return dataframe
# Map the Excel column names onto the database model field names
def rename_columns_for_model(dataframe, dataset_type):
    if dataset_type == 'reduce':
        rename_map = {
            '1/b': 'Q_over_b',
            'pH': 'pH',
            'OM': 'OM',
            'CL': 'CL',
            'H': 'H',
            'Al': 'Al'
        }
    elif dataset_type == 'reflux':
        rename_map = {
            'OM': 'OM',
            'CL': 'CL',
            'CEC': 'CEC',
            'H+': 'H_plus',
            'N': 'N',
            'Al3+': 'Al3_plus',
            'ΔpH': 'Delta_pH'
        }
    else:
        raise ValueError("Invalid dataset type provided. Choose 'reduce' or 'reflux'.")

    # Apply the mapping with rename()
    dataframe = dataframe.rename(columns=rename_map)
    return dataframe
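
# Illustrative sketch (not part of the original module): renaming the raw Excel headers
# of a 'reflux' sheet; the values below are made up.
def _example_rename_for_model():
    df = pd.DataFrame({'H+': [0.5], 'Al3+': [1.2], 'ΔpH': [0.8]})
    return rename_columns_for_model(df, 'reflux')  # columns become H_plus, Al3_plus, Delta_pH
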
# Map the front-end parameter names onto the model prediction field names
def rename_columns_for_model_predict(dataframe, dataset_type):
    if dataset_type == 'reduce':
        rename_map = {
            'init_pH': 'pH',
            'OM': 'OM',
            'CL': 'CL',
            'H': 'H',
            'Al': 'Al'
        }
    elif dataset_type == 'reflux':
        rename_map = {
            "OM": "OM",
            "CL": "CL",
            "CEC": "CEC",
            "H+": "H_plus",
            "N": "N",
            "Al3+": "Al3_plus"
        }
    else:
        raise ValueError("Invalid dataset type provided. Choose 'reduce' or 'reflux'.")

    # Apply the mapping with rename()
    dataframe = dataframe.rename(columns=rename_map)
    return dataframe
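
# Illustrative sketch (not part of the original module): renaming front-end parameters
# of a 'reduce' prediction request; the values below are made up.
def _example_rename_for_predict():
    df = pd.DataFrame({'init_pH': [5.0], 'OM': [12.0], 'CL': [20.0], 'H': [0.3], 'Al': [1.1]})
    return rename_columns_for_model_predict(df, 'reduce')  # 'init_pH' becomes 'pH'
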
def insert_data_into_existing_table(session, dataframe, model_class):
    """Insert data from a DataFrame into an existing SQLAlchemy model table."""
    for index, row in dataframe.iterrows():
        record = model_class(**row.to_dict())
        session.add(record)

def insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class):
    """Insert data from a DataFrame into a dynamically created dataset table."""
    for _, row in dataset_df.iterrows():
        record_data = row.to_dict()
        session.execute(dynamic_table_class.__table__.insert(), [record_data])
def insert_data_by_type(session, dataset_df, dataset_type):
    """Insert a DataFrame into the current-data table matching the dataset type."""
    if dataset_type == 'reduce':
        for _, row in dataset_df.iterrows():
            record = CurrentReduce(**row.to_dict())
            session.add(record)
    elif dataset_type == 'reflux':
        for _, row in dataset_df.iterrows():
            record = CurrentReflux(**row.to_dict())
            session.add(record)
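
# Illustrative sketch (not part of the original module): inserting an uploaded DataFrame
# both into a dynamically created per-dataset table and into the current 'reduce' table.
# `session` is assumed to be an open SQLAlchemy session bound to SoilAcidification.db;
# the data is made up, and the column names follow the 'reduce' rename map above
# (the actual model fields live in database_models).
def _example_insert_uploaded_data(session):
    df = pd.DataFrame({'Q_over_b': [0.12], 'pH': [5.2], 'OM': [13.0],
                       'CL': [20.0], 'H': [0.3], 'Al': [1.1]})
    DatasetModel = create_dynamic_table(2, infer_column_types(df))
    insert_data_into_dynamic_table(session, df, DatasetModel)
    insert_data_by_type(session, df, 'reduce')
    session.commit()
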
def get_current_data(session, data_type):
    # Select the table model matching the data type
    if data_type == 'reduce':
        model = CurrentReduce
    elif data_type == 'reflux':
        model = CurrentReflux
    else:
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    # Query all records and convert them to a DataFrame
    records = session.execute(select(model)).scalars().all()
    dataframe = pd.DataFrame(
        [{column.name: getattr(record, column.name) for column in model.__table__.columns}
         for record in records]
    )
    return dataframe
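
# Illustrative sketch (not part of the original module): reading the current 'reflux'
# data back as a DataFrame. `session` is assumed to be an open SQLAlchemy session.
def _example_get_current_data(session):
    df = get_current_data(session, 'reflux')
    print(df.head())
    return df
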
def get_dataset_by_id(session, dataset_id):
    # Reflect the table metadata dynamically
    metadata = MetaData()
    dataset_table = Table(dataset_id, metadata, autoload_with=session.bind)

    # Query the whole table
    query = select(dataset_table)
    result = session.execute(query).fetchall()

    # Make sure the table actually contains data
    if not result:
        raise ValueError(f"No data found for dataset {dataset_id}.")

    # Convert the result to a DataFrame
    dataframe = pd.DataFrame(result, columns=[column.name for column in dataset_table.columns])
    return dataframe
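
# Illustrative sketch (not part of the original module): loading a dynamically created
# dataset table back into a DataFrame. The table name below is made up and follows the
# dataset_{dataset_id} convention used by create_dynamic_table.
def _example_get_dataset_by_id(session):
    return get_dataset_by_id(session, 'dataset_1')
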
def predict_to_Q(predictions, init_ph, target_ph):
    # Convert the model predictions into a quicklime application rate Q
    Q = predictions * (target_ph - init_ph)
    return Q

# Note: Q is the quicklime application rate in %, e.g. 1% means 1 g of quicklime
# per 100 g of soil, where "soil" refers to the top 20 cm layer.
# To express Q in tonnes per hectare (t/ha), multiply by 25.
# ΔpH = target pH - initial pH
def Q_to_t_ha(Q):
    return Q * 25
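
# Illustrative sketch (not part of the original module): a worked example of the Q
# conversion. The predicted value below is made up; with a prediction of 0.5 and
# ΔpH = 6.5 - 5.0 = 1.5, Q = 0.5 * 1.5 = 0.75 %, which equals 0.75 * 25 = 18.75 t/ha.
def _example_q_conversion():
    Q = predict_to_Q(0.5, init_ph=5.0, target_ph=6.5)  # 0.75 (%)
    return Q_to_t_ha(Q)                                # 18.75 (t/ha)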