"""HTTP routes for dataset upload, model training/prediction and raw table access."""
import logging
import os
import sqlite3

import pandas as pd
from flask import Blueprint, request, jsonify, current_app
from sqlalchemy import inspect as sa_inspect, select, text
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData, Table

from .model import predict, train_and_save_model
from . import db  # the db instance exported by the app package
from .database_models import Models, ModelParameters, Datasets, CurrentReduce, CurrentReflux
from .utils import create_dynamic_table, allowed_file

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Blueprint that groups all routes defined in this module.
bp = Blueprint('routes', __name__)

# ORM models backing the "current" data tables, keyed by dataset/data type.
_EXISTING_TABLE_MODELS = {
    'reduce': CurrentReduce,
    'reflux': CurrentReflux,
}

# Spreadsheet header -> model field name, per dataset type.
_COLUMN_RENAME_MAPS = {
    'reduce': {
        '1/b': 'Q_over_b',
        'pH': 'pH',
        'OM': 'OM',
        'CL': 'CL',
        'H': 'H',
        'Al': 'Al',
    },
    'reflux': {
        'OM g/kg': 'OM',
        'CL g/kg': 'CL',
        'CEC cmol/kg': 'CEC',
        'H+ cmol/kg': 'H_plus',
        'HN mg/kg': 'HN',
        'Al3+cmol/kg': 'Al3_plus',
        'Free alumina g/kg': 'Free_alumina',
        'Free iron oxides g/kg': 'Free_iron_oxides',
        'ΔpH': 'Delta_pH',
    },
}

_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _new_session():
    """Open a new ORM session bound to the application's engine."""
    return sessionmaker(bind=db.engine)()


def _open_raw_connection():
    """Return a pooled DB-API connection from the application's engine.

    The record-level endpoints work at the DB-API level (``cursor()``, ``?``
    placeholders, ``sqlite3`` exceptions). The SQLAlchemy ``db`` object used
    throughout this module (via ``db.engine``) has no ``cursor()``/``commit()``
    of its own, so a raw connection is borrowed from the engine instead.
    Callers must ``close()`` it to return it to the pool.
    """
    return db.engine.raw_connection()


