123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- import os
- import sqlite3
- from flask import Flask, jsonify, request
- from flask import g
- from flask_cors import CORS
- # 定义应用的状态码文档
- """
- 状态码 200 成功
- 状态码 400 失败
- """
- app = Flask(__name__)
- # 设置数据库文件和上传文件夹的路径
- DATABASE = 'SoilAcidification.db'
- UPLOAD_FOLDER = 'uploads'
- app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
- # 确保上传文件夹存在,如果不存在则创建
- os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- # 跨源资源共享(CORS)配置,允许跨域请求
- CORS(app)
- # 创建一个数据库连接池
- def get_db():
- db = getattr(g, '_database', None)
- if db is None:
- db = g._database = sqlite3.connect(DATABASE)
- return db
- # 请求后钩子,为响应添加内容类型头
- @app.after_request
- def add_headers(response):
- response.headers['Content-Type'] = 'application/json; charset=utf-8'
- return response
- @app.route('/')
- def index():
- return 'Hello, World!'
- # 定义添加数据库记录的 API 接口
- @app.route('/add_item', methods=['POST'])
- def add_item():
- """
- 接收 JSON 格式的请求体,包含表名和要插入的数据。
- 尝试将数据插入到指定的表中。
- :return:
- """
- db = get_db()
- try:
- # 确保请求体是JSON格式
- data = request.get_json()
- if not data:
- raise ValueError("No JSON data provided")
- table_name = data.get('table')
- item_data = data.get('item')
- if not table_name or not item_data:
- return jsonify({'error': 'Missing table name or item data'}), 400
- cur = db.cursor()
- # 动态构建 SQL 语句
- columns = ', '.join(item_data.keys())
- placeholders = ', '.join(['?'] * len(item_data))
- sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
- cur.execute(sql, tuple(item_data.values()))
- db.commit()
- # 返回更详细的成功响应
- return jsonify({'success': True, 'message': 'Item added successfully'}), 201
- except ValueError as e:
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- return jsonify({'error': f'Missing data field: {e}'}), 400
- except sqlite3.IntegrityError as e:
- # 处理例如唯一性约束违反等数据库完整性错误
- return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409
- except sqlite3.Error as e:
- # 处理其他数据库错误
- return jsonify({'error': 'Database error', 'details': str(e)}), 500
- finally:
- db.close()
- # 定义删除数据库记录的 API 接口
- @app.route('/delete_item', methods=['DELETE'])
- def delete_item():
- data = request.get_json()
- table_name = data.get('table')
- condition = data.get('condition')
- # 检查表名和条件是否提供
- if not table_name or not condition:
- return jsonify({
- "success": False,
- "message": "缺少表名或条件参数"
- }), 400
- # 尝试从条件字符串中分离键和值
- try:
- key, value = condition.split('=')
- except ValueError:
- return jsonify({
- "success": False,
- "message": "条件格式错误,应为 'key=value'"
- }), 400
- db = get_db()
- cur = db.cursor()
- try:
- # 执行删除操作
- cur.execute(f"DELETE FROM {table_name} WHERE {key} = ?", (value,))
- db.commit()
- # 如果没有错误发生,返回成功响应
- return jsonify({
- "success": True,
- "message": "记录删除成功"
- }), 200
- except sqlite3.Error as e:
- # 发生错误,回滚事务
- db.rollback()
- # 返回失败响应,并包含错误信息
- return jsonify({
- "success": False,
- "message": f"删除失败: {e}"
- }), 400
- # 定义修改数据库记录的 API 接口
- @app.route('/update_item', methods=['PUT'])
- def update_record():
- data = request.get_json()
- # 检查必要的数据是否提供
- if not data or 'table' not in data or 'item' not in data:
- return jsonify({
- "success": False,
- "message": "请求数据不完整"
- }), 400
- table_name = data['table']
- item = data['item']
- # 假设 item 的第一个元素是 ID
- if not item or next(iter(item.keys())) is None:
- return jsonify({
- "success": False,
- "message": "记录数据为空"
- }), 400
- # 获取 ID 和其他字段值
- id_key = next(iter(item.keys()))
- record_id = item[id_key]
- updates = {key: value for key, value in item.items() if key != id_key} # 排除 ID
- db = get_db()
- cur = db.cursor()
- try:
- record_id = int(record_id) # 确保 ID 是整数
- except ValueError:
- return jsonify({
- "success": False,
- "message": "ID 必须是整数"
- }), 400
- # 准备参数列表,包括更新的值和 ID
- parameters = list(updates.values()) + [record_id]
- # 执行更新操作
- set_clause = ','.join([f"{k} = ?" for k in updates.keys()])
- sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = ?"
- try:
- cur.execute(sql, parameters)
- db.commit()
- if cur.rowcount == 0:
- return jsonify({
- "success": False,
- "message": "未找到要更新的记录"
- }), 404
- return jsonify({
- "success": True,
- "message": "数据更新成功"
- }), 200
- except sqlite3.Error as e:
- db.rollback()
- return jsonify({
- "success": False,
- "message": f"更新失败: {e}"
- }), 400
- # 定义查询数据库记录的 API 接口
- @app.route('/search/record', methods=['GET'])
- def sql_search():
- """
- 接收 JSON 格式的请求体,包含表名和要查询的 ID。
- 尝试查询指定 ID 的记录并返回结果。
- :return:
- """
- try:
- data = request.get_json()
- # 表名
- sql_table = data['table']
- # 要搜索的 ID
- Id = data['id']
- # 连接到数据库
- db = get_db()
- cur = db.cursor()
- # 构造查询语句
- sql = f"SELECT * FROM {sql_table} WHERE id = ?"
- # 执行查询
- cur.execute(sql, (Id,))
- # 获取查询结果
- rows = cur.fetchall()
- column_names = [desc[0] for desc in cur.description]
- # 检查是否有结果
- if not rows:
- return jsonify({'error': '未查找到对应数据。'}), 400
- # 构造响应数据
- results = []
- for row in rows:
- result = {column_names[i]: row[i] for i in range(len(row))}
- results.append(result)
- # 关闭游标和数据库连接
- cur.close()
- db.close()
- # 返回 JSON 响应
- return jsonify(results), 200
- except sqlite3.Error as e:
- # 如果发生数据库错误,返回错误信息
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- # 如果请求数据中缺少必要的键,返回错误信息
- return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400
- # 定义提供数据库列表,用于展示表格的 API 接口
- @app.route('/tables', methods=['POST'])
- def get_table():
- data = request.get_json()
- table_name = data.get('table')
- if not table_name:
- return jsonify({'error': '需要表名'}), 400
- db = get_db()
- try:
- cur = db.cursor()
- cur.execute(f"SELECT * FROM {table_name}")
- rows = cur.fetchall()
- if not rows:
- return jsonify({'error': '表为空或不存在'}), 400
- headers = [description[0] for description in cur.description]
- return jsonify(rows=rows, headers=headers), 200
- except sqlite3.Error as e:
- return jsonify({'error': str(e)}), 400
- finally:
- db.close()
- if __name__ == '__main__':
- app.run(debug=True)
|