123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744 |
- import logging
- import os
- import sqlite3
- import uuid
- from io import BytesIO
- import pandas as pd
- from flask import Blueprint, request, jsonify, send_from_directory, current_app, send_file, session
- from sqlalchemy import text, select, MetaData, Table
- from sqlalchemy.orm import sessionmaker
- from werkzeug.security import check_password_hash, generate_password_hash
- from werkzeug.utils import secure_filename
- from .database_models import Models
- # 配置日志
- logging.basicConfig(level=logging.DEBUG)
- logger = logging.getLogger(__name__)
- # 创建蓝图 (Blueprint),用于分离路由
- bp = Blueprint('frontend', __name__)
- # 封装数据库连接函数
- def get_db_connection():
- return sqlite3.connect('software_intro.db')
- # 密码加密
- def hash_password(password):
- return generate_password_hash(password)
- def get_db():
- """ 获取数据库连接 """
- return sqlite3.connect(current_app.config['DATABASE'])
- # 定义添加数据库记录的 API 接口
- @bp.route('/add_item', methods=['POST'])
- def add_item():
- """
- 接收 JSON 格式的请求体,包含表名和要插入的数据。
- 尝试将数据插入到指定的表中,并进行字段查重。
- :return:
- """
- try:
- # 确保请求体是 JSON 格式
- data = request.get_json()
- if not data:
- raise ValueError("No JSON data provided")
- table_name = data.get('table')
- item_data = data.get('item')
- if not table_name or not item_data:
- return jsonify({'error': 'Missing table name or item data'}), 400
- # 定义各个表的字段查重规则
- duplicate_check_rules = {
- 'users': ['email', 'username'],
- 'products': ['product_code'],
- 'current_reduce': ['Q_over_b', 'pH', 'OM', 'CL', 'H', 'Al'],
- 'current_reflux': ['OM', 'CL', 'CEC', 'H_plus', 'N', 'Al3_plus', 'Delta_pH'],
- # 其他表和规则
- }
- # 获取该表的查重字段
- duplicate_columns = duplicate_check_rules.get(table_name)
- if not duplicate_columns:
- return jsonify({'error': 'No duplicate check rule for this table'}), 400
- # 动态构建查询条件,逐一检查是否有重复数据
- condition = ' AND '.join([f"{column} = :{column}" for column in duplicate_columns])
- duplicate_query = f"SELECT 1 FROM {table_name} WHERE {condition} LIMIT 1"
- result = db.session.execute(text(duplicate_query), item_data).fetchone()
- if result:
- return jsonify({'error': '重复数据,已有相同的数据项存在。'}), 409
- # 动态构建 SQL 语句,进行插入操作
- columns = ', '.join(item_data.keys())
- placeholders = ', '.join([f":{key}" for key in item_data.keys()])
- sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
- # 直接执行插入操作,无需显式的事务管理
- db.session.execute(text(sql), item_data)
- # 提交事务
- db.session.commit()
- # 返回成功响应
- return jsonify({'success': True, 'message': 'Item added successfully'}), 201
- except ValueError as e:
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- return jsonify({'error': f'Missing data field: {e}'}), 400
- except sqlite3.IntegrityError as e:
- return jsonify({'error': '数据库完整性错误', 'details': str(e)}), 409
- except sqlite3.Error as e:
- return jsonify({'error': '数据库错误', 'details': str(e)}), 500
- @bp.route('/delete_item', methods=['POST'])
- def delete_item():
- """
- 删除数据库记录的 API 接口
- """
- data = request.get_json()
- table_name = data.get('table')
- condition = data.get('condition')
- # 检查表名和条件是否提供
- if not table_name or not condition:
- return jsonify({
- "success": False,
- "message": "缺少表名或条件参数"
- }), 400
- # 尝试从条件字符串中解析键和值
- try:
- key, value = condition.split('=')
- key = key.strip() # 去除多余的空格
- value = value.strip().strip("'\"") # 去除多余的空格和引号
- except ValueError:
- return jsonify({
- "success": False,
- "message": "条件格式错误,应为 'key=value'"
- }), 400
- # 准备 SQL 删除语句
- sql = f"DELETE FROM {table_name} WHERE {key} = :value"
- try:
- # 使用 SQLAlchemy 执行删除
- with db.session.begin():
- result = db.session.execute(text(sql), {"value": value})
- # 检查是否有记录被删除
- if result.rowcount == 0:
- return jsonify({
- "success": False,
- "message": "未找到符合条件的记录"
- }), 404
- return jsonify({
- "success": True,
- "message": "记录删除成功"
- }), 200
- except Exception as e:
- return jsonify({
- "success": False,
- "message": f"删除失败: {e}"
- }), 500
- # 定义修改数据库记录的 API 接口
- @bp.route('/update_item', methods=['PUT'])
- def update_record():
- """
- 接收 JSON 格式的请求体,包含表名和更新的数据。
- 尝试更新指定的记录。
- """
- data = request.get_json()
- # 检查必要的数据是否提供
- if not data or 'table' not in data or 'item' not in data:
- return jsonify({
- "success": False,
- "message": "请求数据不完整"
- }), 400
- table_name = data['table']
- item = data['item']
- # 假设 item 的第一个键是 ID
- id_key = next(iter(item.keys())) # 获取第一个键
- record_id = item.get(id_key)
- if not record_id:
- return jsonify({
- "success": False,
- "message": "缺少记录 ID"
- }), 400
- # 获取更新的字段和值
- updates = {key: value for key, value in item.items() if key != id_key}
- if not updates:
- return jsonify({
- "success": False,
- "message": "没有提供需要更新的字段"
- }), 400
- # 动态构建 SQL
- set_clause = ', '.join([f"{key} = :{key}" for key in updates.keys()])
- sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = :id_value"
- # 添加 ID 到参数
- updates['id_value'] = record_id
- try:
- # 使用 SQLAlchemy 执行更新
- with db.session.begin():
- result = db.session.execute(text(sql), updates)
- # 检查是否有更新的记录
- if result.rowcount == 0:
- return jsonify({
- "success": False,
- "message": "未找到要更新的记录"
- }), 404
- return jsonify({
- "success": True,
- "message": "数据更新成功"
- }), 200
- except Exception as e:
- # 捕获所有异常并返回
- return jsonify({
- "success": False,
- "message": f"更新失败: {str(e)}"
- }), 500
- # 定义查询数据库记录的 API 接口
- @bp.route('/search/record', methods=['GET'])
- def sql_search():
- """
- 接收 JSON 格式的请求体,包含表名和要查询的 ID。
- 尝试查询指定 ID 的记录并返回结果。
- :return:
- """
- try:
- data = request.get_json()
- # 表名
- sql_table = data['table']
- # 要搜索的 ID
- Id = data['id']
- # 连接到数据库
- cur = db.cursor()
- # 构造查询语句
- sql = f"SELECT * FROM {sql_table} WHERE id = ?"
- # 执行查询
- cur.execute(sql, (Id,))
- # 获取查询结果
- rows = cur.fetchall()
- column_names = [desc[0] for desc in cur.description]
- # 检查是否有结果
- if not rows:
- return jsonify({'error': '未查找到对应数据。'}), 400
- # 构造响应数据
- results = []
- for row in rows:
- result = {column_names[i]: row[i] for i in range(len(row))}
- results.append(result)
- # 关闭游标和数据库连接
- cur.close()
- db.close()
- # 返回 JSON 响应
- return jsonify(results), 200
- except sqlite3.Error as e:
- # 如果发生数据库错误,返回错误信息
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- # 如果请求数据中缺少必要的键,返回错误信息
- return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400
- # 更新用户信息接口
- @bp.route('/update_user', methods=['POST'])
- def update_user():
- # 获取前端传来的数据
- data = request.get_json()
- # 打印收到的请求数据
- current_app.logger.info(f"Received data: {data}")
- user_id = data.get('userId') # 用户ID
- name = data.get('name') # 用户名
- old_password = data.get('oldPassword') # 旧密码
- new_password = data.get('newPassword') # 新密码
- logger.info(f"Update request received: user_id={user_id}, name={name}")
- # 校验传入的用户名和密码是否为空
- if not name or not old_password:
- logger.warning("用户名和旧密码不能为空")
- return jsonify({"success": False, "message": "用户名和旧密码不能为空"}), 400
- # 新密码和旧密码不能相同
- if new_password and old_password == new_password:
- logger.warning(f"新密码与旧密码相同:{name}")
- return jsonify({"success": False, "message": "新密码与旧密码不能相同"}), 400
- try:
- # 查询数据库验证用户ID
- query = "SELECT * FROM users WHERE id = :user_id"
- conn = get_db()
- user = conn.execute(query, {"user_id": user_id}).fetchone()
- if not user:
- logger.warning(f"用户ID '{user_id}' 不存在")
- return jsonify({"success": False, "message": "用户不存在"}), 400
- # 获取数据库中存储的密码(假设密码是哈希存储的)
- stored_password = user[2] # 假设密码存储在数据库的第三列
- # 校验旧密码是否正确
- if not check_password_hash(stored_password, old_password):
- logger.warning(f"旧密码错误:{name}")
- return jsonify({"success": False, "message": "旧密码错误"}), 400
- # 如果新密码非空,则更新新密码
- if new_password:
- hashed_new_password = hash_password(new_password)
- update_query = "UPDATE users SET password = :new_password WHERE id = :user_id"
- conn.execute(update_query, {"new_password": hashed_new_password, "user_id": user_id})
- conn.commit()
- logger.info(f"User ID '{user_id}' password updated successfully.")
- # 如果用户名发生更改,则更新用户名
- if name != user[1]:
- update_name_query = "UPDATE users SET name = :new_name WHERE id = :user_id"
- conn.execute(update_name_query, {"new_name": name, "user_id": user_id})
- conn.commit()
- logger.info(f"User ID '{user_id}' name updated to '{name}' successfully.")
- return jsonify({"success": True, "message": "用户信息更新成功"})
- except Exception as e:
- # 记录错误日志并返回错误信息
- logger.error(f"Error updating user: {e}", exc_info=True)
- return jsonify({"success": False, "message": "更新失败"}), 500
- # 注册用户
- @bp.route('/register', methods=['POST'])
- def register_user():
- # 获取前端传来的数据
- data = request.get_json()
- name = data.get('name') # 用户名
- password = data.get('password') # 密码
- logger.info(f"Register request received: name={name}")
- # 检查用户名和密码是否为空
- if not name or not password:
- logger.warning("用户名和密码不能为空")
- return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400
- # 动态获取数据库表的列名
- columns = get_column_names('users')
- logger.info(f"Database columns for 'users' table: {columns}")
- # 检查前端传来的数据是否包含数据库表中所有的必填字段
- for column in ['name', 'password']:
- if column not in columns:
- logger.error(f"缺少必填字段:{column}")
- return jsonify({"success": False, "message": f"缺少必填字段:{column}"}), 400
- # 对密码进行哈希处理
- hashed_password = hash_password(password)
- logger.info(f"Password hashed for user: {name}")
- # 插入到数据库
- try:
- # 检查用户是否已经存在
- query = "SELECT * FROM users WHERE name = :name"
- conn = get_db()
- user = conn.execute(query, {"name": name}).fetchone()
- if user:
- logger.warning(f"用户名 '{name}' 已存在")
- return jsonify({"success": False, "message": "用户名已存在"}), 400
- # 向数据库插入数据
- query = "INSERT INTO users (name, password) VALUES (:name, :password)"
- conn.execute(query, {"name": name, "password": hashed_password})
- conn.commit()
- logger.info(f"User '{name}' registered successfully.")
- return jsonify({"success": True, "message": "注册成功"})
- except Exception as e:
- # 记录错误日志并返回错误信息
- logger.error(f"Error registering user: {e}", exc_info=True)
- return jsonify({"success": False, "message": "注册失败"}), 500
- def get_column_names(table_name):
- """
- 动态获取数据库表的列名。
- """
- try:
- conn = get_db()
- query = f"PRAGMA table_info({table_name});"
- result = conn.execute(query).fetchall()
- conn.close()
- return [row[1] for row in result] # 第二列是列名
- except Exception as e:
- logger.error(f"Error getting column names for table {table_name}: {e}", exc_info=True)
- return []
- # 导出数据
- @bp.route('/export_data', methods=['GET'])
- def export_data():
- table_name = request.args.get('table')
- file_format = request.args.get('format', 'excel').lower()
- if not table_name:
- return jsonify({'error': '缺少表名参数'}), 400
- if not table_name.isidentifier():
- return jsonify({'error': '无效的表名'}), 400
- try:
- conn = get_db()
- query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
- table_exists = conn.execute(query, (table_name,)).fetchone()
- if not table_exists:
- return jsonify({'error': f"表 {table_name} 不存在"}), 404
- query = f"SELECT * FROM {table_name};"
- df = pd.read_sql(query, conn)
- output = BytesIO()
- if file_format == 'csv':
- df.to_csv(output, index=False, encoding='utf-8')
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_data.csv', mimetype='text/csv')
- elif file_format == 'excel':
- df.to_excel(output, index=False, engine='openpyxl')
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_data.xlsx',
- mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
- else:
- return jsonify({'error': '不支持的文件格式,仅支持 CSV 和 Excel'}), 400
- except Exception as e:
- logger.error(f"Error in export_data: {e}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- # 导入数据接口
- @bp.route('/import_data', methods=['POST'])
- def import_data():
- logger.debug("Import data endpoint accessed.")
- if 'file' not in request.files:
- logger.error("No file in request.")
- return jsonify({'success': False, 'message': '文件缺失'}), 400
- file = request.files['file']
- table_name = request.form.get('table')
- if not table_name:
- logger.error("Missing table name parameter.")
- return jsonify({'success': False, 'message': '缺少表名参数'}), 400
- if file.filename == '':
- logger.error("No file selected.")
- return jsonify({'success': False, 'message': '未选择文件'}), 400
- try:
- # 保存文件到临时路径
- temp_path = os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(file.filename))
- file.save(temp_path)
- logger.debug(f"File saved to temporary path: {temp_path}")
- # 根据文件类型读取文件
- if file.filename.endswith('.xlsx'):
- df = pd.read_excel(temp_path)
- elif file.filename.endswith('.csv'):
- df = pd.read_csv(temp_path)
- else:
- logger.error("Unsupported file format.")
- return jsonify({'success': False, 'message': '仅支持 Excel 和 CSV 文件'}), 400
- # 获取数据库列名
- db_columns = get_column_names(table_name)
- if 'id' in db_columns:
- db_columns.remove('id') # 假设 id 列是自增的,不需要处理
- if not set(db_columns).issubset(set(df.columns)):
- logger.error(f"File columns do not match database columns. File columns: {df.columns.tolist()}, Expected: {db_columns}")
- return jsonify({'success': False, 'message': '文件列名与数据库表不匹配'}), 400
- # 清洗数据并删除空值行
- df_cleaned = df[db_columns].dropna()
- # 统一数据类型,避免 int 和 float 合并问题
- df_cleaned[db_columns] = df_cleaned[db_columns].apply(pd.to_numeric, errors='coerce')
- # 获取现有的数据
- conn = get_db()
- with conn:
- existing_data = pd.read_sql(f"SELECT * FROM {table_name}", conn)
- # 查找重复数据
- duplicates = df_cleaned.merge(existing_data, on=db_columns, how='inner')
- # 如果有重复数据,删除它们
- df_cleaned = df_cleaned[~df_cleaned.index.isin(duplicates.index)]
- logger.warning(f"Duplicate data detected and removed: {duplicates}")
- # 获取导入前后的数据量
- total_data = len(df_cleaned) + len(duplicates)
- new_data = len(df_cleaned)
- duplicate_data = len(duplicates)
- # 导入不重复的数据
- df_cleaned.to_sql(table_name, conn, if_exists='append', index=False)
- logger.debug(f"Imported {new_data} new records into the database.")
- # 删除临时文件
- os.remove(temp_path)
- logger.debug(f"Temporary file removed: {temp_path}")
- # 返回结果
- return jsonify({
- 'success': True,
- 'message': '数据导入成功',
- 'total_data': total_data,
- 'new_data': new_data,
- 'duplicate_data': duplicate_data
- }), 200
- except Exception as e:
- logger.error(f"Import failed: {e}", exc_info=True)
- return jsonify({'success': False, 'message': f'导入失败: {str(e)}'}), 500
- # 模板下载接口
- @bp.route('/download_template', methods=['GET'])
- def download_template():
- """
- 根据给定的表名,下载表的模板(如 CSV 或 Excel 格式)。
- """
- table_name = request.args.get('table')
- if not table_name:
- return jsonify({'error': '表名参数缺失'}), 400
- columns = get_column_names(table_name)
- if not columns:
- return jsonify({'error': f"Table '{table_name}' not found or empty."}), 404
- # 不包括 ID 列
- if 'id' in columns:
- columns.remove('id')
- df = pd.DataFrame(columns=columns)
- file_format = request.args.get('format', 'excel').lower()
- try:
- if file_format == 'csv':
- output = BytesIO()
- df.to_csv(output, index=False, encoding='utf-8')
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_template.csv',
- mimetype='text/csv')
- else:
- output = BytesIO()
- df.to_excel(output, index=False, engine='openpyxl')
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_template.xlsx',
- mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
- except Exception as e:
- logger.error(f"Failed to generate template: {e}", exc_info=True)
- return jsonify({'error': '生成模板文件失败'}), 500
- # 切换模型接口
- @bp.route('/switch-model', methods=['POST'])
- def switch_model():
- session = None
- try:
- data = request.get_json()
- model_id = data.get('model_id')
- model_name = data.get('model_name')
- # 创建 session
- Session = sessionmaker(bind=db.engine)
- session = Session()
- # 查找模型
- model = session.query(Models).filter_by(ModelID=model_id).first()
- if not model:
- return jsonify({'error': 'Model not found'}), 404
- # 更新模型状态(或其他切换逻辑)
- # 假设此处是更新模型的某些字段来进行切换
- model.status = 'active' # 假设有一个字段记录模型状态
- session.commit()
- # 记录切换日志
- logger.info(f'Model {model_name} (ID: {model_id}) switched successfully.')
- return jsonify({'success': True, 'message': f'Model {model_name} switched successfully!'}), 200
- except Exception as e:
- logger.error('Failed to switch model:', exc_info=True)
- return jsonify({'error': str(e)}), 400
- finally:
- if session:
- session.close()
- # 修改了一下登录·接口
- @bp.route('/login', methods=['POST'])
- def login_user():
- data = request.get_json()
- logger.debug(f"Received login data: {data}") # 增加调试日志
- name = data.get('name')
- password = data.get('password')
- logger.info(f"Login request received for user: {name}")
- if not isinstance(name, str) or not isinstance(password, str):
- logger.warning("Username and password must be strings")
- return jsonify({"success": False, "message": "用户名和密码必须为字符串"}), 400
- if not name or not password:
- logger.warning("Username and password cannot be empty")
- return jsonify({"success": False, "message": "用户名和密码不能为空"}), 400
- query = "SELECT id, name, password FROM users WHERE name = ?"
- try:
- with get_db() as conn:
- user = conn.execute(query, (name,)).fetchone()
- if not user:
- logger.warning(f"User '{name}' does not exist")
- return jsonify({"success": False, "message": "用户名不存在"}), 400
- stored_password = user[2] # 假设 'password' 是第三个字段
- user_id = user[0] # 假设 'id' 是第一个字段
- if check_password_hash(stored_password, password):
- session['name'] = name
- logger.info(f"User '{name}' logged in successfully.")
- return jsonify({
- "success": True,
- "message": "登录成功",
- "userId": user_id,
- "name": name
- })
- else:
- logger.warning(f"Incorrect password for user '{name}'")
- return jsonify({"success": False, "message": "用户名或密码错误"}), 400
- except sqlite3.DatabaseError as db_err:
- logger.error(f"Database error during login process: {db_err}", exc_info=True)
- return jsonify({"success": False, "message": "数据库错误"}), 500
- except Exception as e:
- logger.error(f"Unexpected error during login process: {e}", exc_info=True)
- return jsonify({"success": False, "message": "登录失败"}), 500
- # 添加退出登录状态接口
- @bp.route('/logout', methods=['GET', 'POST'])
- def logout_user():
- try:
- session.clear()
- return jsonify({"msg": "退出成功"}), 200
- except Exception as e:
- logger.error(f"Error during logout process: {e}", exc_info=True)
- return jsonify({"msg": "退出失败"}), 500
- # 获取软件介绍信息的路由
- @bp.route('/software-intro/<int:id>', methods=['GET'])
- def get_software_intro(id):
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
- cursor.execute('SELECT title, intro FROM software_intro WHERE id = ?', (id,))
- result = cursor.fetchone()
- conn.close()
- if result:
- title, intro = result
- return jsonify({
- 'title': title,
- 'intro': intro
- })
- return jsonify({}), 404
- except sqlite3.Error as e:
- print(f"数据库错误: {e}")
- return jsonify({"error": f"数据库错误: {str(e)}"}), 500
- # 更新软件介绍信息的路由
- @bp.route('/software-intro/<int:id>', methods=['PUT'])
- def update_software_intro(id):
- try:
- data = request.get_json()
- title = data.get('title')
- intro = data.get('intro')
- conn = get_db_connection()
- cursor = conn.cursor()
- cursor.execute('UPDATE software_intro SET title =?, intro =? WHERE id = ?', (title, intro, id))
- conn.commit()
- conn.close()
- return jsonify({'message': '软件介绍更新成功'})
- except sqlite3.Error as e:
- print(f"数据库错误: {e}")
- return jsonify({"error": f"数据库错误: {str(e)}"}), 500
- # 处理图片上传的路由
- @bp.route('/upload-image', methods=['POST'])
- def upload_image():
- file = request.files['image']
- if file:
- filename = str(uuid.uuid4()) + '.' + file.filename.rsplit('.', 1)[1].lower()
- file.save(os.path.join(os.getcwd(), 'uploads', filename))
- imageUrl = f'http://127.0.0.1:5000/uploads/{filename}'
- return jsonify({'imageUrl': imageUrl})
- return jsonify({'error': '未找到图片文件'}), 400
- # 配置静态资源服务
- @bp.route('/uploads/<path:filename>')
- def serve_image(filename):
- uploads_folder = os.path.join(os.getcwd(), 'uploads')
- return send_from_directory(uploads_folder, filename)
|