import os
from datetime import datetime

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.base import clone
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split


def _rmse(y_true, y_pred):
    """
    Compute the root mean squared error.

    The square root is taken explicitly instead of passing ``squared=False``
    to ``mean_squared_error``: that keyword was deprecated in scikit-learn 1.4
    and removed in 1.6, so this form works on both old and new releases.

    @param {array-like} y_true - ground-truth target values
    @param {array-like} y_pred - predicted target values
    @return {float} - root mean squared error
    """
    return np.sqrt(mean_squared_error(y_true, y_pred))


def load_models(model_dir):
    """
    Load every ``.pkl`` model found in a directory.

    Files that fail to load are reported on stdout and skipped.

    @param {string} model_dir - path of the directory that stores the models
    @return {dict} - mapping of model name (file name without extension) to model object
    """
    models = {}
    # os.listdir() returns entries in arbitrary order; sort so that the model
    # order (and therefore plots and tie-breaking) is reproducible.
    for filename in sorted(os.listdir(model_dir)):
        if not filename.endswith('.pkl'):
            continue
        model_path = os.path.join(model_dir, filename)
        model_name = os.path.splitext(filename)[0]
        try:
            # SECURITY: joblib.load unpickles the file and can execute
            # arbitrary code. Only load model files from a trusted source.
            models[model_name] = joblib.load(model_path)
            print(f"成功加载模型: {model_name}")
        except Exception as e:
            # Best effort: one broken file must not prevent loading the rest.
            print(f"加载模型 {model_name} 时出错: {str(e)}")
    return models


