123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610 |
- import sqlite3
- from flask import Blueprint, request, jsonify, current_app
- from .model import predict, train_and_save_model
- import pandas as pd
- from . import db # 从 app 包导入 db 实例
- from sqlalchemy.engine.reflection import Inspector
- from .database_models import Models, ModelParameters, Datasets, CurrentReduce, CurrentReflux
- import os
- from .utils import create_dynamic_table, allowed_file
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.schema import MetaData, Table
- import logging
- from sqlalchemy import text, select
# Blueprint that groups every HTTP route defined in this module.
bp = Blueprint(name='routes', import_name=__name__)

# Root logging at DEBUG level, plus a logger named after this module.
logging.basicConfig(level='DEBUG')
logger = logging.getLogger(__name__)
def infer_column_types(df):
    """Map each DataFrame column to a simple type name for table creation.

    Args:
        df: DataFrame whose columns are classified.

    Returns:
        dict ``{column_name: type_name}`` where ``type_name`` is one of
        ``'int'``, ``'float'``, ``'datetime'`` or ``'str'``. Any integer dtype
        (``int64``, ``int32``, nullable ``Int64``, ...) maps to ``'int'``, any
        float dtype to ``'float'``, and any datetime64 dtype (including
        timezone-aware ones) to ``'datetime'``. Booleans, strings, objects and
        every other dtype fall back to ``'str'``.
    """
    ptypes = pd.api.types

    def _simple_type(dtype):
        # Bool is checked first so it keeps mapping to 'str' and is never
        # classified as an integer type.
        if ptypes.is_bool_dtype(dtype):
            return 'str'
        if ptypes.is_integer_dtype(dtype):
            return 'int'
        if ptypes.is_float_dtype(dtype):
            return 'float'
        if ptypes.is_datetime64_any_dtype(dtype):
            return 'datetime'
        return 'str'

    return {col: _simple_type(df[col].dtype) for col in df.columns}
@bp.route('/upload-dataset', methods=['POST'])
def upload_dataset():
    """Upload an Excel dataset, register it, and load its rows into the database.

    Expects a multipart form with a ``file`` part plus the form fields
    ``dataset_name``, ``dataset_description`` (optional) and ``dataset_type``
    (required).

    Steps:
      1. Create a ``Datasets`` record with status ``'pending'``.
      2. Save the file as ``dataset_<ID>.xlsx`` in ``UPLOAD_FOLDER`` and read it.
      3. Normalise and rename the columns.
      4. Insert the rows into a per-dataset dynamic table and, for ``'reduce'`` /
         ``'reflux'`` datasets, also into the matching current-data table.

    The record's row count and ``'processed'`` status are written in the same
    commit as the row inserts, so a failed insert does not leave the record
    claiming ``'processed'``.

    Returns:
        201 with ``dataset_id`` and ``filename`` on success, 400 for invalid
        form input, 500 (with the exception text) if processing fails.
    """
    # The session is created only after validation; except/finally must
    # tolerate it not existing yet (early returns also run `finally`).
    session = None
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        file = request.files['file']
        if file.filename == '' or not allowed_file(file.filename):
            return jsonify({'error': 'No selected file or invalid file type'}), 400

        dataset_name = request.form.get('dataset_name')
        dataset_description = request.form.get('dataset_description', 'No description provided')
        dataset_type = request.form.get('dataset_type')
        if not dataset_type:
            return jsonify({'error': 'Dataset type is required'}), 400

        Session = sessionmaker(bind=db.engine)
        session = Session()

        new_dataset = Datasets(
            Dataset_name=dataset_name,
            Dataset_description=dataset_description,
            Row_count=0,
            Status='pending',
            Dataset_type=dataset_type
        )
        session.add(new_dataset)
        # Commit so the database assigns Dataset_ID, which names the stored
        # file and the dynamic table.
        session.commit()

        unique_filename = f"dataset_{new_dataset.Dataset_ID}.xlsx"
        upload_folder = current_app.config['UPLOAD_FOLDER']
        file_path = os.path.join(upload_folder, unique_filename)
        file.save(file_path)
        dataset_df = pd.read_excel(file_path)

        # Clean the column names, then rename them to the model field names.
        dataset_df = clean_column_names(dataset_df)
        dataset_df = rename_columns_for_model(dataset_df, dataset_type)

        column_types = infer_column_types(dataset_df)
        dynamic_table_class = create_dynamic_table(new_dataset.Dataset_ID, column_types)
        insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class)

        # Known dataset types are also copied into their current-data table.
        if dataset_type == 'reduce':
            insert_data_into_existing_table(session, dataset_df, CurrentReduce)
        elif dataset_type == 'reflux':
            insert_data_into_existing_table(session, dataset_df, CurrentReflux)

        # Mark the dataset processed only together with its rows.
        new_dataset.Row_count = len(dataset_df)
        new_dataset.Status = 'processed'
        session.commit()

        return jsonify({
            'message': f'Dataset {dataset_name} uploaded successfully!',
            'dataset_id': new_dataset.Dataset_ID,
            'filename': unique_filename
        }), 201
    except Exception as e:
        if session is not None:
            session.rollback()
        logger.error('Failed to process the dataset upload:', exc_info=True)
        return jsonify({'error': str(e)}), 500
    finally:
        if session is not None:
            session.close()
