123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- """
- 数据清理模块,提供各种数据清理和预处理功能
- """
- import pandas as pd
- import numpy as np
- from sklearn.preprocessing import StandardScaler
- import logging
- logger = logging.getLogger(__name__)
def remove_duplicates(df):
    """Drop duplicate rows from a DataFrame.

    Args:
        df: Input DataFrame.

    Returns:
        tuple: (deduplicated DataFrame, number of duplicate rows removed)
    """
    before = len(df)
    deduped = df.drop_duplicates()
    duplicates_removed = before - len(deduped)
    logging.getLogger(__name__).info(f"移除了 {duplicates_removed} 个重复样本")
    return deduped, duplicates_removed
def remove_outliers(df, method='iqr', threshold=1.5):
    """Detect and remove outlier rows from a DataFrame.

    Args:
        df: Input DataFrame (numeric columns assumed — both methods do
            arithmetic on every column).
        method: Outlier detection method, 'iqr' or 'zscore'.
        threshold: IQR multiplier (for 'iqr') or max absolute z-score
            (for 'zscore').

    Returns:
        tuple: (cleaned DataFrame, number of rows removed)

    Raises:
        ValueError: If `method` is not 'iqr' or 'zscore'.
    """
    original_count = len(df)

    if method == 'iqr':
        q1 = df.quantile(0.25)
        q3 = df.quantile(0.75)
        iqr = q3 - q1
        # Keep rows where no column falls outside [Q1 - t*IQR, Q3 + t*IQR].
        keep = ~((df < (q1 - threshold * iqr)) | (df > (q3 + threshold * iqr))).any(axis=1)
        df_clean = df[keep]
    elif method == 'zscore':
        from scipy import stats
        # Keep rows where every column's |z-score| is within the threshold.
        keep = ~(np.abs(stats.zscore(df)) > threshold).any(axis=1)
        df_clean = df[keep]
    else:
        # Fail fast: previously an unknown method fell through both branches
        # and crashed with a confusing NameError on df_clean.
        raise ValueError(f"Unknown outlier detection method: {method!r}")

    outliers_removed = original_count - len(df_clean)
    logging.getLogger(__name__).info(f"使用 {method} 方法移除了 {outliers_removed} 个异常值")
    return df_clean, outliers_removed
def _split_features_target(df, target_column):
    """Split *df* into (features, target) by column name (str) or position (int).

    Returns (df, None) when no target column is specified.
    """
    if target_column is None:
        return df, None
    if isinstance(target_column, str):
        return df.drop(columns=[target_column]), df[target_column]
    # Positional (integer) target column.
    return df.drop(df.columns[target_column], axis=1), df.iloc[:, target_column]


def _clean_rows(X, y, target_column, cleaner):
    """Apply a row-dropping *cleaner* to X (and y, if present), keeping them aligned.

    *cleaner* is a callable returning (cleaned_df, rows_removed). When a target
    is present, X and y are concatenated first so the same rows are dropped
    from both, then split back apart.
    """
    if y is None:
        X_clean, removed = cleaner(X)
        return X_clean, None, removed
    combined = pd.concat([X, y], axis=1)
    combined, removed = cleaner(combined)
    if isinstance(target_column, int):
        # The target was appended last, so it is the final column.
        return combined.iloc[:, :-1], combined.iloc[:, -1], removed
    return combined.drop(columns=[target_column]), combined[target_column], removed


def clean_dataset(df, target_column=None, remove_dups=False, handle_outliers=False,
                  outlier_method='iqr', outlier_threshold=1.5, normalize=False):
    """Comprehensive dataset cleaning pipeline.

    Optionally removes duplicates, removes outliers, and standardizes
    features, keeping features and target row-aligned throughout.

    Args:
        df: Input DataFrame.
        target_column: Target column name (str) or positional index (int),
            or None when there is no target.
        remove_dups: Whether to drop duplicate rows.
        handle_outliers: Whether to drop outlier rows.
        outlier_method: Outlier detection method ('iqr' or 'zscore').
        outlier_threshold: Outlier decision threshold.
        normalize: Whether to z-score standardize the features.

    Returns:
        tuple: (feature DataFrame, target Series or None, cleaning stats dict
        with keys 'original_count', 'final_count', and optionally
        'duplicates_removed', 'outliers_removed', 'normalized')
    """
    cleaning_stats = {'original_count': len(df)}

    # Separate features and target.
    X, y = _split_features_target(df, target_column)

    # Drop duplicate rows (on features+target together when a target exists,
    # matching the original behavior).
    if remove_dups:
        X, y, cleaning_stats['duplicates_removed'] = _clean_rows(
            X, y, target_column, remove_duplicates)

    # Drop outlier rows. NOTE(review): the target column participates in
    # outlier detection when present — same as the original behavior.
    if handle_outliers:
        X, y, cleaning_stats['outliers_removed'] = _clean_rows(
            X, y, target_column,
            lambda d: remove_outliers(d, method=outlier_method,
                                      threshold=outlier_threshold))

    # Standardize features; preserves column names and row index.
    if normalize:
        scaler = StandardScaler()
        X = pd.DataFrame(scaler.fit_transform(X), columns=X.columns, index=X.index)
        cleaning_stats['normalized'] = True

    cleaning_stats['final_count'] = len(X)
    return X, y, cleaning_stats
|