def _quote_identifier(name):
    """Quote *name* as an SQL identifier (double quotes, embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _validate_identifiers(table_name, column_names=()):
    """Ensure *table_name* and every name in *column_names* exist in the database.

    Table and column names arrive in request bodies and cannot be bound as
    SQL parameters, so they are checked against the reflected schema before
    being interpolated into a statement.

    Raises:
        ValueError: if the table or any of the columns is unknown.
    """
    inspector = sa_inspect(db.engine)
    if table_name not in inspector.get_table_names():
        raise ValueError(f"Unknown table: {table_name}")
    column_names = list(column_names)
    if not column_names:
        return
    known_columns = {column['name'] for column in inspector.get_columns(table_name)}
    unknown = [name for name in column_names if name not in known_columns]
    if unknown:
        raise ValueError(
            f"Unknown column(s) in table {table_name}: {', '.join(map(str, unknown))}"
        )


def _format_timestamp(value):
    """Format a datetime for JSON output, passing ``None`` through unchanged."""
    return value.strftime(_TIMESTAMP_FORMAT) if value is not None else None


def _serialize_model(model):
    """Return the public JSON fields of a ``Models`` row."""
    return {
        'ModelID': model.ModelID,
        'ModelName': model.ModelName,
        'ModelType': model.ModelType,
        'CreatedAt': _format_timestamp(model.CreatedAt),
        'Description': model.Description,
    }


# ---------------------------------------------------------------------------
# DataFrame / data-loading helpers
# ---------------------------------------------------------------------------

def infer_column_types(df):
    """Map each DataFrame column to the type keyword passed to ``create_dynamic_table``.

    Integer dtypes of any width (including nullable ``Int64``) map to
    ``'int'``, float dtypes to ``'float'``, timezone-naive ``datetime64``
    dtypes of any resolution to ``'datetime'``; everything else (object,
    bool, categorical, timezone-aware datetimes, ...) maps to ``'str'``.
    """
    def _type_keyword(dtype):
        if pd.api.types.is_bool_dtype(dtype):
            return 'str'
        if pd.api.types.is_integer_dtype(dtype):
            return 'int'
        if pd.api.types.is_float_dtype(dtype):
            return 'float'
        if pd.api.types.is_datetime64_dtype(dtype):
            return 'datetime'
        return 'str'

    return {col: _type_keyword(dtype) for col, dtype in df.dtypes.items()}


@bp.route('/upload-dataset', methods=['POST'])
def upload_dataset():
    """Store an uploaded Excel dataset and load its rows into the database.

    Expects multipart form data with ``file`` (an allowed spreadsheet),
    ``dataset_type`` (required) and optional ``dataset_name`` /
    ``dataset_description``. A ``Datasets`` row is created, the file is saved
    as ``dataset_<id>.xlsx`` under ``UPLOAD_FOLDER``, its rows are inserted
    into a per-dataset dynamic table and, for ``reduce``/``reflux`` datasets,
    also into the matching current-data table.

    Returns 201 with the new dataset id, 400 for invalid input, 500 on failure.
    """
    # Created only once input is validated; the early 400 returns below run
    # the ``finally`` clause before any session exists.
    session = None
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        file = request.files['file']
        if file.filename == '' or not allowed_file(file.filename):
            return jsonify({'error': 'No selected file or invalid file type'}), 400

        dataset_name = request.form.get('dataset_name')
        dataset_description = request.form.get('dataset_description', 'No description provided')
        dataset_type = request.form.get('dataset_type')
        if not dataset_type:
            return jsonify({'error': 'Dataset type is required'}), 400

        session = _new_session()
        new_dataset = Datasets(
            Dataset_name=dataset_name,
            Dataset_description=dataset_description,
            Row_count=0,
            Status='pending',
            Dataset_type=dataset_type,
        )
        session.add(new_dataset)
        # Commit first so the database assigns Dataset_ID, which names the file.
        session.commit()

        unique_filename = f"dataset_{new_dataset.Dataset_ID}.xlsx"
        upload_folder = current_app.config['UPLOAD_FOLDER']
        os.makedirs(upload_folder, exist_ok=True)
        file_path = os.path.join(upload_folder, unique_filename)
        file.save(file_path)

        dataset_df = pd.read_excel(file_path)
        dataset_df = clean_column_names(dataset_df)
        # Rename columns so they match the ORM model field names.
        dataset_df = rename_columns_for_model(dataset_df, dataset_type)

        column_types = infer_column_types(dataset_df)
        dynamic_table_class = create_dynamic_table(new_dataset.Dataset_ID, column_types)
        insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class)

        # Known dataset types are also copied into their current-data table.
        existing_model = _EXISTING_TABLE_MODELS.get(dataset_type)
        if existing_model is not None:
            insert_data_into_existing_table(session, dataset_df, existing_model)

        # Mark the dataset processed in the same transaction as the inserts,
        # so a failed load leaves it 'pending' rather than 'processed'.
        new_dataset.Row_count = len(dataset_df)
        new_dataset.Status = 'processed'
        session.commit()

        return jsonify({
            'message': f'Dataset {dataset_name} uploaded successfully!',
            'dataset_id': new_dataset.Dataset_ID,
            'filename': unique_filename
        }), 201
    except Exception as e:
        if session is not None:
            session.rollback()
        logger.exception('Failed to process the dataset upload')
        return jsonify({'error': str(e)}), 500
    finally:
        if session is not None:
            session.close()


@bp.route('/train-and-save-model', methods=['POST'])
def train_and_save_model_endpoint():
    """Train a model from a JSON request and persist it via ``train_and_save_model``.

    JSON fields: ``model_type``, ``model_name``, ``model_description``,
    ``data_type`` and optional ``dataset_id`` (defaults to ``None``).
    Returns 200 with the training result, 400 for a missing/non-object body,
    500 if training fails.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'error': 'Failed to train and save model',
            'message': 'Request body must be a JSON object'
        }), 400

    model_type = data.get('model_type')
    model_name = data.get('model_name')
    model_description = data.get('model_description')
    data_type = data.get('data_type')
    dataset_id = data.get('dataset_id')  # None when not provided

    session = _new_session()
    try:
        result = train_and_save_model(session, model_type, model_name, model_description,
                                      data_type, dataset_id)
        return jsonify({'message': 'Model trained and saved successfully', 'result': result}), 200
    except Exception as e:
        session.rollback()
        logger.exception('Failed to train and save model')
        return jsonify({'error': 'Failed to train and save model', 'message': str(e)}), 500
    finally:
        session.close()


