import os import sqlite3 from flask import Flask, jsonify, request, send_file, g from werkzeug.utils import secure_filename from flask_cors import CORS import pandas as pd from io import BytesIO import logging app = Flask(__name__) # 设置数据库文件和上传文件夹的路径 DATABASE = 'SoilAcidification.db' UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # 跨源资源共享(CORS)配置,允许跨域请求 CORS(app) # 设置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 创建一个数据库连接池 def get_db(): db = getattr(g, '_database', None) if db is None: db = g._database = sqlite3.connect(DATABASE) return db # 获取列名的函数 def get_column_names(table_name): db = get_db() cur = db.cursor() cur.execute(f"PRAGMA table_info({table_name});") columns = [column[1] for column in cur.fetchall()] db.close() return columns @app.after_request def add_headers(response): response.headers['Content-Type'] = 'application/json; charset=utf-8' return response @app.route('/') def index(): return 'Hello, World!' # 有这个函数来获取列名 def get_column_names(table_name): """ 根据表名获取对应的列名(字段)。 """ if table_name == 'current_reflux': # 返回列名映射 return ['OM', 'CL', 'CEC', 'H_plus', 'HN', 'Al3_plus', 'Free_alumina', 'Free_iron_oxides', 'Delta_pH'], \ ["有机质含量", "土壤粘粒重量", "阳离子交换量", "氢离子含量", "硝态氮含量", "铝离子含量", "游离氧化铝", "游离氧化铁", "酸碱差值"] elif table_name == 'current_reduce': # 返回列名映射 return ['Q_over_b', 'pH', 'OM', 'CL', 'H', 'Al'], \ ["比值", "酸碱度", "有机质含量", "氯离子含量", "氢离子含量", "铝离子含量"] else: return [], [] # 不支持的表名,返回空列表 @app.route('/download_template', methods=['GET']) def download_template(): """ 根据表名生成模板文件(Excel 或 CSV),并返回下载。 :return: 返回模板文件 """ table_name = request.args.get('table') # 从请求参数中获取表名 if not table_name: return jsonify({'error': '表名参数缺失'}), 400 # 获取表的列名 try: column_names = get_column_names(table_name) except sqlite3.Error as e: return jsonify({'error': f'获取列名失败: {str(e)}'}), 400 if not column_names: return jsonify({'error': f'未找到表 {table_name} 或列名为空'}), 400 # 根据表名映射列名(可自定义标题) if table_name == 'current_reflux': column_titles = ["有机质含量", "土壤粘粒重量", "阳离子交换量", "氢离子含量", "硝态氮含量", "铝离子含量", "游离氧化铝", "游离氧化铁", "酸碱差值"] elif table_name == 'current_reduce': column_titles = ["比值", "酸碱度", "有机质含量", "氯离子含量", "氢离子含量1", "铝离子含量1"] else: return jsonify({'error': f'不支持的表名 {table_name}'}), 400 # 使用 pandas 创建一个空的 DataFrame,列名为自定义的标题 df = pd.DataFrame(columns=column_titles) # 根据需求选择模板格式:Excel 或 CSV file_format = request.args.get('format', 'excel').lower() if file_format == 'csv': # 将 DataFrame 写入内存中的 CSV 文件 output = BytesIO() df.to_csv(output, index=False, encoding='utf-8') output.seek(0) # 返回文件开头 return send_file(output, as_attachment=True, download_name=f'{table_name}_template.csv', mimetype='text/csv') else: # 默认生成 Excel 文件 output = BytesIO() df.to_excel(output, index=False, engine='openpyxl') output.seek(0) # 返回文件开头 return send_file(output, as_attachment=True, download_name=f'{table_name}_template.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') # 模板下载接口 @app.route('/import_data', methods=['POST']) def import_data(): """ 导入数据接口,根据表名将 Excel 或 CSV 数据导入到对应的数据库表中。 """ # 检查是否有文件 if 'file' not in request.files: logging.error("请求中没有文件") return jsonify({'success': False, 'message': '文件缺失'}), 400 file = request.files['file'] table_name = request.form.get('table') # 检查表名参数 if not table_name: logging.error("缺少表名参数") return jsonify({'success': False, 'message': '缺少表名参数'}), 400 # 检查文件是否为空 if file.filename == '': logging.error("文件为空") return jsonify({'success': False, 'message': '未选择文件'}), 400 # 获取表的字段映射 db_columns, display_titles = get_column_names(table_name) # 校验是否支持该表名 if not db_columns: logging.error(f"不支持的表名: {table_name}") return jsonify({'success': False, 'message': f'不支持的表名: {table_name}'}), 400 logging.info(f"表名 {table_name} 对应的字段: {db_columns}") # 中文列名到数据库列名的映射 column_name_mapping = { "有机质含量": "OM", "土壤粘粒重量": "CL", "阳离子交换量": "CEC", "氢离子含量": "H_plus", "硝态氮含量": "HN", "铝离子含量": "Al3_plus", "游离氧化铝": "Free_alumina", "游离氧化铁": "Free_iron_oxides", "酸碱差值": "Delta_pH", "比值": "Q_over_b", "酸碱度": "pH", "氯离子含量": "CL", "氢离子含量1": "H", "铝离子含量1": "Al" } try: # 保存上传文件到临时目录 temp_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(file.filename)) logging.info(f"正在保存文件到临时路径: {temp_path}") file.save(temp_path) # 读取文件内容(支持 Excel 和 CSV 格式) if file.filename.endswith('.xlsx'): logging.info(f"读取 Excel 文件: {file.filename}") df = pd.read_excel(temp_path) elif file.filename.endswith('.csv'): logging.info(f"读取 CSV 文件: {file.filename}") df = pd.read_csv(temp_path) else: logging.error("仅支持 Excel 和 CSV 文件") return jsonify({'success': False, 'message': '仅支持 Excel 和 CSV 文件'}), 400 logging.info(f"文件读取成功,列名: {df.columns.tolist()}") # 1. 移除“序号”列(如果存在) if '序号' in df.columns: df = df.drop(columns=['序号']) # 2. 将文件中的列名(中文)转换为数据库列名(英文) df.columns = [column_name_mapping.get(col, col) for col in df.columns] logging.info(f"转换后的列名: {df.columns.tolist()}") # 校验文件的列是否与数据库表字段匹配 if not set(db_columns).issubset(set(df.columns)): logging.error(f"文件列名与数据库表不匹配. 文件列: {df.columns.tolist()}, 期望列: {db_columns}") return jsonify({'success': False, 'message': '文件列名与数据库表不匹配'}), 400 logging.info("开始清洗数据:移除空行并按需要格式化") # 数据清洗:移除空行并按需要格式化 df_cleaned = df[db_columns].dropna() logging.info(f"清洗后数据行数: {len(df_cleaned)}") # 插入数据到数据库 conn = get_db() logging.info(f"开始将数据插入到表 {table_name} 中") # 使用事务确保操作的一致性 with conn: df_cleaned.to_sql(table_name, conn, if_exists='append', index=False) logging.info(f"数据成功插入到表 {table_name} 中,插入行数: {len(df_cleaned)}") # 删除临时文件 os.remove(temp_path) logging.info(f"临时文件 {temp_path} 删除成功") return jsonify({'success': True, 'message': '数据导入成功'}), 200 except Exception as e: logging.error(f"导入失败: {e}", exc_info=True) # 捕获堆栈信息,方便调试 return jsonify({'success': False, 'message': f'导入失败: {str(e)}'}), 500 # 导出数据接口 @app.route('/export_data', methods=['GET']) def export_data(): """ 根据表名从数据库导出数据,并返回 Excel 或 CSV 格式的文件 :return: 返回文件 """ # 获取表名和文件格式 table_name = request.args.get('table') if not table_name: return jsonify({'error': '缺少表名参数'}), 400 file_format = request.args.get('format', 'excel').lower() # 默认生成 Excel 文件 # 获取数据 conn = get_db() query = f"SELECT * FROM {table_name};" # 查询表中的所有数据 df = pd.read_sql(query, conn) conn.close() if file_format == 'csv': # 将数据保存为 CSV 格式 output = BytesIO() df.to_csv(output, index=False) output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.csv', mimetype='text/csv') else: # 默认生成 Excel 格式 output = BytesIO() df.to_excel(output, index=False, engine='openpyxl') output.seek(0) return send_file(output, as_attachment=True, download_name=f'{table_name}_data.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') # 添加记录 @app.route('/add_item', methods=['POST']) def add_item(): db = get_db() try: # 确保请求体是JSON格式 data = request.get_json() if not data: raise ValueError("No JSON data provided") table_name = data.get('table') item_data = data.get('item') if not table_name or not item_data: return jsonify({'error': 'Missing table name or item data'}), 400 cur = db.cursor() # 动态构建 SQL 语句 columns = ', '.join(item_data.keys()) placeholders = ', '.join(['?'] * len(item_data)) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" cur.execute(sql, tuple(item_data.values())) db.commit() return jsonify({'success': True, 'message': 'Item added successfully'}), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except KeyError as e: return jsonify({'error': f'Missing data field: {e}'}), 400 except sqlite3.IntegrityError as e: return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409 except sqlite3.Error as e: return jsonify({'error': 'Database error', 'details': str(e)}), 500 finally: db.close() # 删除记录 @app.route('/delete_item', methods=['POST']) def delete_item(): data = request.get_json() table_name = data.get('table') condition = data.get('condition') # 日志:检查是否收到正确的参数 logging.debug(f"Received data: table={table_name}, condition={condition}") if not table_name or not condition: logging.error("缺少表名或条件参数") return jsonify({"success": False, "message": "缺少表名或条件参数"}), 400 try: key, value = condition.split('=') except ValueError: logging.error(f"条件格式错误,received condition: {condition}") return jsonify({"success": False, "message": "条件格式错误,应为 'key=value'"}), 400 db = get_db() cur = db.cursor() # 日志:确认开始删除操作 logging.debug(f"Attempting to delete from {table_name} where {key} = {value}") try: cur.execute(f"DELETE FROM {table_name} WHERE {key} = ?", (value,)) db.commit() # 日志:删除成功 logging.info(f"Record deleted from {table_name} where {key} = {value}") return jsonify({"success": True, "message": "记录删除成功"}), 200 except sqlite3.Error as e: db.rollback() # 日志:捕获删除失败的错误 logging.error(f"Failed to delete from {table_name} where {key} = {value}. Error: {e}") return jsonify({"success": False, "message": f"删除失败: {e}"}), 400 # 更新记录 @app.route('/update_item', methods=['PUT']) def update_record(): data = request.get_json() if not data or 'table' not in data or 'item' not in data: return jsonify({"success": False, "message": "请求数据不完整"}), 400 table_name = data['table'] item = data['item'] if not item or next(iter(item.keys())) is None: return jsonify({"success": False, "message": "记录数据为空"}), 400 id_key = next(iter(item.keys())) record_id = item[id_key] updates = {key: value for key, value in item.items() if key != id_key} db = get_db() cur = db.cursor() try: record_id = int(record_id) except ValueError: return jsonify({"success": False, "message": "ID 必须是整数"}), 400 parameters = list(updates.values()) + [record_id] set_clause = ','.join([f"{k} = ?" for k in updates.keys()]) sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = ?" try: cur.execute(sql, parameters) db.commit() if cur.rowcount == 0: return jsonify({"success": False, "message": "未找到要更新的记录"}), 404 return jsonify({"success": True, "message": "数据更新成功"}), 200 except sqlite3.Error as e: db.rollback() return jsonify({"success": False, "message": f"更新失败: {e}"}), 400 # 查询记录 @app.route('/search/record', methods=['GET']) def sql_search(): try: data = request.get_json() sql_table = data['table'] record_id = data['id'] db = get_db() cur = db.cursor() sql = f"SELECT * FROM {sql_table} WHERE id = ?" cur.execute(sql, (record_id,)) rows = cur.fetchall() column_names = [desc[0] for desc in cur.description] if not rows: return jsonify({'error': '未查找到对应数据。'}), 400 results = [] for row in rows: result = {column_names[i]: row[i] for i in range(len(row))} results.append(result) cur.close() db.close() return jsonify(results), 200 except sqlite3.Error as e: return jsonify({'error': str(e)}), 400 except KeyError as e: return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400 # 提供表格数据 @app.route('/tables', methods=['POST']) def get_table(): data = request.get_json() table_name = data.get('table') if not table_name: return jsonify({'error': '需要表名'}), 400 db = get_db() try: cur = db.cursor() cur.execute(f"SELECT * FROM {table_name}") rows = cur.fetchall() if not rows: return jsonify({'error': f'表 {table_name} 为空或不存在'}), 400 headers = [description[0] for description in cur.description] return jsonify(rows=rows, headers=headers), 200 except sqlite3.Error as e: return jsonify({'error': str(e)}), 400 finally: db.close() if __name__ == '__main__': app.run(debug=True)