def evaluate_models(models, X_test, y_test):
    """
    Evaluate every model on the given data.

    Models whose prediction or scoring raises are reported on stdout and
    left out of the result.

    @param {dict} models - mapping of model name to model object
    @param {DataFrame} X_test - feature data used for evaluation
    @param {Series} y_test - target data used for evaluation
    @return {DataFrame} - one row per evaluated model with columns
                          'model_name', 'rmse' and 'r2'
    """
    results = []
    for name, model in models.items():
        try:
            y_pred = model.predict(X_test)
            rmse = _rmse(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            results.append({
                'model_name': name,
                'rmse': rmse,
                'r2': r2
            })
            print(f"模型 {name} - RMSE: {rmse:.4f}, R²: {r2:.4f}")
        except Exception as e:
            print(f"评估模型 {name} 时出错: {str(e)}")

    # Explicit columns keep the schema stable even when every model failed
    # (an empty list would otherwise produce a DataFrame with no columns).
    return pd.DataFrame(results, columns=['model_name', 'rmse', 'r2'])


def select_best_model(results_df, metric='r2', higher_better=True):
    """
    Pick the best model according to one metric.

    @param {DataFrame} results_df - evaluation results as returned by evaluate_models
    @param {string} metric - name of the metric column used for the selection
    @param {boolean} higher_better - whether a larger metric value is better
    @return {string} - name of the best model
    @raises {ValueError} - if results_df contains no rows
    """
    if results_df.empty:
        # Fail with a clear message instead of an opaque pandas error.
        raise ValueError("没有可用的评估结果, 无法选择最佳模型")

    scores = results_df[metric]
    best_idx = scores.idxmax() if higher_better else scores.idxmin()
    best_model = results_df.loc[best_idx, 'model_name']
    print(f"根据 {metric} 指标,最佳模型是: {best_model}")
    return best_model


def visualize_results(results_df, output_dir='model_optimize/results'):
    """
    Plot RMSE and R² of all models side by side, save the figure and show it.

    @param {DataFrame} results_df - evaluation results as returned by evaluate_models
    @param {string} output_dir - directory the PNG is written to (created if missing)
    """
    # Use widely available fonts.
    plt.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial']
    plt.rcParams['axes.unicode_minus'] = False

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # RMSE comparison
    sns.barplot(x='model_name', y='rmse', data=results_df, ax=ax1)
    ax1.set_title('RMSE Comparison of Models')
    ax1.set_xlabel('Model Name')
    ax1.set_ylabel('RMSE (Lower is better)')
    ax1.tick_params(axis='x', rotation=45)

    # R² comparison
    sns.barplot(x='model_name', y='r2', data=results_df, ax=ax2)
    ax2.set_title('R² Comparison of Models')
    ax2.set_xlabel('Model Name')
    ax2.set_ylabel('R² (Higher is better)')
    ax2.tick_params(axis='x', rotation=45)

    plt.tight_layout()

    # Save before show(): some backends clear the figure once it is shown.
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    plt.savefig(os.path.join(output_dir, f'model_comparison_{timestamp}.png'), dpi=300)
    plt.show()


def save_best_model(models, best_model_name, output_dir):
    """
    Save the best model to ``<output_dir>/best_model.pkl``.

    @param {dict} models - mapping of model name to model object
    @param {string} best_model_name - name of the model to save
    @param {string} output_dir - output directory (created if missing)
    """
    os.makedirs(output_dir, exist_ok=True)

    best_model = models[best_model_name]
    output_path = os.path.join(output_dir, 'best_model.pkl')
    joblib.dump(best_model, output_path)
    print(f"最佳模型已保存至: {output_path}")


def extract_and_retrain_model(model_path, X_train, y_train, X_test, y_test):
    """
    Load a model, rebuild an unfitted copy with the same hyper-parameters,
    train the copy on the training set and evaluate both on the test set.

    @param {string} model_path - path of the model file
    @param {DataFrame} X_train - training feature data
    @param {Series} y_train - training target data
    @param {DataFrame} X_test - test feature data
    @param {Series} y_test - test target data
    @return {dict|None} - dict with keys 'model_name', 'original', 'retrained'
                          and 'parameters'; None if any step fails
    """
    try:
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code. Only load model files from a trusted source.
        original_model = joblib.load(model_path)
        model_name = os.path.basename(model_path)
        print(f"成功加载模型: {model_name}")

        params = original_model.get_params()
        print(f"提取的模型参数: {params}")

        # Evaluate the model exactly as it was stored.
        y_pred_original = original_model.predict(X_test)
        rmse_original = _rmse(y_test, y_pred_original)
        r2_original = r2_score(y_test, y_pred_original)

        # clone() builds an unfitted estimator of the same class with the same
        # parameters, so this also works when the stored model is not a
        # RandomForestRegressor.
        new_model = clone(original_model)
        new_model.fit(X_train, y_train)

        # Evaluate the freshly trained model.
        y_pred_new = new_model.predict(X_test)
        rmse_new = _rmse(y_test, y_pred_new)
        r2_new = r2_score(y_test, y_pred_new)

        results = {
            'model_name': model_name,
            'original': {
                'rmse': rmse_original,
                'r2': r2_original
            },
            'retrained': {
                'rmse': rmse_new,
                'r2': r2_new,
                'model': new_model
            },
            'parameters': params
        }

        print(f"原始模型 {model_name} - RMSE: {rmse_original:.4f}, R²: {r2_original:.4f}")
        print(f"重训练模型 {model_name} - RMSE: {rmse_new:.4f}, R²: {r2_new:.4f}")

        return results
    except Exception as e:
        # Callers test the return value for truthiness, so report and return None.
        print(f"处理模型时出错: {str(e)}")
        return None


def visualize_comparison(original_results, retrained_results,
                         output_dir='model_optimize/results'):
    """
    Plot RMSE and R² of the original vs. the retrained model, save the figure
    and show it.

    @param {dict} original_results - metrics of the original model ('rmse', 'r2')
    @param {dict} retrained_results - metrics of the retrained model ('rmse', 'r2')
    @param {string} output_dir - directory the PNG is written to (created if missing)
    """
    # Use widely available fonts.
    plt.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial']
    plt.rcParams['axes.unicode_minus'] = False

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    model_names = ['Original Model', 'Retrained Model']
    rmse_values = [original_results['rmse'], retrained_results['rmse']]
    r2_values = [original_results['r2'], retrained_results['r2']]

    # RMSE comparison
    ax1.bar(model_names, rmse_values, color=['blue', 'orange'])
    ax1.set_title('RMSE Comparison')
    ax1.set_ylabel('RMSE (Lower is better)')
    # Value label slightly above each bar.
    for i, v in enumerate(rmse_values):
        ax1.text(i, v + 0.01, f'{v:.4f}', ha='center')

    # R² comparison
    ax2.bar(model_names, r2_values, color=['blue', 'orange'])
    ax2.set_title('R² Comparison')
    ax2.set_ylabel('R² (Higher is better)')
    # Value label slightly above each bar.
    for i, v in enumerate(r2_values):
        ax2.text(i, v + 0.01, f'{v:.4f}', ha='center')

    plt.tight_layout()

    # Save before show(): some backends clear the figure once it is shown.
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    plt.savefig(os.path.join(output_dir, f'model_retrain_comparison_{timestamp}.png'), dpi=300)
    plt.show()


def main():
    """
    Script entry point: load the data set, evaluate every stored model,
    report the best one per metric and plot the comparison.
    """
    # Load the data: first column is the target, the remaining ones are features.
    data = pd.read_excel('model_optimize/data/Acidity_reduce_new.xlsx')
    X = data.iloc[:, 1:]
    y = data.iloc[:, 0]
    X.columns = ['pH', 'OM', 'CL', 'H', 'Al']
    y.name = 'target'

    # Train/test split (used by the disabled part 1 below).
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Make sure the results directory exists.
    os.makedirs('model_optimize/results', exist_ok=True)

    # # Part 1: evaluate one specific model and retrain it
    # specific_model_path = r'pkl\rf_model_0308_1619.pkl'
    # if os.path.exists(specific_model_path):
    #     print("\n===== 特定模型参数提取与重训练评估 =====")
    #     # Extract the parameters and retrain
    #     results = extract_and_retrain_model(specific_model_path, X_train, y_train, X_test, y_test)
    #     if results:
    #         # Visualize the comparison
    #         visualize_comparison(results['original'], results['retrained'])
    #         # Save the retrained model
    #         retrained_model = results['retrained']['model']
    #         output_dir = 'model_optimize/retrained_models'
    #         if not os.path.exists(output_dir):
    #             os.makedirs(output_dir)
    #         timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    #         output_path = os.path.join(output_dir, f'retrained_model_{timestamp}.pkl')
    #         joblib.dump(retrained_model, output_path)
    #         print(f"重训练模型已保存至: {output_path}")
    #         # Save the model parameters to a text file
    #         params_output = os.path.join(output_dir, f'model_parameters_{timestamp}.txt')
    #         with open(params_output, 'w') as f:
    #             for param, value in results['parameters'].items():
    #                 f.write(f"{param}: {value}\n")
    #         print(f"模型参数已保存至: {params_output}")
    # else:
    #     print(f"指定的模型文件不存在: {specific_model_path}")

    # Part 2: compare all stored models
    print("\n===== 所有模型性能比较 =====")

    models = load_models('model_optimize/pkl')

    if models:
        # NOTE(review): the models are scored on the full data set (X, y), not
        # on the held-out split (X_test, y_test). If the stored models were
        # trained on part of this data the scores are optimistic - confirm
        # whether this is intended.
        results_df = evaluate_models(models, X, y)

        if results_df.empty:
            # Every model failed during evaluation; nothing to rank or plot.
            print("所有模型评估均失败, 没有可比较的结果")
            return

        # Save the evaluation results
        # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # results_df.to_csv(f'model_optimize/results/model_comparison_{timestamp}.csv', index=False)

        # Best model by R²
        best_model_r2 = select_best_model(results_df, metric='r2', higher_better=True)

        # Best model by RMSE
        best_model_rmse = select_best_model(results_df, metric='rmse', higher_better=False)

        print(f"基于R²的最佳模型: {best_model_r2}")
        print(f"基于RMSE的最佳模型: {best_model_rmse}")

        visualize_results(results_df)

        # Save the best model (R² is used as the selection criterion here)
        # save_best_model(models, best_model_r2, 'model_optimize/best_model')
    else:
        print("没有找到可用的模型")


if __name__ == "__main__":
    main()