def clean_column_names(dataframe):
    """Normalise column headers in place and return *dataframe*.

    Each header is converted to ``str`` (spreadsheets can yield numeric
    headers), stripped of surrounding whitespace, and has any remaining
    non-breaking spaces (U+00A0) removed.
    """
    dataframe.columns = [str(col).strip().replace('\xa0', '') for col in dataframe.columns]
    return dataframe


def rename_columns_for_model(dataframe, dataset_type):
    """Return a copy of *dataframe* with spreadsheet headers renamed to model field names.

    Dataset types without a mapping in ``_COLUMN_RENAME_MAPS`` keep their
    column names unchanged.
    """
    return dataframe.rename(columns=_COLUMN_RENAME_MAPS.get(dataset_type, {}))


def insert_data_into_existing_table(session, dataframe, model_class):
    """Add one *model_class* instance per DataFrame row to *session* (not committed)."""
    # to_dict('records') keeps per-column dtypes; iterrows() coerces each row
    # to a single Series dtype.
    session.add_all(model_class(**record) for record in dataframe.to_dict(orient='records'))


def insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class):
    """Insert every DataFrame row into the dynamic table in one multi-row execute."""
    records = dataset_df.to_dict(orient='records')
    if not records:
        return  # nothing to insert; avoid executing INSERT with an empty parameter list
    session.execute(dynamic_table_class.__table__.insert(), records)


def insert_data_by_type(session, dataset_df, dataset_type):
    """Insert rows into the current-data table for *dataset_type*; unknown types are ignored."""
    model_class = _EXISTING_TABLE_MODELS.get(dataset_type)
    if model_class is not None:
        insert_data_into_existing_table(session, dataset_df, model_class)


def get_current_data(session, data_type):
    """Return every row of the current-data table for *data_type* as a DataFrame.

    Raises:
        ValueError: if *data_type* is not ``'reduce'`` or ``'reflux'``.
    """
    model = _EXISTING_TABLE_MODELS.get(data_type)
    if model is None:
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    # Select the underlying table (not the ORM entity) so each result row is a
    # plain tuple of column values that pandas can consume directly.
    table = model.__table__
    rows = session.execute(select(table)).fetchall()
    return pd.DataFrame(rows, columns=[column.name for column in table.columns])


def get_dataset_by_id(session, dataset_id):
    """Load a dataset table into a DataFrame.

    *dataset_id* may be a table name (used as-is) or an integer dataset id,
    which is mapped to the ``dataset_<id>`` table name that ``delete_dataset``
    also uses.

    Raises:
        ValueError: if the table contains no rows.
    """
    if isinstance(dataset_id, int) and not isinstance(dataset_id, bool):
        table_name = f"dataset_{dataset_id}"
    else:
        table_name = dataset_id

    # Reflect the table definition from the database.
    metadata = MetaData()
    dataset_table = Table(table_name, metadata, autoload_with=session.bind)

    result = session.execute(select(dataset_table)).fetchall()
    if not result:
        raise ValueError(f"No data found for dataset {dataset_id}.")

    return pd.DataFrame(result, columns=[column.name for column in dataset_table.columns])


