import warnings

import pandas as pd
from sklearn.datasets import make_classification

warnings.filterwarnings("ignore")

X, y = make_classification(
    n_samples=5000,
    n_features=30,
    n_redundant=15,
    n_clusters_per_class=1,
    weights=[0.50],
    class_sep=2,
    random_state=42,
)

cols = [f"feat_{i}" for i in range(X.shape[1])]
X = pd.DataFrame(X, columns=cols)
y = pd.DataFrame({"y": y})
X.head()

from feature_engine.selection import SmartCorrelatedSelection

MODEL_TYPE = "classifier"  # or "regressor"
CORRELATION_THRESHOLD = 0.97

# Set up the smart selector (thanks, feature_engine!)
feature_selector = SmartCorrelatedSelection(
    variables=None,
    method="spearman",
    threshold=CORRELATION_THRESHOLD,
    missing_values="ignore",
    selection_method="variance",
    estimator=None,
)
feature_selector.fit_transform(X)

# Build a list of correlated clusters (as lists) and a list of uncorrelated features
correlated_sets = feature_selector.correlated_feature_sets_
correlated_clusters = [list(feature_set) for feature_set in correlated_sets]
correlated_features = [feature for cluster in correlated_clusters for feature in cluster]
uncorrelated_features = [feature for feature in X.columns if feature not in correlated_features]

from sklearn.feature_selection import (
    SelectKBest,
    mutual_info_classif,
    mutual_info_regression,
)

mutual_info = {
    "classifier": mutual_info_classif,
    "regressor": mutual_info_regression,
}

top_features_cluster = []
for cluster in correlated_clusters:
    # Select the top feature (k=1) by mutual information with the target
    selector = SelectKBest(score_func=mutual_info[MODEL_TYPE], k=1)
    selector = selector.fit(X[cluster], y)
    top_features_cluster.append(list(selector.get_feature_names_out())[0])

selected_features = top_features_cluster + uncorrelated_features

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_validate

cv = StratifiedKFold(shuffle=True, random_state=42)

baseline_raw = cross_validate(
    RandomForestClassifier(n_jobs=-1, random_state=42),  # n_jobs=-1 uses all available cores
    X,
    y,
    cv=cv,
    scoring="f1",  # or any other metric that you want
    groups=None,
)

baseline_selected_features = cross_validate(
    RandomForestClassifier(n_jobs=-1, random_state=42),
    X[selected_features],
    y,
    cv=cv,
    scoring="f1",
    groups=None,
    error_score="raise",
)

score_raw = baseline_raw["test_score"].mean()
score_selected = baseline_selected_features["test_score"].mean()

# Define a threshold to decide whether the dimensionality reduction is worth it for your use case.
# Relative change of the selected set vs. the raw set: negative means the selected set scored lower.
dif = round((score_selected - score_raw) / score_raw, 3)

# A 5% drop is our limit (ponder how it will impact your product $)
performance_threshold = -0.050

if dif >= performance_threshold:
    print("It's worth going with the selected feature set =D")
else:
    print("The performance drop is not acceptable! >.<")
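Before trusting the selected set, it helps to peek at what the selector actually grouped. A minimal inspection sketch, using only the variables defined above (the exact cluster contents depend on your data and threshold):

# Print each correlated cluster next to the representative kept for it,
# plus a summary of how much the feature space shrank.
for cluster, kept in zip(correlated_clusters, top_features_cluster):
    print(f"cluster {cluster} -> kept {kept}")

print(f"{len(uncorrelated_features)} uncorrelated features kept as-is")
print(f"{X.shape[1]} original features -> {len(selected_features)} selected features")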
import warnings

import pandas as pd
from feature_engine.selection import SmartCorrelatedSelection
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.model_selection import StratifiedKFold, cross_validate

warnings.filterwarnings("ignore")

# Recreate the dataframe from the example above
X, y = make_classification(
    n_samples=5000,
    n_features=30,
    n_redundant=15,
    n_clusters_per_class=1,
    weights=[0.50],
    class_sep=2,
    random_state=42,
)
cols = [f"feat_{i}" for i in range(X.shape[1])]
X = pd.DataFrame(X, columns=cols)
y = pd.DataFrame({"y": y})


# Functions to iterate over the accepted-threshold range
def select_features_clf(X: pd.DataFrame, y: pd.DataFrame, corr_threshold: float) -> list:
    """
    Select a set of features with minimum redundancy and maximum relevance,
    based on the given correlation threshold.
    """
    # Set up the smart selector (thanks, feature_engine!)
    feature_selector = SmartCorrelatedSelection(
        variables=None,
        method="spearman",
        threshold=corr_threshold,
        missing_values="ignore",
        selection_method="variance",
        estimator=None,
    )
    feature_selector.fit_transform(X)

    # Build a list of correlated clusters (as lists) and a list of uncorrelated features
    correlated_sets = feature_selector.correlated_feature_sets_
    correlated_clusters = [list(feature_set) for feature_set in correlated_sets]
    correlated_features = [feature for cluster in correlated_clusters for feature in cluster]
    uncorrelated_features = [feature for feature in X.columns if feature not in correlated_features]

    top_features_cluster = []
    for cluster in correlated_clusters:
        # Select the top feature (k=1) by mutual information with the target
        selector = SelectKBest(score_func=mutual_info_classif, k=1)
        selector = selector.fit(X[cluster], y)
        top_features_cluster.append(list(selector.get_feature_names_out())[0])

    return top_features_cluster + uncorrelated_features


def get_clf_model_scores(X: pd.DataFrame, y: pd.DataFrame, scoring: str, selected_features: list):
    """
    Cross-validate a RandomForestClassifier on the selected features and
    return the mean test score, mean fit time, and mean score time.
    """
    cv = StratifiedKFold(shuffle=True, random_state=42)
    model_result = cross_validate(
        RandomForestClassifier(),
        X[selected_features],
        y,
        cv=cv,
        scoring=scoring,
        groups=None,
        error_score="raise",
    )
    return (
        model_result["test_score"].mean(),
        model_result["fit_time"].mean(),
        model_result["score_time"].mean(),
    )


def evaluate_clf_feature_selection_range(
    X: pd.DataFrame,
    y: pd.DataFrame,
    scoring: str,
    corr_range: int,
    corr_starting_point: float = 0.98,
) -> pd.DataFrame:
    """
    Evaluate feature selection for every 0.01 step of the correlation threshold.
    """
    evaluation_data = {
        "corr_threshold": [],
        scoring: [],
        "n_features": [],
        "fit_time": [],
        "score_time": [],
    }
    for i in range(corr_range):
        current_corr_threshold = corr_starting_point - (i / 100)  # reduce corr_threshold by 0.01 each iteration
        selected_features = select_features_clf(X, y, corr_threshold=current_corr_threshold)
        score, fit_time, score_time = get_clf_model_scores(X, y, scoring, selected_features)

        evaluation_data["corr_threshold"].append(current_corr_threshold)
        evaluation_data[scoring].append(score)
        evaluation_data["n_features"].append(len(selected_features))
        evaluation_data["fit_time"].append(fit_time)
        evaluation_data["score_time"].append(score_time)

    return pd.DataFrame(evaluation_data)


evaluation_df = evaluate_clf_feature_selection_range(X, y, "f1", 15)

%pip install hiplot
import hiplot

html = hiplot.Experiment.from_dataframe(evaluation_df).to_html()
displayHTML(html)
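Note that displayHTML is a Databricks notebook helper; outside Databricks you can write the HiPlot HTML to a file and open it in a browser, or chart the same trade-off with plain matplotlib. A minimal sketch, assuming the evaluation_df from above and matplotlib installed:

import matplotlib.pyplot as plt

# F1 on the left axis, number of selected features on the right axis,
# both as a function of the correlation threshold.
fig, ax1 = plt.subplots(figsize=(8, 4))
ax1.plot(evaluation_df["corr_threshold"], evaluation_df["f1"], marker="o", color="tab:blue")
ax1.set_xlabel("correlation threshold")
ax1.set_ylabel("mean CV F1", color="tab:blue")

ax2 = ax1.twinx()
ax2.plot(evaluation_df["corr_threshold"], evaluation_df["n_features"], marker="s", color="tab:orange")
ax2.set_ylabel("n_features", color="tab:orange")

fig.tight_layout()
plt.show()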