import os import pickle import sqlite3 import pandas as pd from flask import current_app, send_file from sqlalchemy.orm import sessionmaker from flask import Blueprint, request, jsonify, current_app as app from werkzeug.security import generate_password_hash from werkzeug.utils import secure_filename import logging from .model import predict, train_and_save_model from .database_models import Models, ModelParameters, Datasets, CurrentReduce, CurrentReflux from . import db from .utils import create_dynamic_table, allowed_file, infer_column_types, rename_columns_for_model_predict, \ clean_column_names, rename_columns_for_model, insert_data_into_dynamic_table, insert_data_into_existing_table, \ predict_to_Q from sqlalchemy import text from sqlalchemy.engine.reflection import Inspector from sqlalchemy import MetaData, Table, select from io import BytesIO from flask import Blueprint, request, jsonify from flask import Flask, request, jsonify from werkzeug.security import check_password_hash # 配置日志 logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # 创建蓝图 (Blueprint) bp = Blueprint('routes', __name__) # 密码加密 def hash_password(password): return generate_password_hash(password) def get_db(): """ 获取数据库连接 """ return sqlite3.connect(app.config['DATABASE']) def get_column_names(table_name): """ 动态获取数据库表的列名。 """ try: conn = get_db() query = f"PRAGMA table_info({table_name});" result = conn.execute(query).fetchall() conn.close() return [row[1] for row in result] # 第二列是列名 except Exception as e: logger.error(f"Error getting column names for table {table_name}: {e}", exc_info=True) return [] def get_model_class_for_table(table_name): """ 根据表名动态获取对应的 SQLAlchemy 模型类。 """ model_mapping = { 'current_reduce': CurrentReduce, 'current_reflux': CurrentReflux, # 添加更多模型映射 } return model_mapping.get(table_name) def complex_reflux_calculation(parameters): """ 假设的复杂 'reflux' 计算函数 :param parameters: 输入的参数 :return: 计算结果 """ try: # 加载已训练好的模型 with open('path_to_your_reflux_model.pkl', 'rb') as model_file: model = pickle.load(model_file) input_data = pd.DataFrame([parameters]) # 转换参数为DataFrame(假设是这种格式) reflux_result = model.predict(input_data) # 根据实际情况调用模型的 predict 方法 # 返回预测结果 return reflux_result.tolist() except Exception as e: logger.error('Failed to calculate reflux model prediction:', exc_info=True) return {'error': str(e)} @bp.route('/upload-dataset', methods=['POST']) def upload_dataset(): session = None try: if 'file' not in request.files: return jsonify({'error': 'No file part'}), 400 file = request.files['file'] if file.filename == '' or not allowed_file(file.filename): return jsonify({'error': 'No selected file or invalid file type'}), 400 dataset_name = request.form.get('dataset_name') dataset_description = request.form.get('dataset_description', 'No description provided') dataset_type = request.form.get('dataset_type') if not dataset_type: return jsonify({'error': 'Dataset type is required'}), 400 # 创建 sessionmaker 实例 Session = sessionmaker(bind=db.engine) session = Session() # 创建新的数据集记录 new_dataset = Datasets( Dataset_name=dataset_name, Dataset_description=dataset_description, Row_count=0, Status='pending', Dataset_type=dataset_type ) session.add(new_dataset) session.commit() # 文件保存 unique_filename = f"dataset_{new_dataset.Dataset_ID}.xlsx" upload_folder = current_app.config['UPLOAD_FOLDER'] file_path = os.path.join(upload_folder, unique_filename) file.save(file_path) # 处理数据集 dataset_df = pd.read_excel(file_path) new_dataset.Row_count = len(dataset_df) new_dataset.Status = 'processed' session.commit() # 清理列名和数据处理 dataset_df = clean_column_names(dataset_df) dataset_df = rename_columns_for_model(dataset_df, dataset_type) column_types = infer_column_types(dataset_df) dynamic_table_class = create_dynamic_table(new_dataset.Dataset_ID, column_types) insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class) # 根据 dataset_type 决定插入到哪个已有表 if dataset_type == 'reduce': insert_data_into_existing_table(session, dataset_df, CurrentReduce) elif dataset_type == 'reflux': insert_data_into_existing_table(session, dataset_df, CurrentReflux) session.commit() return jsonify({ 'message': f'Dataset {dataset_name} uploaded successfully!', 'dataset_id': new_dataset.Dataset_ID, 'filename': unique_filename }), 201 except Exception as e: if session: session.rollback() logger.error('Failed to process the dataset upload:', exc_info=True) return jsonify({'error': str(e)}), 500 finally: if session: session.close() @bp.route('/train-and-save-model', methods=['POST']) def train_and_save_model_endpoint(): session = None data = request.get_json() try: # 获取请求中的数据 model_type = data.get('model_type') model_name = data.get('model_name') model_description = data.get('model_description') data_type = data.get('data_type') dataset_id = data.get('dataset_id', None) # 创建 session Session = sessionmaker(bind=db.engine) session = Session() # 调用训练和保存模型的函数 result = train_and_save_model(session, model_type, model_name, model_description, data_type, dataset_id) return jsonify({'message': 'Model trained and saved successfully', 'result': result}), 200 except Exception as e: if session: session.rollback() logger.error('Failed to train and save model:', exc_info=True) return jsonify({'error': 'Failed to train and save model', 'message': str(e)}), 500 finally: if session: session.close() # 预测接口 @bp.route('/predict', methods=['POST']) def predict_route(): session = None try: data = request.get_json() model_id = data.get('model_id') parameters = data.get('parameters', {}) # 创建 session Session = sessionmaker(bind=db.engine) session = Session() # 查询模型信息 model_info = session.query(Models).filter(Models.ModelID == model_id).first() if not model_info: return jsonify({'error': 'Model not found'}), 404 data_type = model_info.Data_type input_data = pd.DataFrame([parameters]) # 处理 'reduce' 类型模型 if data_type == 'reduce': init_ph = float(parameters.get('init_pH', 0.0)) target_ph = float(parameters.get('target_pH', 0.0)) input_data = input_data.drop('target_pH', axis=1, errors='ignore') # 通过模型类型重命名列以适配模型 input_data_rename = rename_columns_for_model_predict(input_data, data_type) # 使用当前活动的模型进行预测(假设根据 model_id 获取当前模型) predictions = predict(session, input_data_rename, model_id) # 如果是 reduce 类型,进行特定的转换 if data_type == 'reduce': predictions = predictions[0] predictions = predict_to_Q(predictions, init_ph, target_ph) return jsonify({'result': predictions}), 200 except Exception as e: logger.error('Failed to predict:', exc_info=True) return jsonify({'error': str(e)}), 400 finally: if session: session.close() # 切换模型接口 @bp.route('/switch-model', methods=['POST']) def switch_model(): session = None try: data = request.get_json() model_id = data.get('model_id') model_name = data.get('model_name') # 创建 session Session = sessionmaker(bind=db.engine) session = Session() # 查找模型 model = session.query(Models).filter_by(ModelID=model_id).first() if not model: return jsonify({'error': 'Model not found'}), 404 # 更新模型状态(或其他切换逻辑) # 假设此处是更新模型的某些字段来进行切换 model.status = 'active' # 假设有一个字段记录模型状态 session.commit() # 记录切换日志 logger.info(f'Model {model_name} (ID: {model_id}) switched successfully.') return jsonify({'success': True, 'message': f'Model {model_name} switched successfully!'}), 200 except Exception as e: logger.error('Failed to switch model:', exc_info=True) return jsonify({'error': str(e)}), 400 finally: if session: session.close() @bp.route('/models', methods=['GET']) def get_models(): session = None try: # 创建 session Session = sessionmaker(bind=db.engine) session = Session() # 查询所有模型 models = session.query(Models).all() logger.debug(f"Models found: {models}") # 打印查询的模型数据 if not models: return jsonify({'message': 'No models found'}), 404 # 将模型数据转换为字典列表 models_list = [ { 'ModelID': model.ModelID, 'ModelName': model.Model_name, 'ModelType': model.Model_type, 'CreatedAt': model.Created_at.strftime('%Y-%m-%d %H:%M:%S'), 'Description': model.Description, 'DatasetID': model.DatasetID, 'ModelFilePath': model.ModelFilePath, 'DataType': model.Data_type, 'PerformanceScore': model.Performance_score } for model in models ] return jsonify(models_list), 200 except Exception as e: return jsonify({'error': str(e)}), 400 finally: if session: session.close() @bp.route('/delete-dataset/', methods=['DELETE']) def delete_dataset(dataset_id): # 创建 sessionmaker 实例 Session = sessionmaker(bind=db.engine) session = Session() try: # 查询数据集 dataset = session.query(Datasets).filter_by(Dataset_ID=dataset_id).first() if not dataset: return jsonify({'error': 'Dataset not found'}), 404 # 删除文件 filename = f"dataset_{dataset.Dataset_ID}.xlsx" file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): os.remove(file_path) # 删除数据表 table_name = f"dataset_{dataset.Dataset_ID}" session.execute(text(f"DROP TABLE IF EXISTS {table_name}")) # 删除数据集记录 session.delete(dataset) session.commit() return jsonify({'message': 'Dataset deleted successfully'}), 200 except Exception as e: session.rollback() logging.error(f'Failed to delete dataset {dataset_id}:', exc_info=True) return jsonify({'error': str(e)}), 500 finally: session.close() @bp.route('/tables', methods=['GET']) def list_tables(): engine = db.engine # 使用 db 实例的 engine inspector = Inspector.from_engine(engine) # 创建 Inspector 对象 table_names = inspector.get_table_names() # 获取所有表名 return jsonify(table_names) # 以 JSON 形式返回表名列表 @bp.route('/model-parameters', methods=['GET']) def get_all_model_parameters(): try: parameters = ModelParameters.query.all() # 获取所有参数数据 if parameters: result = [ { 'ParamID': param.ParamID, 'ModelID': param.ModelID, 'ParamName': param.ParamName, 'ParamValue': param.ParamValue } for param in parameters ] return jsonify(result) else: return jsonify({'message': 'No parameters found'}), 404 except Exception as e: return jsonify({'error': 'Internal server error', 'message': str(e)}), 500 # 定义添加数据库记录的 API 接口 @bp.route('/add_item', methods=['POST']) def add_item(): """ 接收 JSON 格式的请求体,包含表名和要插入的数据。 尝试将数据插入到指定的表中,并进行字段查重。 :return: """ try: # 确保请求体是 JSON 格式 data = request.get_json() if not data: raise ValueError("No JSON data provided") table_name = data.get('table') item_data = data.get('item') if not table_name or not item_data: return jsonify({'error': 'Missing table name or item data'}), 400 # 定义各个表的字段查重规则 duplicate_check_rules = { 'users': ['email', 'username'], 'products': ['product_code'], 'current_reduce': [ 'Q_over_b', 'pH', 'OM', 'CL', 'H', 'Al'], 'current_reflux': ['OM', 'CL', 'CEC', 'H_plus', 'N', 'Al3_plus', 'Delta_pH'], # 其他表和规则 } # 获取该表的查重字段 duplicate_columns = duplicate_check_rules.get(table_name) if not duplicate_columns: return jsonify({'error': 'No duplicate check rule for this table'}), 400 # 动态构建查询条件,逐一检查是否有重复数据 condition = ' AND '.join([f"{column} = :{column}" for column in duplicate_columns]) duplicate_query = f"SELECT 1 FROM {table_name} WHERE {condition} LIMIT 1" result = db.session.execute(text(duplicate_query), item_data).fetchone() if result: return jsonify({'error': '重复数据,已有相同的数据项存在。'}), 409 # 动态构建 SQL 语句,进行插入操作 columns = ', '.join(item_data.keys()) placeholders = ', '.join([f":{key}" for key in item_data.keys()]) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" # 直接执行插入操作,无需显式的事务管理 db.session.execute(text(sql), item_data) # 提交事务 db.session.commit() # 返回成功响应 return jsonify({'success': True, 'message': 'Item added successfully'}), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except KeyError as e: return jsonify({'error': f'Missing data field: {e}'}), 400 except sqlite3.IntegrityError as e: return jsonify({'error': '数据库完整性错误', 'details': str(e)}), 409 except sqlite3.Error as e: return jsonify({'error': '数据库错误', 'details': str(e)}), 500 @bp.route('/delete_item', methods=['POST']) def delete_item(): """ 删除数据库记录的 API 接口 """ data = request.get_json() table_name = data.get('table') condition = data.get('condition') # 检查表名和条件是否提供 if not table_name or not condition: return jsonify({ "success": False, "message": "缺少表名或条件参数" }), 400 # 尝试从条件字符串中解析键和值 try: key, value = condition.split('=') key = key.strip() # 去除多余的空格 value = value.strip().strip("'\"") # 去除多余的空格和引号 except ValueError: return jsonify({ "success": False, "message": "条件格式错误,应为 'key=value'" }), 400 # 准备 SQL 删除语句 sql = f"DELETE FROM {table_name} WHERE {key} = :value" try: # 使用 SQLAlchemy 执行删除 with db.session.begin(): result = db.session.execute(text(sql), {"value": value}) # 检查是否有记录被删除 if result.rowcount == 0: return jsonify({ "success": False, "message": "未找到符合条件的记录" }), 404 return jsonify({ "success": True, "message": "记录删除成功" }), 200 except Exception as e: return jsonify({ "success": False, "message": f"删除失败: {e}" }), 500 # 定义修改数据库记录的 API 接口 @bp.route('/update_item', methods=['PUT']) def update_record(): """ 接收 JSON 格式的请求体,包含表名和更新的数据。 尝试更新指定的记录。 """ data = request.get_json() # 检查必要的数据是否提供 if not data or 'table' not in data or 'item' not in data: return jsonify({ "success": False, "message": "请求数据不完整" }), 400 table_name = data['table'] item = data['item'] # 假设 item 的第一个键是 ID id_key = next(iter(item.keys())) # 获取第一个键 record_id = item.get(id_key) if not record_id: return jsonify({ "success": False, "message": "缺少记录 ID" }), 400 # 获取更新的字段和值 updates = {key: value for key, value in item.items() if key != id_key} if not updates: return jsonify({ "success": False, "message": "没有提供需要更新的字段" }), 400 # 动态构建 SQL set_clause = ', '.join([f"{key} = :{key}" for key in updates.keys()]) sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = :id_value" # 添加 ID 到参数 updates['id_value'] = record_id try: # 使用 SQLAlchemy 执行更新 with db.session.begin(): result = db.session.execute(text(sql), updates) # 检查是否有更新的记录 if result.rowcount == 0: return jsonify({ "success": False, "message": "未找到要更新的记录" }), 404 return jsonify({ "success": True, "message": "数据更新成功" }), 200 except Exception as e: # 捕获所有异常并返回 return jsonify({ "success": False, "message": f"更新失败: {str(e)}" }), 500 # 定义查询数据库记录的 API 接口 @bp.route('/search/record', methods=['GET']) def sql_search(): """ 接收 JSON 格式的请求体,包含表名和要查询的 ID。 尝试查询指定 ID 的记录并返回结果。 :return: """ try: data = request.get_json() # 表名 sql_table = data['table'] # 要搜索的 ID Id = data['id'] # 连接到数据库 cur = db.cursor() # 构造查询语句 sql = f"SELECT * FROM {sql_table} WHERE id = ?" # 执行查询 cur.execute(sql, (Id,)) # 获取查询结果 rows = cur.fetchall() column_names = [desc[0] for desc in cur.description] # 检查是否有结果 if not rows: return jsonify({'error': '未查找到对应数据。'}), 400 # 构造响应数据 results = [] for row in rows: result = {column_names[i]: row[i] for i in range(len(row))} results.append(result) # 关闭游标和数据库连接 cur.close() db.close() # 返回 JSON 响应 return jsonify(results), 200 except sqlite3.Error as e: # 如果发生数据库错误,返回错误信息 return jsonify({'error': str(e)}), 400 except KeyError as e: # 如果请求数据中缺少必要的键,返回错误信息 return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400 # 模板下载接口 @bp.route('/download_template', methods=['GET']) def download_template(): """ 根据给定的表名,下载表的模板(如 CSV 或 Excel 格式)。 """ table_name = request.args.get('table') if not table_name: return jsonify({'error': '表名参数缺失'}), 400 columns = get_column_names(table_name) if not columns: return jsonify({'error': f"Table '{table_name}' not found or empty."}), 404 # 不包括 ID 列 if 'id' in columns: columns.remove('id') df = pd.DataFrame(columns=columns) file_format = request.args.get('format', 'excel').lower() try: if file_format == 'csv': output = BytesIO() df.to_csv(output, index=False, encoding='utf-8') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_template.csv', mimetype='text/csv') else: output = BytesIO() df.to_excel(output, index=False, engine='openpyxl') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_template.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') except Exception as e: logger.error(f"Failed to generate template: {e}", exc_info=True) return jsonify({'error': '生成模板文件失败'}), 500 # 导出数据 @bp.route('/export_data', methods=['GET']) def export_data(): table_name = request.args.get('table') file_format = request.args.get('format', 'excel').lower() if not table_name: return jsonify({'error': '缺少表名参数'}), 400 if not table_name.isidentifier(): return jsonify({'error': '无效的表名'}), 400 try: conn = get_db() query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;" table_exists = conn.execute(query, (table_name,)).fetchone() if not table_exists: return jsonify({'error': f"表 {table_name} 不存在"}), 404 query = f"SELECT * FROM {table_name};" df = pd.read_sql(query, conn) output = BytesIO() if file_format == 'csv': df.to_csv(output, index=False, encoding='utf-8') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.csv', mimetype='text/csv') elif file_format == 'excel': df.to_excel(output, index=False, engine='openpyxl') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') else: return jsonify({'error': '不支持的文件格式,仅支持 CSV 和 Excel'}), 400 except Exception as e: logger.error(f"Error in export_data: {e}", exc_info=True) return jsonify({'error': str(e)}), 500 # 导入数据接口 @bp.route('/import_data', methods=['POST']) def import_data(): logger.debug("Import data endpoint accessed.") if 'file' not in request.files: logger.error("No file in request.") return jsonify({'success': False, 'message': '文件缺失'}), 400 file = request.files['file'] table_name = request.form.get('table') if not table_name: logger.error("Missing table name parameter.") return jsonify({'success': False, 'message': '缺少表名参数'}), 400 if file.filename == '': logger.error("No file selected.") return jsonify({'success': False, 'message': '未选择文件'}), 400 try: # 保存文件到临时路径 temp_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(file.filename)) file.save(temp_path) logger.debug(f"File saved to temporary path: {temp_path}") # 根据文件类型读取文件 if file.filename.endswith('.xlsx'): df = pd.read_excel(temp_path) elif file.filename.endswith('.csv'): df = pd.read_csv(temp_path) else: logger.error("Unsupported file format.") return jsonify({'success': False, 'message': '仅支持 Excel 和 CSV 文件'}), 400 # 获取数据库列名 db_columns = get_column_names(table_name) if 'id' in db_columns: db_columns.remove('id') # 假设 id 列是自增的,不需要处理 if not set(db_columns).issubset(set(df.columns)): logger.error(f"File columns do not match database columns. File columns: {df.columns.tolist()}, Expected: {db_columns}") return jsonify({'success': False, 'message': '文件列名与数据库表不匹配'}), 400 # 清洗数据并删除空值行 df_cleaned = df[db_columns].dropna() # 统一数据类型,避免 int 和 float 合并问题 df_cleaned[db_columns] = df_cleaned[db_columns].apply(pd.to_numeric, errors='coerce') # 获取现有的数据 conn = get_db() with conn: existing_data = pd.read_sql(f"SELECT * FROM {table_name}", conn) # 查找重复数据 duplicates = df_cleaned.merge(existing_data, on=db_columns, how='inner') # 如果有重复数据,删除它们 df_cleaned = df_cleaned[~df_cleaned.index.isin(duplicates.index)] logger.warning(f"Duplicate data detected and removed: {duplicates}") # 获取导入前后的数据量 total_data = len(df_cleaned) + len(duplicates) new_data = len(df_cleaned) duplicate_data = len(duplicates) # 导入不重复的数据 df_cleaned.to_sql(table_name, conn, if_exists='append', index=False) logger.debug(f"Imported {new_data} new records into the database.") # 删除临时文件 os.remove(temp_path) logger.debug(f"Temporary file removed: {temp_path}") # 返回结果 return jsonify({ 'success': True, 'message': '数据导入成功', 'total_data': total_data, 'new_data': new_data, 'duplicate_data': duplicate_data }), 200 except Exception as e: logger.error(f"Import failed: {e}", exc_info=True) return jsonify({'success': False, 'message': f'导入失败: {str(e)}'}), 500 # 定义提供数据库列表,用于展示表格的 API 接口 @bp.route('/table', methods=['POST']) def get_table(): data = request.get_json() table_name = data.get('table') if not table_name: return jsonify({'error': '需要表名'}), 400 try: # 创建 sessionmaker 实例 Session = sessionmaker(bind=db.engine) session = Session() # 动态获取表的元数据 metadata = MetaData() table = Table(table_name, metadata, autoload_with=db.engine) # 从数据库中查询所有记录 query = select(table) result = session.execute(query).fetchall() # 将结果转换为列表字典形式 rows = [dict(zip([column.name for column in table.columns], row)) for row in result] # 获取列名 headers = [column.name for column in table.columns] return jsonify(rows=rows, headers=headers), 200 except Exception as e: return jsonify({'error': str(e)}), 400 finally: # 关闭 session session.close() # 注册用户 @bp.route('/register', methods=['POST']) def register_user(): # 获取前端传来的数据 data = request.get_json() name = data.get('name') # 用户名 password = data.get('password') # 密码 logger.info(f"Register request received: name={name}") # 检查用户名和密码是否为空 if not name or not password: logger.warning("用户名和密码不能为空") return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400 # 动态获取数据库表的列名 columns = get_column_names('users') logger.info(f"Database columns for 'users' table: {columns}") # 检查前端传来的数据是否包含数据库表中所有的必填字段 for column in ['name', 'password']: if column not in columns: logger.error(f"缺少必填字段:{column}") return jsonify({"success": False, "message": f"缺少必填字段:{column}"}), 400 # 对密码进行哈希处理 hashed_password = hash_password(password) logger.info(f"Password hashed for user: {name}") # 插入到数据库 try: # 检查用户是否已经存在 query = "SELECT * FROM users WHERE name = :name" conn = get_db() user = conn.execute(query, {"name": name}).fetchone() if user: logger.warning(f"用户名 '{name}' 已存在") return jsonify({"success": False, "message": "用户名已存在"}), 400 # 向数据库插入数据 query = "INSERT INTO users (name, password) VALUES (:name, :password)" conn.execute(query, {"name": name, "password": hashed_password}) conn.commit() logger.info(f"User '{name}' registered successfully.") return jsonify({"success": True, "message": "注册成功"}) except Exception as e: # 记录错误日志并返回错误信息 logger.error(f"Error registering user: {e}", exc_info=True) return jsonify({"success": False, "message": "注册失败"}), 500 @bp.route('/login', methods=['POST']) def login_user(): # 获取前端传来的数据 data = request.get_json() name = data.get('name') # 用户名 password = data.get('password') # 密码 logger.info(f"Login request received: name={name}") # 检查用户名和密码是否为空 if not name or not password: logger.warning("用户名和密码不能为空") return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400 try: # 查询数据库验证用户名 query = "SELECT * FROM users WHERE name = :name" conn = get_db() user = conn.execute(query, {"name": name}).fetchone() if not user: logger.warning(f"用户名 '{name}' 不存在") return jsonify({"success": False, "message": "用户名不存在"}), 400 # 获取数据库中存储的密码(假设密码是哈希存储的) stored_password = user[2] # 假设密码存储在数据库的第三列 user_id = user[0] # 假设 id 存储在数据库的第一列 # 校验密码是否正确 if check_password_hash(stored_password, password): logger.info(f"User '{name}' logged in successfully.") return jsonify({ "success": True, "message": "登录成功", "userId": user_id # 返回用户 ID }) else: logger.warning(f"Invalid password for user '{name}'") return jsonify({"success": False, "message": "用户名或密码错误"}), 400 except Exception as e: # 记录错误日志并返回错误信息 logger.error(f"Error during login: {e}", exc_info=True) return jsonify({"success": False, "message": "登录失败"}), 500 # 更新用户信息接口 @bp.route('/update_user', methods=['POST']) def update_user(): # 获取前端传来的数据 data = request.get_json() # 打印收到的请求数据 app.logger.info(f"Received data: {data}") user_id = data.get('userId') # 用户ID name = data.get('name') # 用户名 old_password = data.get('oldPassword') # 旧密码 new_password = data.get('newPassword') # 新密码 logger.info(f"Update request received: user_id={user_id}, name={name}") # 校验传入的用户名和密码是否为空 if not name or not old_password: logger.warning("用户名和旧密码不能为空") return jsonify({"success": False, "message": "用户名和旧密码不能为空"}), 400 # 新密码和旧密码不能相同 if new_password and old_password == new_password: logger.warning(f"新密码与旧密码相同:{name}") return jsonify({"success": False, "message": "新密码与旧密码不能相同"}), 400 try: # 查询数据库验证用户ID query = "SELECT * FROM users WHERE id = :user_id" conn = get_db() user = conn.execute(query, {"user_id": user_id}).fetchone() if not user: logger.warning(f"用户ID '{user_id}' 不存在") return jsonify({"success": False, "message": "用户不存在"}), 400 # 获取数据库中存储的密码(假设密码是哈希存储的) stored_password = user[2] # 假设密码存储在数据库的第三列 # 校验旧密码是否正确 if not check_password_hash(stored_password, old_password): logger.warning(f"旧密码错误:{name}") return jsonify({"success": False, "message": "旧密码错误"}), 400 # 如果新密码非空,则更新新密码 if new_password: hashed_new_password = hash_password(new_password) update_query = "UPDATE users SET password = :new_password WHERE id = :user_id" conn.execute(update_query, {"new_password": hashed_new_password, "user_id": user_id}) conn.commit() logger.info(f"User ID '{user_id}' password updated successfully.") # 如果用户名发生更改,则更新用户名 if name != user[1]: update_name_query = "UPDATE users SET name = :new_name WHERE id = :user_id" conn.execute(update_name_query, {"new_name": name, "user_id": user_id}) conn.commit() logger.info(f"User ID '{user_id}' name updated to '{name}' successfully.") return jsonify({"success": True, "message": "用户信息更新成功"}) except Exception as e: # 记录错误日志并返回错误信息 logger.error(f"Error updating user: {e}", exc_info=True) return jsonify({"success": False, "message": "更新失败"}), 500