123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import os
- import pandas as pd
- import numpy as np
- import joblib
- from sklearn.metrics import mean_squared_error, r2_score
- import matplotlib.pyplot as plt
- import seaborn as sns
- from datetime import datetime
- from sklearn.ensemble import RandomForestRegressor
- from sklearn.model_selection import train_test_split
- def load_models(model_dir):
- """
- 加载指定目录中的所有模型
-
- @param {string} model_dir - 模型存储目录路径
- @return {dict} - 模型名称和模型对象的字典
- """
- models = {}
- for filename in os.listdir(model_dir):
- if filename.endswith('.pkl'):
- model_path = os.path.join(model_dir, filename)
- model_name = os.path.splitext(filename)[0]
- try:
- model = joblib.load(model_path)
- models[model_name] = model
- print(f"成功加载模型: {model_name}")
- except Exception as e:
- print(f"加载模型 {model_name} 时出错: {str(e)}")
- return models
- def evaluate_models(models, X_test, y_test):
- """
- 评估所有模型的性能
-
- @param {dict} models - 模型名称和模型对象的字典
- @param {DataFrame} X_test - 测试特征数据
- @param {Series} y_test - 测试目标数据
- @return {DataFrame} - 包含各模型评估指标的数据框
- """
- results = []
-
- for name, model in models.items():
- try:
- # 预测
- y_pred = model.predict(X_test)
-
- # 计算评估指标
- rmse = mean_squared_error(y_test, y_pred, squared=False)
- r2 = r2_score(y_test, y_pred)
-
- # 存储结果
- results.append({
- 'model_name': name,
- 'rmse': rmse,
- 'r2': r2
- })
-
- print(f"模型 {name} - RMSE: {rmse:.4f}, R²: {r2:.4f}")
- except Exception as e:
- print(f"评估模型 {name} 时出错: {str(e)}")
-
- # 转换为DataFrame并排序
- results_df = pd.DataFrame(results)
- return results_df
- def select_best_model(results_df, metric='r2', higher_better=True):
- """
- 根据指定指标选择最佳模型
-
- @param {DataFrame} results_df - 包含各模型评估指标的数据框
- @param {string} metric - 用于选择的指标名称
- @param {boolean} higher_better - 指标值是否越高越好
- @return {string} - 最佳模型名称
- """
- if higher_better:
- best_idx = results_df[metric].idxmax()
- else:
- best_idx = results_df[metric].idxmin()
-
- best_model = results_df.loc[best_idx, 'model_name']
- print(f"根据 {metric} 指标,最佳模型是: {best_model}")
- return best_model
- def visualize_results(results_df):
- """
- 可视化各模型的性能比较
-
- @param {DataFrame} results_df - 包含各模型评估指标的数据框
- """
- # 设置字体,使用通用字体
- plt.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial']
- plt.rcParams['axes.unicode_minus'] = False
-
- # 创建图形
- fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
-
- # RMSE比较图
- sns.barplot(x='model_name', y='rmse', data=results_df, ax=ax1)
- ax1.set_title('RMSE Comparison of Models')
- ax1.set_xlabel('Model Name')
- ax1.set_ylabel('RMSE (Lower is better)')
- ax1.tick_params(axis='x', rotation=45)
-
- # R²比较图
- sns.barplot(x='model_name', y='r2', data=results_df, ax=ax2)
- ax2.set_title('R² Comparison of Models')
- ax2.set_xlabel('Model Name')
- ax2.set_ylabel('R² (Higher is better)')
- ax2.tick_params(axis='x', rotation=45)
-
- plt.tight_layout()
-
- # 保存图表
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- plt.savefig(f'model_optimize/results/model_comparison_{timestamp}.png', dpi=300)
- plt.show()
- def save_best_model(models, best_model_name, output_dir):
- """
- 将最佳模型保存到指定目录
-
- @param {dict} models - 模型名称和模型对象的字典
- @param {string} best_model_name - 最佳模型名称
- @param {string} output_dir - 输出目录
- """
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
-
- best_model = models[best_model_name]
- output_path = os.path.join(output_dir, 'best_model.pkl')
- joblib.dump(best_model, output_path)
- print(f"最佳模型已保存至: {output_path}")
- def extract_and_retrain_model(model_path, X_train, y_train, X_test, y_test):
- """
- 从现有模型中提取参数,使用这些参数在训练集上重新训练模型,然后在测试集上评估
-
- @param {string} model_path - 模型文件路径
- @param {DataFrame} X_train - 训练特征数据
- @param {Series} y_train - 训练目标数据
- @param {DataFrame} X_test - 测试特征数据
- @param {Series} y_test - 测试目标数据
- @return {dict} - 包含原始模型和重训练模型评估结果的字典
- """
- try:
- # 加载原始模型
- original_model = joblib.load(model_path)
- model_name = os.path.basename(model_path)
- print(f"成功加载模型: {model_name}")
-
- # 提取模型参数
- params = original_model.get_params()
- print(f"提取的模型参数: {params}")
-
- # 使用原始模型直接在测试集上评估
- y_pred_original = original_model.predict(X_test)
- rmse_original = mean_squared_error(y_test, y_pred_original, squared=False)
- r2_original = r2_score(y_test, y_pred_original)
-
- # 使用提取的参数创建新模型并在训练集上训练
- new_model = RandomForestRegressor(**params)
- new_model.fit(X_train, y_train)
-
- # 在测试集上评估新训练的模型
- y_pred_new = new_model.predict(X_test)
- rmse_new = mean_squared_error(y_test, y_pred_new, squared=False)
- r2_new = r2_score(y_test, y_pred_new)
-
- # 返回结果
- results = {
- 'model_name': model_name,
- 'original': {
- 'rmse': rmse_original,
- 'r2': r2_original
- },
- 'retrained': {
- 'rmse': rmse_new,
- 'r2': r2_new,
- 'model': new_model
- },
- 'parameters': params
- }
-
- print(f"原始模型 {model_name} - RMSE: {rmse_original:.4f}, R²: {r2_original:.4f}")
- print(f"重训练模型 {model_name} - RMSE: {rmse_new:.4f}, R²: {r2_new:.4f}")
-
- return results
-
- except Exception as e:
- print(f"处理模型时出错: {str(e)}")
- return None
- def visualize_comparison(original_results, retrained_results):
- """
- 可视化原始模型和重训练模型的性能比较
-
- @param {dict} original_results - 原始模型的评估结果
- @param {dict} retrained_results - 重训练模型的评估结果
- """
- # 设置字体,使用通用字体
- plt.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial']
- plt.rcParams['axes.unicode_minus'] = False
-
- # 创建图形
- fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
-
- # 准备数据
- model_names = ['Original Model', 'Retrained Model']
- rmse_values = [original_results['rmse'], retrained_results['rmse']]
- r2_values = [original_results['r2'], retrained_results['r2']]
-
- # RMSE比较图
- ax1.bar(model_names, rmse_values, color=['blue', 'orange'])
- ax1.set_title('RMSE Comparison')
- ax1.set_ylabel('RMSE (Lower is better)')
-
- # 在柱状图上添加数值标签
- for i, v in enumerate(rmse_values):
- ax1.text(i, v + 0.01, f'{v:.4f}', ha='center')
-
- # R²比较图
- ax2.bar(model_names, r2_values, color=['blue', 'orange'])
- ax2.set_title('R² Comparison')
- ax2.set_ylabel('R² (Higher is better)')
-
- # 在柱状图上添加数值标签
- for i, v in enumerate(r2_values):
- ax2.text(i, v + 0.01, f'{v:.4f}', ha='center')
-
- plt.tight_layout()
-
- # 保存图表
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- plt.savefig(f'model_optimize/results/model_retrain_comparison_{timestamp}.png', dpi=300)
- plt.show()
- if __name__ == "__main__":
- # 加载数据
- data = pd.read_excel('model_optimize/data/Acidity_reduce_new.xlsx')
- X = data.iloc[:, 1:]
- y = data.iloc[:, 0]
- X.columns = ['pH', 'OM', 'CL', 'H', 'Al']
- y.name = 'target'
-
- # 划分训练集和测试集
- X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
-
- # 确保结果目录存在
- if not os.path.exists('model_optimize/results'):
- os.makedirs('model_optimize/results')
-
- # # 第一部分:评估特定模型并重新训练
- # specific_model_path = r'pkl\rf_model_0308_1619.pkl'
-
- # if os.path.exists(specific_model_path):
- # print("\n===== 特定模型参数提取与重训练评估 =====")
- # # 提取参数并重新训练
- # results = extract_and_retrain_model(specific_model_path, X_train, y_train, X_test, y_test)
-
- # if results:
- # # 可视化比较结果
- # visualize_comparison(results['original'], results['retrained'])
-
- # # 保存重训练的模型
- # retrained_model = results['retrained']['model']
- # output_dir = 'model_optimize/retrained_models'
- # if not os.path.exists(output_dir):
- # os.makedirs(output_dir)
-
- # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- # output_path = os.path.join(output_dir, f'retrained_model_{timestamp}.pkl')
- # joblib.dump(retrained_model, output_path)
- # print(f"重训练模型已保存至: {output_path}")
-
- # # 保存模型参数到文本文件
- # params_output = os.path.join(output_dir, f'model_parameters_{timestamp}.txt')
- # with open(params_output, 'w') as f:
- # for param, value in results['parameters'].items():
- # f.write(f"{param}: {value}\n")
- # print(f"模型参数已保存至: {params_output}")
- # else:
- # print(f"指定的模型文件不存在: {specific_model_path}")
-
- # 第二部分:原有的模型比较代码
- print("\n===== 所有模型性能比较 =====")
- # 加载所有模型
- models = load_models('model_optimize/pkl')
-
- if models:
- # 评估模型
- results_df = evaluate_models(models, X, y)
-
- # 保存评估结果
- # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- # results_df.to_csv(f'model_optimize/results/model_comparison_{timestamp}.csv', index=False)
-
- # 选择最佳模型 (基于R²)
- best_model_r2 = select_best_model(results_df, metric='r2', higher_better=True)
-
- # 选择最佳模型 (基于RMSE)
- best_model_rmse = select_best_model(results_df, metric='rmse', higher_better=False)
-
- print(f"基于R²的最佳模型: {best_model_r2}")
- print(f"基于RMSE的最佳模型: {best_model_rmse}")
-
- # 可视化结果
- visualize_results(results_df)
-
- # 保存最佳模型 (这里使用R²作为选择标准)
- # save_best_model(models, best_model_r2, 'model_optimize/best_model')
- else:
- print("没有找到可用的模型")
|