123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- import os
- import sqlite3
- from flask import Flask, jsonify, request, send_file, g
- from werkzeug.utils import secure_filename
- from flask_cors import CORS
- import pandas as pd
- from io import BytesIO
- import logging
- app = Flask(__name__)
- # 设置数据库文件和上传文件夹的路径
- DATABASE = 'SoilAcidification.db'
- UPLOAD_FOLDER = 'uploads'
- app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
- os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- # 跨源资源共享(CORS)配置,允许跨域请求
- CORS(app)
- # 设置日志
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- # 创建一个数据库连接池
- def get_db():
- db = getattr(g, '_database', None)
- if db is None:
- db = g._database = sqlite3.connect(DATABASE)
- return db
- # 获取列名的函数
- def get_column_names(table_name):
- db = get_db()
- cur = db.cursor()
- cur.execute(f"PRAGMA table_info({table_name});")
- columns = [column[1] for column in cur.fetchall()]
- db.close()
- return columns
- @app.after_request
- def add_headers(response):
- response.headers['Content-Type'] = 'application/json; charset=utf-8'
- return response
- @app.route('/')
- def index():
- return 'Hello, World!'
- # 有这个函数来获取列名
- def get_column_names(table_name):
- """
- 根据表名获取对应的列名(字段)。
- """
- if table_name == 'current_reflux':
- # 返回列名映射
- return ['OM', 'CL', 'CEC', 'H_plus', 'HN', 'Al3_plus', 'Free_alumina', 'Free_iron_oxides', 'Delta_pH'], \
- ["有机质含量", "土壤粘粒重量", "阳离子交换量", "氢离子含量", "硝态氮含量", "铝离子含量", "游离氧化铝", "游离氧化铁", "酸碱差值"]
- elif table_name == 'current_reduce':
- # 返回列名映射
- return ['Q_over_b', 'pH', 'OM', 'CL', 'H', 'Al'], \
- ["比值", "酸碱度", "有机质含量", "氯离子含量", "氢离子含量", "铝离子含量"]
- else:
- return [], [] # 不支持的表名,返回空列表
- # 模板下载接口
- @app.route('/download_template', methods=['GET'])
- def download_template():
- """
- 根据表名生成模板文件(Excel 或 CSV),并返回下载。
- :return: 返回模板文件
- """
- table_name = request.args.get('table') # 从请求参数中获取表名
- if not table_name:
- return jsonify({'error': '表名参数缺失'}), 400
- # 获取表的列名
- try:
- column_names = get_column_names(table_name)
- except sqlite3.Error as e:
- return jsonify({'error': f'获取列名失败: {str(e)}'}), 400
- if not column_names:
- return jsonify({'error': f'未找到表 {table_name} 或列名为空'}), 400
- # 根据表名映射列名(可自定义标题)
- if table_name == 'current_reflux':
- column_titles = ["有机质含量", "土壤粘粒重量", "阳离子交换量", "氢离子含量", "硝态氮含量", "铝离子含量",
- "游离氧化铝", "游离氧化铁", "酸碱差值"]
- elif table_name == 'current_reduce':
- column_titles = ["比值", "酸碱度", "有机质含量", "氯离子含量", "氢离子含量1", "铝离子含量1"]
- else:
- return jsonify({'error': f'不支持的表名 {table_name}'}), 400
- # 使用 pandas 创建一个空的 DataFrame,列名为自定义的标题
- df = pd.DataFrame(columns=column_titles)
- # 根据需求选择模板格式:Excel 或 CSV
- file_format = request.args.get('format', 'excel').lower()
- if file_format == 'csv':
- # 将 DataFrame 写入内存中的 CSV 文件
- output = BytesIO()
- df.to_csv(output, index=False, encoding='utf-8')
- output.seek(0) # 返回文件开头
- return send_file(output, as_attachment=True, download_name=f'{table_name}_template.csv',
- mimetype='text/csv')
- else:
- # 默认生成 Excel 文件
- output = BytesIO()
- df.to_excel(output, index=False, engine='openpyxl')
- output.seek(0) # 返回文件开头
- return send_file(output, as_attachment=True, download_name=f'{table_name}_template.xlsx',
- mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
- # 模板导入接口
- @app.route('/import_data', methods=['POST'])
- def import_data():
- """
- 导入数据接口,根据表名将 Excel 或 CSV 数据导入到对应的数据库表中。
- """
- # 检查是否有文件
- if 'file' not in request.files:
- logging.error("请求中没有文件")
- return jsonify({'success': False, 'message': '文件缺失'}), 400
- file = request.files['file']
- table_name = request.form.get('table')
- # 检查表名参数
- if not table_name:
- logging.error("缺少表名参数")
- return jsonify({'success': False, 'message': '缺少表名参数'}), 400
- # 检查文件是否为空
- if file.filename == '':
- logging.error("文件为空")
- return jsonify({'success': False, 'message': '未选择文件'}), 400
- # 获取表的字段映射
- db_columns, display_titles = get_column_names(table_name)
- # 校验是否支持该表名
- if not db_columns:
- logging.error(f"不支持的表名: {table_name}")
- return jsonify({'success': False, 'message': f'不支持的表名: {table_name}'}), 400
- logging.info(f"表名 {table_name} 对应的字段: {db_columns}")
- # 中文列名到数据库列名的映射
- column_name_mapping = {
- "有机质含量": "OM",
- "土壤粘粒重量": "CL",
- "阳离子交换量": "CEC",
- "氢离子含量": "H_plus",
- "硝态氮含量": "HN",
- "铝离子含量": "Al3_plus",
- "游离氧化铝": "Free_alumina",
- "游离氧化铁": "Free_iron_oxides",
- "酸碱差值": "Delta_pH",
- "比值": "Q_over_b",
- "酸碱度": "pH",
- "氯离子含量": "CL",
- "氢离子含量1": "H",
- "铝离子含量1": "Al"
- }
- try:
- # 保存上传文件到临时目录
- temp_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(file.filename))
- logging.info(f"正在保存文件到临时路径: {temp_path}")
- file.save(temp_path)
- # 读取文件内容(支持 Excel 和 CSV 格式)
- if file.filename.endswith('.xlsx'):
- logging.info(f"读取 Excel 文件: {file.filename}")
- df = pd.read_excel(temp_path)
- elif file.filename.endswith('.csv'):
- logging.info(f"读取 CSV 文件: {file.filename}")
- df = pd.read_csv(temp_path)
- else:
- logging.error("仅支持 Excel 和 CSV 文件")
- return jsonify({'success': False, 'message': '仅支持 Excel 和 CSV 文件'}), 400
- logging.info(f"文件读取成功,列名: {df.columns.tolist()}")
- # 1. 移除“序号”列(如果存在)
- if '序号' in df.columns:
- df = df.drop(columns=['序号'])
- # 2. 将文件中的列名(中文)转换为数据库列名(英文)
- df.columns = [column_name_mapping.get(col, col) for col in df.columns]
- logging.info(f"转换后的列名: {df.columns.tolist()}")
- # 校验文件的列是否与数据库表字段匹配
- if not set(db_columns).issubset(set(df.columns)):
- logging.error(f"文件列名与数据库表不匹配. 文件列: {df.columns.tolist()}, 期望列: {db_columns}")
- return jsonify({'success': False, 'message': '文件列名与数据库表不匹配'}), 400
- logging.info("开始清洗数据:移除空行并按需要格式化")
- # 数据清洗:移除空行并按需要格式化
- df_cleaned = df[db_columns].dropna()
- logging.info(f"清洗后数据行数: {len(df_cleaned)}")
- # 插入数据到数据库
- conn = get_db()
- logging.info(f"开始将数据插入到表 {table_name} 中")
- # 使用事务确保操作的一致性
- with conn:
- df_cleaned.to_sql(table_name, conn, if_exists='append', index=False)
- logging.info(f"数据成功插入到表 {table_name} 中,插入行数: {len(df_cleaned)}")
- # 删除临时文件
- os.remove(temp_path)
- logging.info(f"临时文件 {temp_path} 删除成功")
- return jsonify({'success': True, 'message': '数据导入成功'}), 200
- except Exception as e:
- logging.error(f"导入失败: {e}", exc_info=True) # 捕获堆栈信息,方便调试
- return jsonify({'success': False, 'message': f'导入失败: {str(e)}'}), 500
- # 导出数据接口
- @app.route('/export_data', methods=['GET'])
- def export_data():
- """
- 根据表名从数据库导出数据,并返回 Excel 或 CSV 格式的文件
- :return: 返回文件
- """
- # 获取表名和文件格式
- table_name = request.args.get('table')
- if not table_name:
- return jsonify({'error': '缺少表名参数'}), 400
- file_format = request.args.get('format', 'excel').lower() # 默认生成 Excel 文件
- # 获取数据
- conn = get_db()
- query = f"SELECT * FROM {table_name};" # 查询表中的所有数据
- df = pd.read_sql(query, conn)
- conn.close()
- if file_format == 'csv':
- # 将数据保存为 CSV 格式
- output = BytesIO()
- df.to_csv(output, index=False)
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_data.csv',
- mimetype='text/csv')
- else:
- # 默认生成 Excel 格式
- output = BytesIO()
- df.to_excel(output, index=False, engine='openpyxl')
- output.seek(0)
- return send_file(output, as_attachment=True, download_name=f'{table_name}_data.xlsx',
- mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
- # 添加记录
- @app.route('/add_item', methods=['POST'])
- def add_item():
- db = get_db()
- try:
- # 确保请求体是JSON格式
- data = request.get_json()
- if not data:
- raise ValueError("No JSON data provided")
- table_name = data.get('table')
- item_data = data.get('item')
- if not table_name or not item_data:
- return jsonify({'error': 'Missing table name or item data'}), 400
- cur = db.cursor()
- # 动态构建 SQL 语句
- columns = ', '.join(item_data.keys())
- placeholders = ', '.join(['?'] * len(item_data))
- sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
- cur.execute(sql, tuple(item_data.values()))
- db.commit()
- return jsonify({'success': True, 'message': 'Item added successfully'}), 201
- except ValueError as e:
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- return jsonify({'error': f'Missing data field: {e}'}), 400
- except sqlite3.IntegrityError as e:
- return jsonify({'error': 'Database integrity error', 'details': str(e)}), 409
- except sqlite3.Error as e:
- return jsonify({'error': 'Database error', 'details': str(e)}), 500
- finally:
- db.close()
- # 删除记录
- @app.route('/delete_item', methods=['POST'])
- def delete_item():
- data = request.get_json()
- table_name = data.get('table')
- condition = data.get('condition')
- # 日志:检查是否收到正确的参数
- logging.debug(f"Received data: table={table_name}, condition={condition}")
- if not table_name or not condition:
- logging.error("缺少表名或条件参数")
- return jsonify({"success": False, "message": "缺少表名或条件参数"}), 400
- try:
- key, value = condition.split('=')
- except ValueError:
- logging.error(f"条件格式错误,received condition: {condition}")
- return jsonify({"success": False, "message": "条件格式错误,应为 'key=value'"}), 400
- db = get_db()
- cur = db.cursor()
- # 日志:确认开始删除操作
- logging.debug(f"Attempting to delete from {table_name} where {key} = {value}")
- try:
- cur.execute(f"DELETE FROM {table_name} WHERE {key} = ?", (value,))
- db.commit()
- # 日志:删除成功
- logging.info(f"Record deleted from {table_name} where {key} = {value}")
- return jsonify({"success": True, "message": "记录删除成功"}), 200
- except sqlite3.Error as e:
- db.rollback()
- # 日志:捕获删除失败的错误
- logging.error(f"Failed to delete from {table_name} where {key} = {value}. Error: {e}")
- return jsonify({"success": False, "message": f"删除失败: {e}"}), 400
- # 更新记录
- @app.route('/update_item', methods=['PUT'])
- def update_record():
- data = request.get_json()
- if not data or 'table' not in data or 'item' not in data:
- return jsonify({"success": False, "message": "请求数据不完整"}), 400
- table_name = data['table']
- item = data['item']
- if not item or next(iter(item.keys())) is None:
- return jsonify({"success": False, "message": "记录数据为空"}), 400
- id_key = next(iter(item.keys()))
- record_id = item[id_key]
- updates = {key: value for key, value in item.items() if key != id_key}
- db = get_db()
- cur = db.cursor()
- try:
- record_id = int(record_id)
- except ValueError:
- return jsonify({"success": False, "message": "ID 必须是整数"}), 400
- parameters = list(updates.values()) + [record_id]
- set_clause = ','.join([f"{k} = ?" for k in updates.keys()])
- sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_key} = ?"
- try:
- cur.execute(sql, parameters)
- db.commit()
- if cur.rowcount == 0:
- return jsonify({"success": False, "message": "未找到要更新的记录"}), 404
- return jsonify({"success": True, "message": "数据更新成功"}), 200
- except sqlite3.Error as e:
- db.rollback()
- return jsonify({"success": False, "message": f"更新失败: {e}"}), 400
- # 查询记录
- @app.route('/search/record', methods=['GET'])
- def sql_search():
- try:
- data = request.get_json()
- sql_table = data['table']
- record_id = data['id']
- db = get_db()
- cur = db.cursor()
- sql = f"SELECT * FROM {sql_table} WHERE id = ?"
- cur.execute(sql, (record_id,))
- rows = cur.fetchall()
- column_names = [desc[0] for desc in cur.description]
- if not rows:
- return jsonify({'error': '未查找到对应数据。'}), 400
- results = []
- for row in rows:
- result = {column_names[i]: row[i] for i in range(len(row))}
- results.append(result)
- cur.close()
- db.close()
- return jsonify(results), 200
- except sqlite3.Error as e:
- return jsonify({'error': str(e)}), 400
- except KeyError as e:
- return jsonify({'error': f'缺少必要的数据字段: {e}'}), 400
- # 提供表格数据
- @app.route('/tables', methods=['POST'])
- def get_table():
- data = request.get_json()
- table_name = data.get('table')
- if not table_name:
- return jsonify({'error': '需要表名'}), 400
- db = get_db()
- try:
- cur = db.cursor()
- cur.execute(f"SELECT * FROM {table_name}")
- rows = cur.fetchall()
- if not rows:
- return jsonify({'error': f'表 {table_name} 为空或不存在'}), 400
- headers = [description[0] for description in cur.description]
- return jsonify(rows=rows, headers=headers), 200
- except sqlite3.Error as e:
- return jsonify({'error': str(e)}), 400
- finally:
- db.close()
- if __name__ == '__main__':
- app.run(debug=True)
|