@bp.route('/train-and-save-model', methods=['POST'])
def train_and_save_model_endpoint():
    """Train a model from the request parameters and persist it.

    Expects a JSON object with ``model_type``, ``model_name``,
    ``model_description``, ``data_type`` and an optional ``dataset_id``. The
    values are passed unchanged, together with a fresh session, to
    ``train_and_save_model``.

    Returns:
        200 with the training result, 400 if the body is not a JSON object,
        500 (with the exception text) if training or saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    model_type = data.get('model_type')
    model_name = data.get('model_name')
    model_description = data.get('model_description')
    data_type = data.get('data_type')
    dataset_id = data.get('dataset_id', None)  # None when not supplied

    # Open the session only once the request is known to be valid, and always
    # close it, so no code path can leak it.
    Session = sessionmaker(bind=db.engine)
    session = Session()
    try:
        result = train_and_save_model(session, model_type, model_name, model_description, data_type, dataset_id)
        return jsonify({'message': 'Model trained and saved successfully', 'result': result}), 200
    except Exception as e:
        session.rollback()
        logger.error('Failed to train and save model:', exc_info=True)
        return jsonify({'error': 'Failed to train and save model', 'message': str(e)}), 500
    finally:
        session.close()
def clean_column_names(dataframe):
    """Normalise the DataFrame's column names in place and return the DataFrame.

    Each name is:
      - converted to ``str``, because headers that pandas reads as numbers or
        dates would otherwise make ``.strip()`` fail;
      - stripped of leading/trailing whitespace (``str.strip`` also removes
        non-breaking spaces at the ends);
      - cleared of any remaining non-breaking spaces (U+00A0).

    No other characters are changed.

    Args:
        dataframe: DataFrame whose ``columns`` are replaced.

    Returns:
        The same DataFrame object.
    """
    # NOTE(review): interior non-breaking spaces are deleted rather than
    # replaced with a regular space; header keys in the rename maps (e.g.
    # 'Al3+cmol/kg') may rely on this, so it is kept as-is.
    dataframe.columns = [str(col).strip().replace('\xa0', '') for col in dataframe.columns]
    return dataframe
def rename_columns_for_model(dataframe, dataset_type):
    """Rename spreadsheet headers to the model field names for *dataset_type*.

    ``'reduce'`` and ``'reflux'`` each have a header-to-field map. Headers not
    in the map keep their name. Any other *dataset_type* leaves all columns
    unchanged.

    Args:
        dataframe: DataFrame whose columns are renamed.
        dataset_type: Dataset type string, e.g. ``'reduce'`` or ``'reflux'``.

    Returns:
        A new DataFrame with renamed columns (the input is not modified).
    """
    rename_maps = {
        'reduce': {
            '1/b': 'Q_over_b',
            # Identity entries list the expected headers explicitly.
            'pH': 'pH',
            'OM': 'OM',
            'CL': 'CL',
            'H': 'H',
            'Al': 'Al'
        },
        'reflux': {
            'OM g/kg': 'OM',
            'CL g/kg': 'CL',
            'CEC cmol/kg': 'CEC',
            'H+ cmol/kg': 'H_plus',
            'HN mg/kg': 'HN',
            'Al3+cmol/kg': 'Al3_plus',
            'Free alumina g/kg': 'Free_alumina',
            'Free iron oxides g/kg': 'Free_iron_oxides',
            'ΔpH': 'Delta_pH'
        },
    }
    # Unknown types fall back to an empty map instead of leaving `rename_map`
    # unbound (which used to raise UnboundLocalError).
    rename_map = rename_maps.get(dataset_type, {})
    return dataframe.rename(columns=rename_map)
def insert_data_into_existing_table(session, dataframe, model_class):
    """Insert data from a DataFrame into an existing SQLAlchemy model table.

    One ``model_class`` instance is built per row, using the row's
    column/value pairs as keyword arguments. The DataFrame's column names must
    therefore match the model's attributes.

    Missing values (NaN/NaT) become ``None`` so they are stored as SQL NULL.
    Rows come from ``to_dict(orient='records')``, which keeps each column's own
    type; ``iterrows`` would upcast integers to floats in all-numeric frames.

    The objects are only added to *session*; the caller commits.
    """
    records = dataframe.to_dict(orient='records')
    session.add_all([
        model_class(**{key: (None if pd.isna(value) else value) for key, value in record.items()})
        for record in records
    ])
def insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class):
    """Insert every row of *dataset_df* into the table mapped by *dynamic_table_class*.

    All rows are sent through one ``INSERT`` executed with a list of parameter
    dictionaries (executemany), instead of one statement per row. Missing
    values (NaN/NaT) become ``None`` so they are stored as SQL NULL.

    Nothing is executed for an empty DataFrame. The caller commits.
    """
    records = [
        {key: (None if pd.isna(value) else value) for key, value in record.items()}
        for record in dataset_df.to_dict(orient='records')
    ]
    # Skip the call entirely when there are no rows, rather than executing an
    # INSERT with an empty parameter list.
    if records:
        session.execute(dynamic_table_class.__table__.insert(), records)
def insert_data_by_type(session, dataset_df, dataset_type):
    """Add *dataset_df*'s rows to the current-data table for *dataset_type*.

    ``'reduce'`` rows go to ``CurrentReduce`` and ``'reflux'`` rows to
    ``CurrentReflux``. Any other type is ignored. The row-to-model conversion
    is delegated to ``insert_data_into_existing_table`` so both code paths
    behave the same. The caller commits.
    """
    model_class = {'reduce': CurrentReduce, 'reflux': CurrentReflux}.get(dataset_type)
    if model_class is not None:
        insert_data_into_existing_table(session, dataset_df, model_class)
def get_current_data(session, data_type):
    """Load every row of the current-data table for *data_type* into a DataFrame.

    Args:
        session: SQLAlchemy session used to run the query.
        data_type: ``'reduce'`` (``CurrentReduce``) or ``'reflux'`` (``CurrentReflux``).

    Returns:
        DataFrame with one column per table column. It is empty, but still
        has the column headers, when the table has no rows.

    Raises:
        ValueError: if *data_type* is neither ``'reduce'`` nor ``'reflux'``.
    """
    # Pick the table model for the requested data type.
    if data_type == 'reduce':
        model = CurrentReduce
    elif data_type == 'reflux':
        model = CurrentReflux
    else:
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    # Select the mapped Table, not the ORM class. select(model) yields one ORM
    # object per row, and dict(row) does not turn that into column/value pairs.
    result = session.execute(select(model.__table__))
    columns = list(result.keys())
    return pd.DataFrame([tuple(row) for row in result.fetchall()], columns=columns)
def get_dataset_by_id(session, dataset_id):
    """Return the full contents of the table named *dataset_id* as a DataFrame.

    Args:
        session: SQLAlchemy session whose ``bind`` is used both for reflection
            and for the query.
        dataset_id: Name of the table to read. It is passed unchanged to
            ``Table``.

    Returns:
        DataFrame with one column per reflected table column.

    Raises:
        ValueError: if the table contains no rows.
    """
    # Reflect the table definition. MetaData(bind=...) and autoload=True were
    # removed in SQLAlchemy 2.0; autoload_with works on 1.4 and 2.0.
    metadata = MetaData()
    dataset_table = Table(dataset_id, metadata, autoload_with=session.bind)

    result = session.execute(select(dataset_table)).fetchall()
    if not result:
        raise ValueError(f"No data found for dataset {dataset_id}.")

    columns = [column.name for column in dataset_table.columns]
    return pd.DataFrame([tuple(row) for row in result], columns=columns)
@bp.route('/delete-dataset/<int:dataset_id>', methods=['DELETE'])
def delete_dataset(dataset_id):
    """Delete a dataset: its ``Datasets`` record, its ``dataset_<ID>`` table and its uploaded file.

    The database changes are committed first. The uploaded file is removed
    only after that commit succeeds, so a failed database operation no longer
    leaves a record whose file is already gone. File removal is best-effort:
    a failure is logged and the request still succeeds.

    Returns:
        200 on success, 404 if the dataset does not exist, 500 (with the
        exception text) if the database operation fails.
    """
    Session = sessionmaker(bind=db.engine)
    session = Session()
    try:
        dataset = session.query(Datasets).filter_by(Dataset_ID=dataset_id).first()
        if not dataset:
            return jsonify({'error': 'Dataset not found'}), 404

        # Derive names before commit; the row is deleted and attributes expire.
        filename = f"dataset_{dataset.Dataset_ID}.xlsx"
        file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
        table_name = f"dataset_{dataset.Dataset_ID}"

        # Dataset_ID is an integer primary key, so interpolating it into DDL
        # cannot inject SQL.
        session.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
        session.delete(dataset)
        session.commit()
    except Exception as e:
        session.rollback()
        logger.error('Failed to delete dataset %s:', dataset_id, exc_info=True)
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()

    # The database deletion is durable now; clean up the uploaded file.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning('Dataset %s deleted but file %s could not be removed', dataset_id, file_path, exc_info=True)
    return jsonify({'message': 'Dataset deleted successfully'}), 200
@bp.route('/tables', methods=['GET'])
def list_tables():
    """Return the names of all tables in the application database as a JSON list."""
    # Inspector.from_engine() is deprecated since SQLAlchemy 1.4; inspect() is
    # the supported way to obtain an Inspector.
    from sqlalchemy import inspect as sa_inspect

    inspector = sa_inspect(db.engine)
    return jsonify(inspector.get_table_names())
@bp.route('/models/<int:model_id>', methods=['GET'])
def get_model(model_id):
    """Return the summary of a single model as JSON.

    The body contains ``ModelID``, ``ModelName``, ``ModelType``,
    ``CreatedAt`` (formatted ``YYYY-MM-DD HH:MM:SS``) and ``Description``.
    Responds 404 when no model has *model_id*, and 500 with the exception
    text if anything fails.
    """
    try:
        found = Models.query.filter_by(ModelID=model_id).first()
        if not found:
            return jsonify({'message': 'Model not found'}), 404

        summary = {
            'ModelID': found.ModelID,
            'ModelName': found.ModelName,
            'ModelType': found.ModelType,
            'CreatedAt': found.CreatedAt.strftime('%Y-%m-%d %H:%M:%S'),
            'Description': found.Description,
        }
        return jsonify(summary)
    except Exception as exc:
        return jsonify({'error': 'Internal server error', 'message': str(exc)}), 500
@bp.route('/models', methods=['GET'])
def get_all_models():
    """Return a JSON list summarising every stored model.

    Each entry has ``ModelID``, ``ModelName``, ``ModelType``, ``CreatedAt``
    (``YYYY-MM-DD HH:MM:SS``) and ``Description``. Responds 404 when there are
    no models, and 500 with the exception text on failure.
    """
    def _summarise(entry):
        # One JSON-ready dict per model row.
        return {
            'ModelID': entry.ModelID,
            'ModelName': entry.ModelName,
            'ModelType': entry.ModelType,
            'CreatedAt': entry.CreatedAt.strftime('%Y-%m-%d %H:%M:%S'),
            'Description': entry.Description,
        }

    try:
        every_model = Models.query.all()
        if not every_model:
            return jsonify({'message': 'No models found'}), 404
        return jsonify([_summarise(entry) for entry in every_model])
    except Exception as exc:
        return jsonify({'error': 'Internal server error', 'message': str(exc)}), 500
@bp.route('/model-parameters', methods=['GET'])
def get_all_model_parameters():
    """Return a JSON list of every stored model parameter.

    Each entry has ``ParamID``, ``ModelID``, ``ParamName`` and ``ParamValue``.
    Responds 404 when there are no parameters, and 500 with the exception
    text on failure.
    """
    def _as_json(param_row):
        # One JSON-ready dict per parameter row.
        return {
            'ParamID': param_row.ParamID,
            'ModelID': param_row.ModelID,
            'ParamName': param_row.ParamName,
            'ParamValue': param_row.ParamValue,
        }

    try:
        all_params = ModelParameters.query.all()
        if not all_params:
            return jsonify({'message': 'No parameters found'}), 404
        return jsonify([_as_json(param_row) for param_row in all_params])
    except Exception as exc:
        return jsonify({'error': 'Internal server error', 'message': str(exc)}), 500
@bp.route('/models/<int:model_id>/parameters', methods=['GET'])
def get_model_parameters(model_id):
    """Return one model's summary together with all of its parameters.

    The body contains ``ModelID``, ``ModelName``, ``ModelType``,
    ``CreatedAt`` (``YYYY-MM-DD HH:MM:SS``), ``Description`` and
    ``Parameters``. ``Parameters`` is a list of
    ``{ParamID, ParamName, ParamValue}`` objects taken from
    ``model.parameters``. Responds 404 when no model has *model_id*, and 500
    with the exception text on failure.
    """
    try:
        found = Models.query.filter_by(ModelID=model_id).first()
        if not found:
            return jsonify({'message': 'Model not found'}), 404

        param_list = [
            {
                'ParamID': p.ParamID,
                'ParamName': p.ParamName,
                'ParamValue': p.ParamValue,
            }
            for p in found.parameters
        ]
        body = {
            'ModelID': found.ModelID,
            'ModelName': found.ModelName,
            'ModelType': found.ModelType,
            'CreatedAt': found.CreatedAt.strftime('%Y-%m-%d %H:%M:%S'),
            'Description': found.Description,
            'Parameters': param_list,
        }
        return jsonify(body)
    except Exception as exc:
        return jsonify({'error': 'Internal server error', 'message': str(exc)}), 500
@bp.route('/predict', methods=['POST'])
def predict_route():
    """Run a prediction with a saved model.

    Expects a JSON object with ``model_name`` and ``parameters`` (a mapping of
    feature name to value). The parameters are turned into a single-row
    DataFrame and passed to ``predict`` together with the model name.

    Returns:
        200 with ``{'predictions': ...}``; 400 with ``{'error': ...}`` when the
        body is not a JSON object or any step of the prediction fails.
    """
    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        model_name = data.get('model_name')
        parameters = data.get('parameters', {})
        input_data = pd.DataFrame([parameters])  # one row of feature values
        predictions = predict(input_data, model_name)
        # NOTE(review): predict() is defined elsewhere. If it returns a numpy
        # array or pandas Series, jsonify cannot serialise it, so array-likes
        # are converted to plain lists first.
        if hasattr(predictions, 'tolist'):
            predictions = predictions.tolist()
        return jsonify({'predictions': predictions}), 200
    except Exception as e:
        logger.error('Prediction failed:', exc_info=True)
        return jsonify({'error': str(e)}), 400
# API endpoint: add a database record.
@bp.route('/add_item', methods=['POST'])
def add_item():
    """Insert one row into a named database table.

    Request body (JSON): ``{"table": <table name>, "item": {<column>: <value>, ...}}``.

    The table is reflected from the database, so only existing tables and
    columns are accepted. SQLAlchemy quotes the identifiers and sends the
    values as bound parameters; the client-supplied table and column names are
    never interpolated into SQL text.

    Returns:
        201 on success; 400 for a missing or invalid body, an unknown table or
        unknown columns; 409 on an integrity-constraint violation; 500 on any
        other database error.
    """
    # Local import: these exception types are only needed by this route.
    from sqlalchemy.exc import IntegrityError, NoSuchTableError, SQLAlchemyError

    try:
        data = request.get_json()
        if not data:
            raise ValueError("No JSON data provided")
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")
        table_name = data.get('table')
        item_data = data.get('item')
        if not table_name or not item_data:
            return jsonify({'error': 'Missing table name or item data'}), 400
        if not isinstance(table_name, str) or not isinstance(item_data, dict):
            raise ValueError("'table' must be a string and 'item' a JSON object")

        # Reflection raises NoSuchTableError for unknown tables.
        table = Table(table_name, MetaData(), autoload_with=db.engine)
        unknown = [column for column in item_data if column not in table.c]
        if unknown:
            raise ValueError(f"Unknown column(s) for table {table_name}: {', '.join(unknown)}")

        # engine.begin() commits on success and rolls back if the insert raises.
        with db.engine.begin() as conn:
            conn.execute(table.insert(), item_data)
        return jsonify({'success': True, 'message': 'Item added successfully'}), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except NoSuchTableError:
        return jsonify({'error': f'Unknown table: {table_name}'}), 400
    except IntegrityError as e:
        # e.g. a unique-constraint violation
        return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409
    except SQLAlchemyError as e:
        return jsonify({'error': 'Database error', 'details': str(e)}), 500
# API endpoint: delete database records.
@bp.route('/delete_item', methods=['POST'])
def delete_item():
    """Delete the rows of a table that match one ``column=value`` condition.

    Request body (JSON): ``{"table": <table name>, "condition": "<column>=<value>"}``.

    The condition is split on its first ``=`` only, so the value may itself
    contain ``=``. Whitespace around the column name and the value is stripped.

    The table is reflected from the database, so only existing tables and
    columns are accepted. SQLAlchemy quotes the identifiers and sends the value
    as a bound (string) parameter; client-supplied names are never
    interpolated into SQL text.

    Returns:
        200 on success (including when no row matched); 400 for missing or
        malformed input, an unknown table or column, or a database error.
    """
    # Local import: these exception types are only needed by this route.
    from sqlalchemy.exc import NoSuchTableError, SQLAlchemyError

    data = request.get_json()
    if not isinstance(data, dict):
        data = {}
    table_name = data.get('table')
    condition = data.get('condition')

    # Both the table name and the condition are required.
    if not table_name or not condition or not isinstance(table_name, str):
        return jsonify({
            "success": False,
            "message": "缺少表名或条件参数"
        }), 400

    # Split the condition into column and value.
    if not isinstance(condition, str) or '=' not in condition:
        return jsonify({
            "success": False,
            "message": "条件格式错误,应为 'key=value'"
        }), 400
    key, _, value = condition.partition('=')
    key, value = key.strip(), value.strip()
    if not key:
        return jsonify({
            "success": False,
            "message": "条件格式错误,应为 'key=value'"
        }), 400

    try:
        table = Table(table_name, MetaData(), autoload_with=db.engine)
        if key not in table.c:
            return jsonify({
                "success": False,
                "message": f"删除失败: 未知字段 {key}"
            }), 400
        # engine.begin() commits on success and rolls back if the delete raises.
        with db.engine.begin() as conn:
            conn.execute(table.delete().where(table.c[key] == value))
    except NoSuchTableError:
        return jsonify({
            "success": False,
            "message": f"删除失败: 表 {table_name} 不存在"
        }), 400
    except SQLAlchemyError as e:
        return jsonify({
            "success": False,
            "message": f"删除失败: {e}"
        }), 400

    return jsonify({
        "success": True,
        "message": "记录删除成功"
    }), 200
# API endpoint: update a database record.
@bp.route('/update_item', methods=['PUT'])
def update_record():
    """Update one row of a table, identified by the first key of ``item``.

    Request body (JSON):
    ``{"table": <table name>, "item": {<id column>: <id>, <column>: <new value>, ...}}``.

    The first key in ``item`` names the ID column, and its value (which must be
    convertible to ``int``) selects the row. Every other key/value pair is
    written to that row. This relies on the parsed JSON object keeping key
    order, which Python's ``dict`` does.

    The table is reflected from the database, so only existing tables and
    columns are accepted. SQLAlchemy quotes the identifiers and binds the
    values.

    Returns:
        200 on success; 404 if no row has that ID; 400 for incomplete or
        invalid input, an unknown table or column, or a database error.
    """
    # Local import: these exception types are only needed by this route.
    from sqlalchemy.exc import NoSuchTableError, SQLAlchemyError

    data = request.get_json()
    # The request must provide the table name and the item data.
    if (
        not isinstance(data, dict)
        or 'table' not in data
        or 'item' not in data
        or not isinstance(data['table'], str)
    ):
        return jsonify({
            "success": False,
            "message": "请求数据不完整"
        }), 400

    table_name = data['table']
    item = data['item']
    if not isinstance(item, dict) or not item:
        return jsonify({
            "success": False,
            "message": "记录数据为空"
        }), 400

    # The first entry of `item` is the ID column; the rest are the new values.
    id_key = next(iter(item))
    updates = {key: value for key, value in item.items() if key != id_key}
    try:
        record_id = int(item[id_key])
    except (TypeError, ValueError):
        return jsonify({
            "success": False,
            "message": "ID 必须是整数"
        }), 400
    if not updates:
        # An empty SET clause would be invalid SQL.
        return jsonify({
            "success": False,
            "message": "没有需要更新的字段"
        }), 400

    try:
        table = Table(table_name, MetaData(), autoload_with=db.engine)
        unknown = [column for column in item if column not in table.c]
        if unknown:
            return jsonify({
                "success": False,
                "message": f"更新失败: 未知字段 {', '.join(unknown)}"
            }), 400
        stmt = table.update().where(table.c[id_key] == record_id).values(updates)
        # engine.begin() commits on success and rolls back if the update raises.
        with db.engine.begin() as conn:
            matched = conn.execute(stmt).rowcount
    except NoSuchTableError:
        return jsonify({
            "success": False,
            "message": f"更新失败: 表 {table_name} 不存在"
        }), 400
    except SQLAlchemyError as e:
        return jsonify({
            "success": False,
            "message": f"更新失败: {e}"
        }), 400

    if matched == 0:
        return jsonify({
            "success": False,
            "message": "未找到要更新的记录"
        }), 404
    return jsonify({
        "success": True,
        "message": "数据更新成功"
    }), 200
# API endpoint: look up database records.
@bp.route('/search/record', methods=['GET'])
def sql_search():
    """Return the rows of a table whose ``id`` column equals the requested ID.

    The table name and ID are read from a JSON body ``{"table": ..., "id": ...}``.
    When the request carries no JSON body, they are read from the query string
    instead (``?table=...&id=...``), because many clients and proxies drop
    bodies on GET requests.

    The table is reflected from the database. SQLAlchemy quotes the table name
    and binds the ID, so no client input is interpolated into SQL text.

    Returns:
        200 with a list of ``{column: value}`` objects; 400 if a field is
        missing, the table or its ``id`` column does not exist, no row matches,
        or the database reports an error.
    """
    # Local import: these exception types are only needed by this route.
    from sqlalchemy.exc import NoSuchTableError, SQLAlchemyError

    payload = request.get_json(silent=True) or request.args
    if not isinstance(payload, dict):
        return jsonify({'error': '请求数据格式错误'}), 400
    try:
        sql_table = payload['table']  # table name
        record_id = payload['id']     # ID to search for
    except KeyError as e:
        missing = e.args[0] if e.args else e
        return jsonify({'error': f'缺少必要的数据字段: {missing!r}'}), 400
    if not isinstance(sql_table, str):
        return jsonify({'error': '请求数据格式错误'}), 400

    try:
        table = Table(sql_table, MetaData(), autoload_with=db.engine)
        if 'id' not in table.c:
            return jsonify({'error': f'表 {sql_table} 没有 id 字段'}), 400
        # The connection is closed by the context manager on every path.
        with db.engine.connect() as conn:
            result = conn.execute(select(table).where(table.c['id'] == record_id))
            column_names = list(result.keys())
            rows = result.fetchall()
    except NoSuchTableError:
        return jsonify({'error': f'no such table: {sql_table}'}), 400
    except SQLAlchemyError as e:
        return jsonify({'error': str(e)}), 400

    if not rows:
        return jsonify({'error': '未查找到对应数据。'}), 400
    results = [dict(zip(column_names, row)) for row in rows]
    return jsonify(results), 200
# API endpoint: return a whole table (rows and headers) for display.
@bp.route('/table', methods=['POST'])
def get_table():
    """Return every row of a table plus its column headers.

    Request body (JSON): ``{"table": <table name>}``.

    Returns:
        200 with ``{"rows": [{column: value}, ...], "headers": [column, ...]}``;
        400 if the table name is missing or any error occurs (for example, the
        table does not exist).
    """
    data = request.get_json()
    table_name = data.get('table') if isinstance(data, dict) else None
    if not table_name:
        return jsonify({'error': '需要表名'}), 400

    # Must exist before `try` so `finally` never sees an unbound name.
    session = None
    try:
        Session = sessionmaker(bind=db.engine)
        session = Session()
        # Reflect the table definition from the database.
        table = Table(table_name, MetaData(), autoload_with=db.engine)
        # Column names are computed once and reused for every row.
        headers = [column.name for column in table.columns]
        result = session.execute(select(table)).fetchall()
        rows = [dict(zip(headers, row)) for row in result]
        return jsonify(rows=rows, headers=headers), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 400
    finally:
        if session is not None:
            session.close()
|