import logging import os import sqlite3 import uuid from io import BytesIO import pandas as pd from flask import Blueprint, request, jsonify, send_from_directory, current_app, send_file, session from sqlalchemy import text, select, MetaData, Table from sqlalchemy.orm import sessionmaker from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.utils import secure_filename from app import db from .database_models import Models # 配置日志 logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # 创建蓝图 (Blueprint),用于分离路由 bp = Blueprint('frontend', __name__) # 密码加密 def hash_password(password): return generate_password_hash(password) def get_db(): """ 获取数据库连接 """ return sqlite3.connect(current_app.config['DATABASE']) # 定义添加数据库记录的 API 接口 @bp.route('/add_item', methods=['POST']) def add_item(): """ 接收 JSON 格式的请求体,包含表名和要插入的数据。 尝试将数据插入到指定的表中,并进行字段查重。 :return: """ try: # 确保请求体是 JSON 格式 data = request.get_json() if not data: raise ValueError("No JSON data provided") table_name = data.get('table') item_data = data.get('item') if not table_name or not item_data: return jsonify({'error': 'Missing table name or item data'}), 400 # 定义各个表的字段查重规则 duplicate_check_rules = { 'users': ['email', 'username'], 'products': ['product_code'], 'current_reduce': ['Q_over_b', 'pH', 'OM', 'CL', 'H', 'Al'], 'current_reflux': ['OM', 'CL', 'CEC', 'H_plus', 'N', 'Al3_plus', 'Delta_pH'], # 其他表和规则 } # 获取该表的查重字段 duplicate_columns = duplicate_check_rules.get(table_name) if not duplicate_columns: return jsonify({'error': 'No duplicate check rule for this table'}), 400 # 动态构建查询条件,逐一检查是否有重复数据 condition = ' AND '.join([f"{column} = :{column}" for column in duplicate_columns]) duplicate_query = f"SELECT 1 FROM {table_name} WHERE {condition} LIMIT 1" result = db.session.execute(text(duplicate_query), item_data).fetchone() if result: return jsonify({'error': '重复数据,已有相同的数据项存在。'}), 409 # 动态构建 SQL 语句,进行插入操作 columns = ', '.join(item_data.keys()) placeholders = ', '.join([f":{key}" for key in item_data.keys()]) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" # 直接执行插入操作,无需显式的事务管理 db.session.execute(text(sql), item_data) # 提交事务 db.session.commit() # 返回成功响应 return jsonify({'success': True, 'message': 'Item added successfully'}), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except KeyError as e: return jsonify({'error': f'Missing data field: {e}'}), 400 except sqlite3.IntegrityError as e: return jsonify({'error': '数据库完整性错误', 'details': str(e)}), 409 except sqlite3.Error as e: return jsonify({'error': '数据库错误', 'details': str(e)}), 500 @bp.route('/delete_item', methods=['POST']) def delete_item(): """ 删除数据库记录的 API 接口 """ data = request.get_json() table_name = data.get('table') condition = data.get('condition') # 检查表名和条件是否提供 if not table_name or not condition: return jsonify({ "success": False, "message": "缺少表名或条件参数" }), 400 # 尝试从条件字符串中解析键和值 try: key, value = condition.split('=') key = key.strip() # 去除多余的空格 value = value.strip().strip("'\"") # 去除多余的空格和引号 except ValueError: return jsonify({ "success": False, "message": "条件格式错误,应为 'key=value'" }), 400 # 准备 SQL 删除语句 sql = f"DELETE FROM {table_name} WHERE {key} = :value" try: # 使用 SQLAlchemy 执行删除 with db.session.begin(): result = db.session.execute(text(sql), {"value": value}) # 检查是否有记录被删除 if result.rowcount == 0: return jsonify({ "success": False, "message": "未找到符合条件的记录" }), 404 return jsonify({ "success": True, "message": "记录删除成功" }), 200 except Exception as e: return jsonify({ "success": False, "message": f"删除失败: {e}" }), 500 # 定义修改数据库记录的 API 接口 @bp.route('/update_item', methods=['PUT']) def update_record(): """ 接收 JSON 格式的请求体,包含表名和更新的数据。 尝试更新指定的记录。 """ data = request.get_json() # 检查必要的数据是否提供 if not data or 'table' not in data or 'item' not in data: return jsonify({ "success": False, "message": "请求数据不完整" }), 400 table_name = data['table'] item = data['item'] # 假设 item 的第一个键是 ID id_key = next(iter(item.keys())) # 获取第一个键 record_id = item.get(id_key) if not record_id: return jsonify({ "success": False, "message": "缺少记录 ID" }), 400 # 获取更新的字段和值 updates = {key: value for key, value in item.items() if key != id_key} if not updates: return jsonify({ "success": False, "message": "没有提供需要更新的字段" }), 400 # 动态构建 SQL set_clause = ', '.join([f"{key} = :{key}" for key in updates.keys()]) sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = :id_value" # 添加 ID 到参数 updates['id_value'] = record_id try: # 使用 SQLAlchemy 执行更新 with db.session.begin(): result = db.session.execute(text(sql), updates) # 检查是否有更新的记录 if result.rowcount == 0: return jsonify({ "success": False, "message": "未找到要更新的记录" }), 404 return jsonify({ "success": True, "message": "数据更新成功" }), 200 except Exception as e: # 捕获所有异常并返回 return jsonify({ "success": False, "message": f"更新失败: {str(e)}" }), 500 # 定义查询数据库记录的 API 接口 @bp.route('/search/record', methods=['GET']) def sql_search(): """ 接收 JSON 格式的请求体,包含表名和要查询的 ID。 尝试查询指定 ID 的记录并返回结果。 :return: """ try: data = request.get_json() # 表名 sql_table = data['table'] # 要搜索的 ID Id = data['id'] # 连接到数据库 cur = db.cursor() # 构造查询语句 sql = f"SELECT * FROM {sql_table} WHERE id = ?" # 执行查询 cur.execute(sql, (Id,)) # 获取查询结果 rows = cur.fetchall() column_names = [desc[0] for desc in cur.description] # 检查是否有结果 if not rows: return jsonify({'error': '未查找到对应数据。'}), 400 # 构造响应数据 results = [] for row in rows: result = {column_names[i]: row[i] for i in range(len(row))} results.append(result) # 关闭游标和数据库连接 cur.close() db.close() # 返回 JSON 响应 return jsonify(results), 200 except sqlite3.Error as e: # 如果发生数据库错误,返回错误信息 return jsonify({'error': str(e)}), 400 except KeyError as e: # 如果请求数据中缺少必要的键,返回错误信息 return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400 # 更新用户信息接口 @bp.route('/update_user', methods=['POST']) def update_user(): # 获取前端传来的数据 data = request.get_json() # 打印收到的请求数据 current_app.logger.info(f"Received data: {data}") user_id = data.get('userId') # 用户ID name = data.get('name') # 用户名 old_password = data.get('oldPassword') # 旧密码 new_password = data.get('newPassword') # 新密码 logger.info(f"Update request received: user_id={user_id}, name={name}") # 校验传入的用户名和密码是否为空 if not name or not old_password: logger.warning("用户名和旧密码不能为空") return jsonify({"success": False, "message": "用户名和旧密码不能为空"}), 400 # 新密码和旧密码不能相同 if new_password and old_password == new_password: logger.warning(f"新密码与旧密码相同:{name}") return jsonify({"success": False, "message": "新密码与旧密码不能相同"}), 400 try: # 查询数据库验证用户ID query = "SELECT * FROM users WHERE id = :user_id" conn = get_db() user = conn.execute(query, {"user_id": user_id}).fetchone() if not user: logger.warning(f"用户ID '{user_id}' 不存在") return jsonify({"success": False, "message": "用户不存在"}), 400 # 获取数据库中存储的密码(假设密码是哈希存储的) stored_password = user[2] # 假设密码存储在数据库的第三列 # 校验旧密码是否正确 if not check_password_hash(stored_password, old_password): logger.warning(f"旧密码错误:{name}") return jsonify({"success": False, "message": "旧密码错误"}), 400 # 如果新密码非空,则更新新密码 if new_password: hashed_new_password = hash_password(new_password) update_query = "UPDATE users SET password = :new_password WHERE id = :user_id" conn.execute(update_query, {"new_password": hashed_new_password, "user_id": user_id}) conn.commit() logger.info(f"User ID '{user_id}' password updated successfully.") # 如果用户名发生更改,则更新用户名 if name != user[1]: update_name_query = "UPDATE users SET name = :new_name WHERE id = :user_id" conn.execute(update_name_query, {"new_name": name, "user_id": user_id}) conn.commit() logger.info(f"User ID '{user_id}' name updated to '{name}' successfully.") return jsonify({"success": True, "message": "用户信息更新成功"}) except Exception as e: # 记录错误日志并返回错误信息 logger.error(f"Error updating user: {e}", exc_info=True) return jsonify({"success": False, "message": "更新失败"}), 500 # 注册用户 @bp.route('/register', methods=['POST']) def register_user(): # 获取前端传来的数据 data = request.get_json() name = data.get('name') # 用户名 password = data.get('password') # 密码 logger.info(f"Register request received: name={name}") # 检查用户名和密码是否为空 if not name or not password: logger.warning("用户名和密码不能为空") return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400 # 动态获取数据库表的列名 columns = get_column_names('users') logger.info(f"Database columns for 'users' table: {columns}") # 检查前端传来的数据是否包含数据库表中所有的必填字段 for column in ['name', 'password']: if column not in columns: logger.error(f"缺少必填字段:{column}") return jsonify({"success": False, "message": f"缺少必填字段:{column}"}), 400 # 对密码进行哈希处理 hashed_password = hash_password(password) logger.info(f"Password hashed for user: {name}") # 插入到数据库 try: # 检查用户是否已经存在 query = "SELECT * FROM users WHERE name = :name" conn = get_db() user = conn.execute(query, {"name": name}).fetchone() if user: logger.warning(f"用户名 '{name}' 已存在") return jsonify({"success": False, "message": "用户名已存在"}), 400 # 向数据库插入数据 query = "INSERT INTO users (name, password) VALUES (:name, :password)" conn.execute(query, {"name": name, "password": hashed_password}) conn.commit() logger.info(f"User '{name}' registered successfully.") return jsonify({"success": True, "message": "注册成功"}) except Exception as e: # 记录错误日志并返回错误信息 logger.error(f"Error registering user: {e}", exc_info=True) return jsonify({"success": False, "message": "注册失败"}), 500 def get_column_names(table_name): """ 动态获取数据库表的列名。 """ try: conn = get_db() query = f"PRAGMA table_info({table_name});" result = conn.execute(query).fetchall() conn.close() return [row[1] for row in result] # 第二列是列名 except Exception as e: logger.error(f"Error getting column names for table {table_name}: {e}", exc_info=True) return [] # 导出数据 @bp.route('/export_data', methods=['GET']) def export_data(): table_name = request.args.get('table') file_format = request.args.get('format', 'excel').lower() if not table_name: return jsonify({'error': '缺少表名参数'}), 400 if not table_name.isidentifier(): return jsonify({'error': '无效的表名'}), 400 try: conn = get_db() query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;" table_exists = conn.execute(query, (table_name,)).fetchone() if not table_exists: return jsonify({'error': f"表 {table_name} 不存在"}), 404 query = f"SELECT * FROM {table_name};" df = pd.read_sql(query, conn) output = BytesIO() if file_format == 'csv': df.to_csv(output, index=False, encoding='utf-8') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.csv', mimetype='text/csv') elif file_format == 'excel': df.to_excel(output, index=False, engine='openpyxl') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') else: return jsonify({'error': '不支持的文件格式,仅支持 CSV 和 Excel'}), 400 except Exception as e: logger.error(f"Error in export_data: {e}", exc_info=True) return jsonify({'error': str(e)}), 500 # 导入数据接口 @bp.route('/import_data', methods=['POST']) def import_data(): logger.debug("Import data endpoint accessed.") if 'file' not in request.files: logger.error("No file in request.") return jsonify({'success': False, 'message': '文件缺失'}), 400 file = request.files['file'] table_name = request.form.get('table') if not table_name: logger.error("Missing table name parameter.") return jsonify({'success': False, 'message': '缺少表名参数'}), 400 if file.filename == '': logger.error("No file selected.") return jsonify({'success': False, 'message': '未选择文件'}), 400 try: # 保存文件到临时路径 temp_path = os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(file.filename)) file.save(temp_path) logger.debug(f"File saved to temporary path: {temp_path}") # 根据文件类型读取文件 if file.filename.endswith('.xlsx'): df = pd.read_excel(temp_path) elif file.filename.endswith('.csv'): df = pd.read_csv(temp_path) else: logger.error("Unsupported file format.") return jsonify({'success': False, 'message': '仅支持 Excel 和 CSV 文件'}), 400 # 获取数据库列名 db_columns = get_column_names(table_name) if 'id' in db_columns: db_columns.remove('id') # 假设 id 列是自增的,不需要处理 if not set(db_columns).issubset(set(df.columns)): logger.error(f"File columns do not match database columns. File columns: {df.columns.tolist()}, Expected: {db_columns}") return jsonify({'success': False, 'message': '文件列名与数据库表不匹配'}), 400 # 清洗数据并删除空值行 df_cleaned = df[db_columns].dropna() # 统一数据类型,避免 int 和 float 合并问题 df_cleaned[db_columns] = df_cleaned[db_columns].apply(pd.to_numeric, errors='coerce') # 获取现有的数据 conn = get_db() with conn: existing_data = pd.read_sql(f"SELECT * FROM {table_name}", conn) # 查找重复数据 duplicates = df_cleaned.merge(existing_data, on=db_columns, how='inner') # 如果有重复数据,删除它们 df_cleaned = df_cleaned[~df_cleaned.index.isin(duplicates.index)] logger.warning(f"Duplicate data detected and removed: {duplicates}") # 获取导入前后的数据量 total_data = len(df_cleaned) + len(duplicates) new_data = len(df_cleaned) duplicate_data = len(duplicates) # 导入不重复的数据 df_cleaned.to_sql(table_name, conn, if_exists='append', index=False) logger.debug(f"Imported {new_data} new records into the database.") # 删除临时文件 os.remove(temp_path) logger.debug(f"Temporary file removed: {temp_path}") # 返回结果 return jsonify({ 'success': True, 'message': '数据导入成功', 'total_data': total_data, 'new_data': new_data, 'duplicate_data': duplicate_data }), 200 except Exception as e: logger.error(f"Import failed: {e}", exc_info=True) return jsonify({'success': False, 'message': f'导入失败: {str(e)}'}), 500 # 模板下载接口 @bp.route('/download_template', methods=['GET']) def download_template(): """ 根据给定的表名,下载表的模板(如 CSV 或 Excel 格式)。 """ table_name = request.args.get('table') if not table_name: return jsonify({'error': '表名参数缺失'}), 400 columns = get_column_names(table_name) if not columns: return jsonify({'error': f"Table '{table_name}' not found or empty."}), 404 # 不包括 ID 列 if 'id' in columns: columns.remove('id') df = pd.DataFrame(columns=columns) file_format = request.args.get('format', 'excel').lower() try: if file_format == 'csv': output = BytesIO() df.to_csv(output, index=False, encoding='utf-8') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_template.csv', mimetype='text/csv') else: output = BytesIO() df.to_excel(output, index=False, engine='openpyxl') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_template.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') except Exception as e: logger.error(f"Failed to generate template: {e}", exc_info=True) return jsonify({'error': '生成模板文件失败'}), 500 # 切换模型接口 @bp.route('/switch-model', methods=['POST']) def switch_model(): session = None try: data = request.get_json() model_id = data.get('model_id') model_name = data.get('model_name') # 创建 session Session = sessionmaker(bind=db.engine) session = Session() # 查找模型 model = session.query(Models).filter_by(ModelID=model_id).first() if not model: return jsonify({'error': 'Model not found'}), 404 # 更新模型状态(或其他切换逻辑) # 假设此处是更新模型的某些字段来进行切换 model.status = 'active' # 假设有一个字段记录模型状态 session.commit() # 记录切换日志 logger.info(f'Model {model_name} (ID: {model_id}) switched successfully.') return jsonify({'success': True, 'message': f'Model {model_name} switched successfully!'}), 200 except Exception as e: logger.error('Failed to switch model:', exc_info=True) return jsonify({'error': str(e)}), 400 finally: if session: session.close() # 修改了一下登录·接口 @bp.route('/login', methods=['POST']) def login_user(): data = request.get_json() logger.debug(f"Received login data: {data}") # 增加调试日志 name = data.get('name') password = data.get('password') logger.info(f"Login request received for user: {name}") if not isinstance(name, str) or not isinstance(password, str): logger.warning("Username and password must be strings") return jsonify({"success": False, "message": "用户名和密码必须为字符串"}), 400 if not name or not password: logger.warning("Username and password cannot be empty") return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400 query = "SELECT id, name, password FROM users WHERE name = ?" try: with get_db() as conn: user = conn.execute(query, (name,)).fetchone() if not user: logger.warning(f"User '{name}' does not exist") return jsonify({"success": False, "message": "用户名不存在"}), 400 stored_password = user[2] # 假设 'password' 是第三个字段 user_id = user[0] # 假设 'id' 是第一个字段 if check_password_hash(stored_password, password): session['name'] = name logger.info(f"User '{name}' logged in successfully.") return jsonify({ "success": True, "message": "登录成功", "userId": user_id, "name": name }) else: logger.warning(f"Incorrect password for user '{name}'") return jsonify({"success": False, "message": "用户名或密码错误"}), 400 except sqlite3.DatabaseError as db_err: logger.error(f"Database error during login process: {db_err}", exc_info=True) return jsonify({"success": False, "message": "数据库错误"}), 500 except Exception as e: logger.error(f"Unexpected error during login process: {e}", exc_info=True) return jsonify({"success": False, "message": "登录失败"}), 500 # 添加退出登录状态接口 @bp.route('/logout', methods=['GET', 'POST']) def logout_user(): try: session.clear() return jsonify({"msg": "退出成功"}), 200 except Exception as e: logger.error(f"Error during logout process: {e}", exc_info=True) return jsonify({"msg": "退出失败"}), 500 # 获取软件介绍信息的路由 @bp.route('/software-intro/', methods=['GET']) def get_software_intro(id): try: conn = get_db() cursor = conn.cursor() cursor.execute('SELECT title, intro FROM software_intro WHERE id = ?', (id,)) result = cursor.fetchone() conn.close() if result: title, intro = result return jsonify({ 'title': title, 'intro': intro }) return jsonify({}), 404 except sqlite3.Error as e: print(f"数据库错误: {e}") return jsonify({"error": f"数据库错误: {str(e)}"}), 500 # 更新软件介绍信息的路由 @bp.route('/software-intro/', methods=['PUT']) def update_software_intro(id): try: data = request.get_json() title = data.get('title') intro = data.get('intro') conn = get_db() cursor = conn.cursor() cursor.execute('UPDATE software_intro SET title =?, intro =? WHERE id = ?', (title, intro, id)) conn.commit() conn.close() return jsonify({'message': '软件介绍更新成功'}) except sqlite3.Error as e: print(f"数据库错误: {e}") return jsonify({"error": f"数据库错误: {str(e)}"}), 500 # 处理图片上传的路由 @bp.route('/upload-image', methods=['POST']) def upload_image(): file = request.files['image'] if file: filename = str(uuid.uuid4()) + '.' + file.filename.rsplit('.', 1)[1].lower() file.save(os.path.join(os.getcwd(), 'uploads', filename)) imageUrl = f'http://127.0.0.1:5000/uploads/{filename}' return jsonify({'imageUrl': imageUrl}) return jsonify({'error': '未找到图片文件'}), 400 # 配置静态资源服务 @bp.route('/uploads/') def serve_image(filename): uploads_folder = os.path.join(os.getcwd(), 'uploads') return send_from_directory(uploads_folder, filename)