import datetime
import os
import pickle
import logging

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split, cross_val_score
from sqlalchemy import text
from xgboost import XGBRegressor

from .database_models import Models, Datasets
from .config import Config
from .data_cleaner import clean_dataset


# Load a trained model from disk by its database ID
def load_model(session, model_id):
    model = session.query(Models).filter(Models.ModelID == model_id).first()
    if not model:
        raise ValueError(f"Model with ID {model_id} not found.")
    with open(model.ModelFilePath, 'rb') as f:
        return pickle.load(f)


# Run a prediction with a stored model
def predict(session, input_data: pd.DataFrame, model_id):
    # Load the model referenced by model_id
    ML_model = load_model(session, model_id)
    predictions = ML_model.predict(input_data)
    return predictions.tolist()


def check_dataset_overlap_with_test(dataset_df, data_type):
    """
    Check whether a dataset overlaps with the saved test set.

    Args:
        dataset_df (DataFrame): dataset to check
        data_type (str): dataset type ('reflux' or 'reduce')

    Returns:
        tuple: (number of overlapping rows, indices of the overlapping rows)
    """
    # Load the saved test split
    if data_type == 'reflux':
        X_test = pd.read_csv('uploads/data/X_test_reflux.csv')
        Y_test = pd.read_csv('uploads/data/Y_test_reflux.csv')
    elif data_type == 'reduce':
        X_test = pd.read_csv('uploads/data/X_test_reduce.csv')
        Y_test = pd.read_csv('uploads/data/Y_test_reduce.csv')
    else:
        raise ValueError(f"Unsupported data type: {data_type}")

    # Combine X_test and Y_test into a single frame
    test_df = pd.concat([X_test, Y_test], axis=1)

    # Determine which columns both frames share for the comparison
    compare_columns = [col for col in dataset_df.columns if col in test_df.columns]
    if not compare_columns:
        return 0, []

    # Find overlapping rows
    merged = dataset_df[compare_columns].merge(test_df[compare_columns], how='inner', indicator=True)
    overlapping_rows = merged[merged['_merge'] == 'both']

    # Map the overlapping rows back to their indices in the original dataset
    if not overlapping_rows.empty:
        overlap_indices = []
        for _, row in overlapping_rows.iterrows():
            # Build a boolean mask that matches this row in the original dataset
            mask = True
            for col in compare_columns:
                mask = mask & (dataset_df[col] == row[col])
            # Collect the indices of the matching rows
            matching_indices = dataset_df[mask].index.tolist()
            overlap_indices.extend(matching_indices)
        return len(set(overlap_indices)), list(set(overlap_indices))

    return 0, []


# Score a stored model against the saved test split
def calculate_model_score(model_info):
    """
    Compute evaluation metrics for a stored model.

    Args:
        model_info: model record (Models row)

    Returns:
        dict: dictionary of evaluation metrics
    """
    # Load the model from disk
    with open(model_info.ModelFilePath, 'rb') as f:
        ML_model = pickle.load(f)
    # print("Model requires the following features:", ML_model.feature_names_in_)

    # Select the saved test split for this data type
    if model_info.Data_type == 'reflux':      # acid-reflux dataset
        X_test = pd.read_csv('uploads/data/X_test_reflux.csv')
        Y_test = pd.read_csv('uploads/data/Y_test_reflux.csv')
    elif model_info.Data_type == 'reduce':    # acid-reduction dataset
        X_test = pd.read_csv('uploads/data/X_test_reduce.csv')
        Y_test = pd.read_csv('uploads/data/Y_test_reduce.csv')
    else:
        # Unsupported data type
        return {'r2': 0, 'mae': 0, 'rmse': 0}

    # Predict on the test set and compute the metrics
    y_pred = ML_model.predict(X_test)
    r2 = r2_score(Y_test, y_pred)
    mae = mean_absolute_error(Y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(Y_test, y_pred))

    # Return all metrics (the cross-validation score is stored separately)
    return {
        'r2': float(r2),
        'mae': float(mae),
        'rmse': float(rmse)
    }
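# Illustrative sketch (not part of the training pipeline): how the overlap check above can
# be used to verify that the current training table does not leak rows from the saved test
# split. The table names mirror those used in train_and_save_model below; the helper name
# itself is hypothetical and is not called anywhere else in the project.
def _example_overlap_report(session, data_type='reflux'):
    current_table = 'current_reflux' if data_type == 'reflux' else 'current_reduce'
    dataset_df = pd.read_sql_table(current_table, session.bind)
    overlap_count, overlap_indices = check_dataset_overlap_with_test(dataset_df, data_type)
    if overlap_count:
        logging.warning(f"{overlap_count} rows overlap with the {data_type} test set: {overlap_indices}")
    else:
        logging.info(f"No overlap with the {data_type} test set.")
    return overlap_count, overlap_indices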
def train_and_save_model(session, model_type, model_name, model_description, data_type, dataset_id=None):
    """
    Train a model and persist it.

    Args:
        session: database session
        model_type: model type ('RandomForest', 'XGBR' or 'GBSTR')
        model_name: model name
        model_description: model description
        data_type: data type ('reflux' or 'reduce')
        dataset_id: dataset ID (a snapshot of the current table is created when omitted)

    Returns:
        tuple: (model name, model ID, dataset ID, cross-validation score)
    """
    try:
        if not dataset_id:
            # Create a new dataset snapshot and copy the data; nothing is committed yet
            dataset_id = save_current_dataset(session, data_type, commit=False)

            if data_type == 'reflux':
                current_table = 'current_reflux'
            elif data_type == 'reduce':
                current_table = 'current_reduce'

            # Load the data from the current table
            dataset = pd.read_sql_table(current_table, session.bind)
        else:
            # Load the data from the existing dataset table
            dataset_table_name = f"dataset_{dataset_id}"
            dataset = pd.read_sql_table(dataset_table_name, session.bind)

            if dataset.empty:
                raise ValueError(f"Dataset {dataset_id} is empty or not found.")

        # Split into features and target (the data_cleaner module could be used here instead)
        if data_type == 'reflux':
            X = dataset.iloc[:, 1:-1]
            y = dataset.iloc[:, -1]
            # target_column = -1  # the target variable is assumed to be the last column
            # X, y, clean_stats = clean_dataset(dataset, target_column=target_column)
        elif data_type == 'reduce':
            X = dataset.iloc[:, 2:]
            y = dataset.iloc[:, 1]
            # target_column = 1  # the target variable is assumed to be the second column
            # X, y, clean_stats = clean_dataset(dataset, target_column=target_column)

        # Log cleaning statistics
        # logging.info(f"Data cleaning statistics: {clean_stats}")

        # Train the model
        model = train_model_by_type(X, y, model_type)

        # Compute the cross-validation score
        cv_score = cross_val_score(model, X, y, cv=5).mean()

        # Persist the model file and its database record
        model_id = save_model(session, model, model_name, model_type, model_description, dataset_id, data_type)

        # Store the cross-validation score on the model record
        model_info = session.query(Models).filter(Models.ModelID == model_id).first()
        if model_info:
            model_info.CV_score = float(cv_score)

        # Commit the whole transaction once everything has succeeded
        session.commit()
        return model_name, model_id, dataset_id, cv_score

    except Exception as e:
        session.rollback()
        logging.error(f"Error while training and saving the model: {str(e)}", exc_info=True)
        raise


def save_current_dataset(session, data_type, commit=True):
    """
    Create a new dataset record and copy the corresponding current table into it,
    without committing the transaction unless requested.

    Args:
        session (Session): SQLAlchemy session object.
        data_type (str): dataset type.
        commit (bool): whether to commit the transaction before returning.

    Returns:
        int: ID of the newly created dataset.
    """
    current_time = datetime.datetime.now()
    new_dataset = Datasets(
        Dataset_name=f"{data_type}_dataset_{current_time:%Y%m%d_%H%M%S}",
        Dataset_description=f"Automatically generated dataset for type {data_type}",
        Row_count=0,
        Status='pending',
        Dataset_type=data_type,
        Uploaded_at=current_time
    )
    session.add(new_dataset)
    session.flush()
    dataset_id = new_dataset.Dataset_ID

    source_table = data_type_table_mapping(data_type)
    new_table_name = f"dataset_{dataset_id}"

    session.execute(text(f"CREATE TABLE {new_table_name} AS SELECT * FROM {source_table};"))
    session.execute(text(
        f"UPDATE datasets SET status='Datasets upgraded success', "
        f"row_count=(SELECT count(*) FROM {new_table_name}) "
        f"WHERE dataset_id={dataset_id};"
    ))

    if commit:
        session.commit()

    return dataset_id


def data_type_table_mapping(data_type):
    """Map a data type to the corresponding database table name."""
    if data_type == 'reduce':
        return 'current_reduce'
    elif data_type == 'reflux':
        return 'current_reflux'
    else:
        raise ValueError("Invalid data type provided.")


def train_model_by_type(X, y, model_type):
    # Train/test split is disabled: the full dataset is used for training and the
    # model is evaluated against the saved test split instead.
    # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    X_train, y_train = X, y

    if model_type == 'RandomForest':
        # Random forest with a simple hyper-parameter search
        return train_random_forest(X_train, y_train)
    elif model_type == 'XGBR':
        # XGBoost with a simple hyper-parameter search
        return train_xgboost(X_train, y_train)
    elif model_type == 'GBSTR':
        # Gradient boosting with a simple hyper-parameter search
        return train_gradient_boosting(X_train, y_train)
    else:
        raise ValueError(f"Unsupported model type: {model_type}")
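# Illustrative sketch of a full training run. It assumes an already-configured SQLAlchemy
# session bound to the project database; the helper name and the example arguments
# (model name, description) are placeholders, not values used elsewhere in the project.
def _example_training_run(session):
    name, model_id, dataset_id, cv_score = train_and_save_model(
        session,
        model_type='RandomForest',      # one of 'RandomForest', 'XGBR', 'GBSTR'
        model_name='rf_demo',
        model_description='demo training run',
        data_type='reflux',             # trains on the current_reflux table
        dataset_id=None                 # None -> snapshot the current table first
    )
    print(f"Trained {name} (ModelID={model_id}) on dataset {dataset_id}, CV score {cv_score:.3f}")
    return model_id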
def train_random_forest(X_train, y_train):
    best_score = -float('inf')
    best_n_estimators = None
    best_max_depth = None
    random_state = 43

    # Search for the best number of trees
    for n_estimators in range(1, 20, 1):
        model = RandomForestRegressor(n_estimators=n_estimators, random_state=random_state)
        score = cross_val_score(model, X_train, y_train, cv=5).mean()
        if score > best_score:
            best_score = score
            best_n_estimators = n_estimators

    print(f"Best number of trees: {best_n_estimators}, Score: {best_score}")

    # With the best number of trees fixed, search for the best maximum depth
    best_score = -float('inf')  # reset the best score before the max_depth search
    for max_depth in range(1, 5, 1):
        model = RandomForestRegressor(n_estimators=best_n_estimators, max_depth=max_depth,
                                      random_state=random_state)
        score = cross_val_score(model, X_train, y_train, cv=5).mean()
        if score > best_score:
            best_score = score
            best_max_depth = max_depth

    print(f"Best max depth: {best_max_depth}, Score: {best_score}")

    # Train the final model with the best number of trees and maximum depth
    best_model = RandomForestRegressor(n_estimators=best_n_estimators, max_depth=best_max_depth,
                                       random_state=random_state)
    # Fit on the DataFrame so the column names are carried along
    best_model.fit(X_train, y_train)
    # Record the feature names explicitly
    best_model.feature_names_in_ = X_train.columns

    return best_model


def train_xgboost(X_train, y_train):
    best_score = -float('inf')
    best_params = {'learning_rate': None, 'max_depth': None}
    random_state = 43

    for learning_rate in [0.01, 0.05, 0.1, 0.2]:
        for max_depth in range(3, 10):
            model = XGBRegressor(learning_rate=learning_rate, max_depth=max_depth,
                                 random_state=random_state)
            score = cross_val_score(model, X_train, y_train, cv=5).mean()
            if score > best_score:
                best_score = score
                best_params['learning_rate'] = learning_rate
                best_params['max_depth'] = max_depth

    print(f"Best parameters: {best_params}, Score: {best_score}")

    # Train the final model with the best parameters found
    best_model = XGBRegressor(learning_rate=best_params['learning_rate'],
                              max_depth=best_params['max_depth'],
                              random_state=random_state)
    best_model.fit(X_train, y_train)

    return best_model


def train_gradient_boosting(X_train, y_train):
    best_score = -float('inf')
    best_params = {'learning_rate': None, 'max_depth': None}
    random_state = 43

    for learning_rate in [0.01, 0.05, 0.1, 0.2]:
        for max_depth in range(3, 10):
            model = GradientBoostingRegressor(learning_rate=learning_rate, max_depth=max_depth,
                                              random_state=random_state)
            score = cross_val_score(model, X_train, y_train, cv=5).mean()
            if score > best_score:
                best_score = score
                best_params['learning_rate'] = learning_rate
                best_params['max_depth'] = max_depth

    print(f"Best parameters: {best_params}, Score: {best_score}")

    # Train the final model with the best parameters found
    best_model = GradientBoostingRegressor(learning_rate=best_params['learning_rate'],
                                           max_depth=best_params['max_depth'],
                                           random_state=random_state)
    best_model.fit(X_train, y_train)

    return best_model
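# Optional alternative (sketch only): the loops above perform a two-stage, greedy search
# (first n_estimators, then max_depth). An exhaustive search over the same grid can be
# written with scikit-learn's GridSearchCV; this helper is hypothetical and is not called
# anywhere in the pipeline.
def _grid_search_random_forest(X_train, y_train):
    from sklearn.model_selection import GridSearchCV  # imported here to keep the sketch self-contained

    param_grid = {
        'n_estimators': list(range(1, 20)),
        'max_depth': list(range(1, 5)),
    }
    search = GridSearchCV(RandomForestRegressor(random_state=43), param_grid, cv=5)
    search.fit(X_train, y_train)
    print(f"Best parameters: {search.best_params_}, Score: {search.best_score_}")
    return search.best_estimator_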
def save_model(session, model, model_name, model_type, model_description, dataset_id, data_type, commit=False):
    """
    Save the model record to the database and the model file to disk.

    Args:
        session: database session
        model: fitted model object to save
        model_name: model name
        model_type: model type
        model_description: model description
        dataset_id: dataset ID
        data_type: data type
        commit: whether to commit the transaction

    Returns:
        int: ID of the saved model
    """
    prefix_dict = {
        'RandomForest': 'rf_model_',
        'XGBR': 'xgbr_model_',
        'GBSTR': 'gbstr_model_'
    }
    prefix = prefix_dict.get(model_type, 'default_model_')

    try:
        # Read the save path from the configuration
        model_save_path = Config.MODEL_SAVE_PATH
        # Make sure the directory exists
        os.makedirs(model_save_path, exist_ok=True)

        # Build the file name from the current timestamp
        timestamp = datetime.datetime.now().strftime('%m%d_%H%M')
        file_name = os.path.join(model_save_path, f'{prefix}{timestamp}.pkl')

        # Save the model to disk
        with open(file_name, 'wb') as f:
            pickle.dump(model, f)
        print(f"Model saved to: {file_name}")

        # Create the model record in the database
        new_model = Models(
            Model_name=model_name,
            Model_type=model_type,
            Description=model_description,
            DatasetID=dataset_id,
            Created_at=datetime.datetime.now(),
            ModelFilePath=file_name,
            Data_type=data_type
        )

        # Add the record to the session
        session.add(new_model)
        session.flush()

        return new_model.ModelID

    except Exception as e:
        print(f"Error while saving the model: {str(e)}")
        raise


if __name__ == '__main__':
    # Smoke tests for predict(). predict() needs an active SQLAlchemy session bound to the
    # project database and a numeric Models.ModelID, so the values below are placeholders
    # that must be filled in before running this block (see the _standalone_session sketch
    # at the end of this module for one way to obtain a session).
    session = None          # TODO: supply a real session
    reflux_model_id = 1     # TODO: replace with the ModelID of the 'RF_filt' model
    reduce_model_id = 2     # TODO: replace with the ModelID of the 'rf_model_1214_1008' model

    # Acid-reflux model prediction
    input_data = pd.DataFrame([{
        "organic_matter": 5.2,
        "chloride": 3.1,
        "cec": 25.6,
        "h_concentration": 0.5,
        "hn": 12.4,
        "al_concentration": 0.8,
        "free_alumina": 1.2,
        "free_iron": 0.9,
        "delta_ph": -0.2
    }])
    Acid_reflux_result = predict(session, input_data, reflux_model_id)
    print("Acid_reflux_result:", Acid_reflux_result)  # prediction result

    # Acid-reduction model prediction
    input_data = pd.DataFrame([{
        "pH": 5.2,
        "OM": 3.1,
        "CL": 25.6,
        "H": 0.5,
        "Al": 12.4
    }])
    Acid_reduce_result = predict(session, input_data, reduce_model_id)
    print("Acid_reduce_result:", Acid_reduce_result)  # prediction result
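# Sketch for interactive use only: building a standalone SQLAlchemy session so that
# predict() and train_and_save_model() can be exercised outside the Flask app. The
# Config.SQLALCHEMY_DATABASE_URI attribute is an assumption about the project's config
# module; substitute whatever connection string the project actually defines.
def _standalone_session():
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)  # assumed attribute name
    return sessionmaker(bind=engine)()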