import os import sqlite3 from flask import Flask, jsonify, request from flask import g from flask_cors import CORS # 定义应用的状态码文档 """ 状态码 200 成功 状态码 400 失败 """ app = Flask(__name__) # 设置数据库文件和上传文件夹的路径 DATABASE = 'SoilAcidification.db' UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER # 确保上传文件夹存在,如果不存在则创建 os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # 跨源资源共享(CORS)配置,允许跨域请求 CORS(app) # 创建一个数据库连接池 def get_db(): db = getattr(g, '_database', None) if db is None: db = g._database = sqlite3.connect(DATABASE) return db # 请求后钩子,为响应添加内容类型头 @app.after_request def add_headers(response): response.headers['Content-Type'] = 'application/json; charset=utf-8' return response @app.route('/') def index(): return 'Hello, World!' # 定义添加数据库记录的 API 接口 @app.route('/add_item', methods=['POST']) def add_item(): """ 接收 JSON 格式的请求体,包含表名和要插入的数据。 尝试将数据插入到指定的表中。 :return: """ db = get_db() try: # 确保请求体是JSON格式 data = request.get_json() if not data: raise ValueError("No JSON data provided") table_name = data.get('table') item_data = data.get('item') if not table_name or not item_data: return jsonify({'error': 'Missing table name or item data'}), 400 cur = db.cursor() # 动态构建 SQL 语句 columns = ', '.join(item_data.keys()) placeholders = ', '.join(['?'] * len(item_data)) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" cur.execute(sql, tuple(item_data.values())) db.commit() # 返回更详细的成功响应 return jsonify({'success': True, 'message': 'Item added successfully'}), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except KeyError as e: return jsonify({'error': f'Missing data field: {e}'}), 400 except sqlite3.IntegrityError as e: # 处理例如唯一性约束违反等数据库完整性错误 return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409 except sqlite3.Error as e: # 处理其他数据库错误 return jsonify({'error': 'Database error', 'details': str(e)}), 500 finally: db.close() # 定义删除数据库记录的 API 接口 @app.route('/delete_item', methods=['POST']) def delete_item(): data = request.get_json() table_name = data.get('table') condition = data.get('condition') # 检查表名和条件是否提供 if not table_name or not condition: return jsonify({ "success": False, "message": "缺少表名或条件参数" }), 400 # 尝试从条件字符串中分离键和值 try: key, value = condition.split('=') except ValueError: return jsonify({ "success": False, "message": "条件格式错误,应为 'key=value'" }), 400 db = get_db() cur = db.cursor() try: # 执行删除操作 cur.execute(f"DELETE FROM {table_name} WHERE {key} = ?", (value,)) db.commit() # 如果没有错误发生,返回成功响应 return jsonify({ "success": True, "message": "记录删除成功" }), 200 except sqlite3.Error as e: # 发生错误,回滚事务 db.rollback() # 返回失败响应,并包含错误信息 return jsonify({ "success": False, "message": f"删除失败: {e}" }), 400 # 定义修改数据库记录的 API 接口 @app.route('/update_item', methods=['PUT']) def update_record(): data = request.get_json() # 检查必要的数据是否提供 if not data or 'table' not in data or 'item' not in data: return jsonify({ "success": False, "message": "请求数据不完整" }), 400 table_name = data['table'] item = data['item'] # 假设 item 的第一个元素是 ID if not item or next(iter(item.keys())) is None: return jsonify({ "success": False, "message": "记录数据为空" }), 400 # 获取 ID 和其他字段值 id_key = next(iter(item.keys())) record_id = item[id_key] updates = {key: value for key, value in item.items() if key != id_key} # 排除 ID db = get_db() cur = db.cursor() try: record_id = int(record_id) # 确保 ID 是整数 except ValueError: return jsonify({ "success": False, "message": "ID 必须是整数" }), 400 # 准备参数列表,包括更新的值和 ID parameters = list(updates.values()) + [record_id] # 执行更新操作 set_clause = ','.join([f"{k} = ?" for k in updates.keys()]) sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = ?" try: cur.execute(sql, parameters) db.commit() if cur.rowcount == 0: return jsonify({ "success": False, "message": "未找到要更新的记录" }), 404 return jsonify({ "success": True, "message": "数据更新成功" }), 200 except sqlite3.Error as e: db.rollback() return jsonify({ "success": False, "message": f"更新失败: {e}" }), 400 # 定义查询数据库记录的 API 接口 @app.route('/search/record', methods=['GET']) def sql_search(): """ 接收 JSON 格式的请求体,包含表名和要查询的 ID。 尝试查询指定 ID 的记录并返回结果。 :return: """ try: data = request.get_json() # 表名 sql_table = data['table'] # 要搜索的 ID Id = data['id'] # 连接到数据库 db = get_db() cur = db.cursor() # 构造查询语句 sql = f"SELECT * FROM {sql_table} WHERE id = ?" # 执行查询 cur.execute(sql, (Id,)) # 获取查询结果 rows = cur.fetchall() column_names = [desc[0] for desc in cur.description] # 检查是否有结果 if not rows: return jsonify({'error': '未查找到对应数据。'}), 400 # 构造响应数据 results = [] for row in rows: result = {column_names[i]: row[i] for i in range(len(row))} results.append(result) # 关闭游标和数据库连接 cur.close() db.close() # 返回 JSON 响应 return jsonify(results), 200 except sqlite3.Error as e: # 如果发生数据库错误,返回错误信息 return jsonify({'error': str(e)}), 400 except KeyError as e: # 如果请求数据中缺少必要的键,返回错误信息 return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400 # 定义提供数据库列表,用于展示表格的 API 接口 @app.route('/tables', methods=['POST']) def get_table(): data = request.get_json() table_name = data.get('table') if not table_name: return jsonify({'error': '需要表名'}), 400 db = get_db() try: cur = db.cursor() cur.execute(f"SELECT * FROM {table_name}") rows = cur.fetchall() if not rows: return jsonify({'error': '表为空或不存在'}), 400 headers = [description[0] for description in cur.description] return jsonify(rows=rows, headers=headers), 200 except sqlite3.Error as e: return jsonify({'error': str(e)}), 400 finally: db.close() if __name__ == '__main__': app.run(debug=True)