@bp.route('/delete-dataset/<int:dataset_id>', methods=['DELETE'])
def delete_dataset(dataset_id):
    """Delete a dataset record, its ``dataset_<id>`` table and its uploaded file."""
    session = _new_session()
    try:
        dataset = session.query(Datasets).filter_by(Dataset_ID=dataset_id).first()
        if not dataset:
            return jsonify({'error': 'Dataset not found'}), 404

        base_name = f"dataset_{dataset.Dataset_ID}"
        # base_name is built from the integer primary key, so it is safe to interpolate.
        session.execute(text(f"DROP TABLE IF EXISTS {base_name}"))
        session.delete(dataset)
        session.commit()
    except Exception as e:
        session.rollback()
        logger.exception('Failed to delete dataset %s', dataset_id)
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()

    # Remove the file only after the database changes are committed, so a
    # failed delete never leaves a record whose file is already gone.
    file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], f"{base_name}.xlsx")
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning('Dataset %s deleted but file %s could not be removed',
                       dataset_id, file_path, exc_info=True)

    return jsonify({'message': 'Dataset deleted successfully'}), 200


@bp.route('/tables', methods=['GET'])
def list_tables():
    """Return the names of all tables in the application database as a JSON list."""
    return jsonify(sa_inspect(db.engine).get_table_names())


@bp.route('/models/<int:model_id>', methods=['GET'])
def get_model(model_id):
    """Return one model's metadata, or 404 if it does not exist."""
    try:
        model = Models.query.filter_by(ModelID=model_id).first()
        if model is None:
            return jsonify({'message': 'Model not found'}), 404
        return jsonify(_serialize_model(model))
    except Exception as e:
        logger.exception('Failed to load model %s', model_id)
        return jsonify({'error': 'Internal server error', 'message': str(e)}), 500


@bp.route('/models', methods=['GET'])
def get_all_models():
    """Return metadata for every model, or 404 when there are none."""
    try:
        models = Models.query.all()
        if not models:
            return jsonify({'message': 'No models found'}), 404
        return jsonify([_serialize_model(model) for model in models])
    except Exception as e:
        logger.exception('Failed to list models')
        return jsonify({'error': 'Internal server error', 'message': str(e)}), 500


@bp.route('/model-parameters', methods=['GET'])
def get_all_model_parameters():
    """Return every stored model parameter, or 404 when there are none."""
    try:
        parameters = ModelParameters.query.all()
        if not parameters:
            return jsonify({'message': 'No parameters found'}), 404
        return jsonify([
            {
                'ParamID': param.ParamID,
                'ModelID': param.ModelID,
                'ParamName': param.ParamName,
                'ParamValue': param.ParamValue
            }
            for param in parameters
        ])
    except Exception as e:
        logger.exception('Failed to list model parameters')
        return jsonify({'error': 'Internal server error', 'message': str(e)}), 500


@bp.route('/models/<int:model_id>/parameters', methods=['GET'])
def get_model_parameters(model_id):
    """Return one model's metadata together with its parameters, or 404."""
    try:
        model = Models.query.filter_by(ModelID=model_id).first()
        if model is None:
            return jsonify({'message': 'Model not found'}), 404
        parameters = [
            {
                'ParamID': param.ParamID,
                'ParamName': param.ParamName,
                'ParamValue': param.ParamValue
            }
            for param in model.parameters
        ]
        return jsonify({**_serialize_model(model), 'Parameters': parameters})
    except Exception as e:
        logger.exception('Failed to load parameters for model %s', model_id)
        return jsonify({'error': 'Internal server error', 'message': str(e)}), 500


@bp.route('/predict', methods=['POST'])
def predict_route():
    """Run ``predict`` for ``model_name`` on a single row built from ``parameters``."""
    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        model_name = data.get('model_name')
        parameters = data.get('parameters', {})
        input_data = pd.DataFrame([parameters])  # one-row frame of feature values
        predictions = predict(input_data, model_name)
        # numpy arrays / pandas objects are not JSON-serialisable as-is.
        if hasattr(predictions, 'tolist'):
            predictions = predictions.tolist()
        return jsonify({'predictions': predictions}), 200
    except Exception as e:
        logger.exception('Prediction failed')
        return jsonify({'error': str(e)}), 400


# API endpoint: insert a record into a table.
@bp.route('/add_item', methods=['POST'])
def add_item():
    """Insert one row into a table.

    Expects a JSON body ``{"table": <name>, "item": {<column>: <value>, ...}}``.
    The table and column names are validated against the database schema
    before being used in SQL; values are bound as parameters.

    Returns 201 on success, 400 for bad input, 409 on an integrity violation
    (e.g. a unique constraint), 500 on other database errors.
    """
    conn = None
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            raise ValueError("No JSON data provided")

        table_name = data.get('table')
        item_data = data.get('item')
        if not table_name or not item_data:
            return jsonify({'error': 'Missing table name or item data'}), 400
        if not isinstance(item_data, dict):
            raise ValueError("Item data must be a JSON object")

        columns = list(item_data)
        _validate_identifiers(table_name, columns)

        column_sql = ', '.join(_quote_identifier(column) for column in columns)
        placeholders = ', '.join(['?'] * len(columns))
        sql = f"INSERT INTO {_quote_identifier(table_name)} ({column_sql}) VALUES ({placeholders})"

        conn = _open_raw_connection()
        cur = conn.cursor()
        cur.execute(sql, tuple(item_data.values()))
        conn.commit()

        return jsonify({'success': True, 'message': 'Item added successfully'}), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except KeyError as e:
        return jsonify({'error': f'Missing data field: {e}'}), 400
    except sqlite3.IntegrityError as e:
        # e.g. a unique-constraint violation
        if conn is not None:
            conn.rollback()
        return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409
    except sqlite3.Error as e:
        if conn is not None:
            conn.rollback()
        return jsonify({'error': 'Database error', 'details': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()


# API endpoint: delete records from a table.
@bp.route('/delete_item', methods=['POST'])
def delete_item():
    """Delete rows matching a single ``key=value`` condition.

    Expects a JSON body ``{"table": <name>, "condition": "<column>=<value>"}``.
    Only the first ``=`` separates column from value, so values may contain
    ``=``. The table and column are validated against the schema; the value
    is bound as a parameter.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    table_name = data.get('table')
    condition = data.get('condition')

    # Both the table name and the condition are required.
    if not table_name or not condition:
        return jsonify({
            "success": False,
            "message": "缺少表名或条件参数"
        }), 400

    # Split the condition string into column and value.
    parts = condition.split('=', 1) if isinstance(condition, str) else []
    if len(parts) != 2 or not parts[0].strip():
        return jsonify({
            "success": False,
            "message": "条件格式错误,应为 'key=value'"
        }), 400
    key, value = parts[0].strip(), parts[1]

    try:
        _validate_identifiers(table_name, [key])
    except ValueError as e:
        return jsonify({
            "success": False,
            "message": f"删除失败: {e}"
        }), 400

    conn = _open_raw_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            f"DELETE FROM {_quote_identifier(table_name)} WHERE {_quote_identifier(key)} = ?",
            (value,),
        )
        conn.commit()
        return jsonify({
            "success": True,
            "message": "记录删除成功"
        }), 200
    except sqlite3.Error as e:
        # Undo the partial transaction and report the database error.
        conn.rollback()
        return jsonify({
            "success": False,
            "message": f"删除失败: {e}"
        }), 400
    finally:
        conn.close()


# API endpoint: update a record in a table.
@bp.route('/update_item', methods=['PUT'])
def update_record():
    """Update one row identified by the first key of ``item``.

    Expects a JSON body ``{"table": <name>, "item": {<id_column>: <id>, <column>: <value>, ...}}``.
    The first key of ``item`` is the id column and its value must be an
    integer; the remaining keys are the columns to update. Identifiers are
    validated against the schema; values are bound as parameters.

    Returns 200 on success, 400 for bad input or a database error, 404 if no
    row matched the id.
    """
    data = request.get_json(silent=True)

    # Both 'table' and 'item' must be present.
    if not isinstance(data, dict) or 'table' not in data or 'item' not in data:
        return jsonify({
            "success": False,
            "message": "请求数据不完整"
        }), 400

    table_name = data['table']
    item = data['item']

    # The first entry of item is treated as the record ID.
    if not isinstance(item, dict) or not item:
        return jsonify({
            "success": False,
            "message": "记录数据为空"
        }), 400

    id_key = next(iter(item))
    updates = {key: value for key, value in item.items() if key != id_key}

    try:
        record_id = int(item[id_key])  # the ID must be an integer
    except (TypeError, ValueError):
        return jsonify({
            "success": False,
            "message": "ID 必须是整数"
        }), 400

    # An empty SET clause would be invalid SQL.
    if not updates:
        return jsonify({
            "success": False,
            "message": "没有需要更新的字段"
        }), 400

    try:
        _validate_identifiers(table_name, [id_key, *updates])
    except ValueError as e:
        return jsonify({
            "success": False,
            "message": f"更新失败: {e}"
        }), 400

    # Parameters: the new values in column order, then the ID for the WHERE clause.
    parameters = [*updates.values(), record_id]
    set_clause = ', '.join(f"{_quote_identifier(column)} = ?" for column in updates)
    sql = (f"UPDATE {_quote_identifier(table_name)} SET {set_clause} "
           f"WHERE {_quote_identifier(id_key)} = ?")

    conn = _open_raw_connection()
    try:
        cur = conn.cursor()
        cur.execute(sql, parameters)
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({
                "success": False,
                "message": "未找到要更新的记录"
            }), 404
        return jsonify({
            "success": True,
            "message": "数据更新成功"
        }), 200
    except sqlite3.Error as e:
        conn.rollback()
        return jsonify({
            "success": False,
            "message": f"更新失败: {e}"
        }), 400
    finally:
        conn.close()


# API endpoint: look up records by id.
@bp.route('/search/record', methods=['GET'])
def sql_search():
    """Return the rows of a table whose ``id`` column equals the requested id.

    Expects a JSON body ``{"table": <name>, "id": <value>}``. The table name
    is validated against the schema; the id is bound as a parameter.
    Returns 200 with a list of row objects, or 400 when nothing matches or the
    request is invalid.
    """
    conn = None
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': '请求体必须是 JSON 对象'}), 400

        sql_table = data['table']   # table to search
        record_id = data['id']      # id to look up

        _validate_identifiers(sql_table)

        conn = _open_raw_connection()
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM {_quote_identifier(sql_table)} WHERE id = ?", (record_id,))
        rows = cur.fetchall()
        column_names = [desc[0] for desc in cur.description]
        cur.close()

        if not rows:
            return jsonify({'error': '未查找到对应数据。'}), 400

        results = [dict(zip(column_names, row)) for row in rows]
        return jsonify(results), 200
    except sqlite3.Error as e:
        # Database error: report it to the client.
        return jsonify({'error': str(e)}), 400
    except KeyError as e:
        # A required field is missing from the request body.
        return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400
    except ValueError as e:
        # Unknown table name.
        return jsonify({'error': str(e)}), 400
    finally:
        if conn is not None:
            conn.close()


# API endpoint: return a whole table (rows and headers) for display.
@bp.route('/table', methods=['POST'])
def get_table():
    """Return all rows and column headers of the table named in the JSON body.

    Response: ``{"rows": [{column: value, ...}, ...], "headers": [column, ...]}``.
    Returns 400 when the table name is missing or the table cannot be read.
    """
    data = request.get_json(silent=True)
    table_name = data.get('table') if isinstance(data, dict) else None
    if not table_name:
        return jsonify({'error': '需要表名'}), 400

    session = _new_session()
    try:
        # Reflect the table definition from the database; the name is quoted
        # by SQLAlchemy, and an unknown table raises.
        metadata = MetaData()
        table = Table(table_name, metadata, autoload_with=db.engine)

        result = session.execute(select(table)).fetchall()
        headers = [column.name for column in table.columns]
        rows = [dict(zip(headers, row)) for row in result]

        return jsonify(rows=rows, headers=headers), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 400
    finally:
        session.close()