Source code for causalexplain.independence.feature_selection

# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches

from copy import copy
from typing import List

import colorama
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.cluster import DBSCAN
from sklearn.metrics import euclidean_distances

BLACK = colorama.Fore.BLACK
RED = colorama.Fore.RED
GREEN = colorama.Fore.GREEN
GRAY = colorama.Fore.LIGHTBLACK_EX
RESET = colorama.Style.RESET_ALL


[docs] def select_features( values, feature_names, return_shaps=False, min_impact: float = 1e-06, exhaustive=False, threshold: float = None, verbose=False) -> List[str]: """ Sort the values and select those before (strict) the point of max. curvature, according to the selected algorithm. If strict is False, the point of max curv. is also selected. When the method is 'abrupt' the selection method is based on taking only those feature up-to (down-to) a certain percentage of change in their values. Arguments: - values (np.ndarray): The values for each of the features. This can be anything that should be used to determine what features are more important than others. - feature_names (list): Names of the variables corresponding to the shap values - return_shaps (bool): Whether returning the mean shap values together with order of the features. - min_impact (float): Default 1e-06. The minimum impact of a feature to be selected. If all features are below this value, none are selected. - exhaustive (bool): Default False. Whether to use the exhaustive method or not. If True, the threshold is used to find all possible clusters above the given threshold, not only the first one. - threshold (float): Default None. The threshold to use when exahustive is True. If None, exception is raised. - verbose: guess what. """ if (exhaustive is True) and (threshold is None): raise ValueError("If exhaustive is True, threshold must be provided.") threshold = 0.0 if threshold is None else threshold if len(values.shape) > 1: feature_order = np.argsort(np.sum(np.abs(values), axis=0)) mean_values = np.abs(values).mean(0) else: feature_order = np.argsort(np.abs(values)) mean_values = np.abs(values) sorted_shap_values = np.array([mean_values[idx] for idx in feature_order]) if verbose: print(f" Feature order......: {feature_order}") # In some cases, the mean SHAP values are 0. We return an empty list in that case. if np.all(mean_values < min_impact): return [] if verbose: print(" Sum values.........: ", end="") if len(values.shape) > 1: print(','.join([f"({f}:{s:.03f})" for f, s in zip( feature_names, np.sum(np.abs(values), axis=0))])) else: print(','.join([f"({f}:{s:.03f})" for f, s in zip( feature_names, np.abs(values))])) print( f" Feature_order......: " f"{','.join([f'{feature_names[i]}' for i in feature_order])}\n" f" sorted_mean_values.: " f"{','.join([f'{x:.6f}' for x in sorted_shap_values])}\n" f" threshold..........: {threshold:.6f}") sorted_impact_values = copy(sorted_shap_values) selected_features = [] max_iterations = len(sorted_impact_values) iteration = 0 limit_idx = 0 while np.any(sorted_impact_values > threshold): if iteration >= max_iterations: break limit_idx = find_cluster_change_point(sorted_impact_values, verbose=verbose) selected_features = list(reversed( [feature_names[i] for i in feature_order[limit_idx:]])) if not exhaustive: break sorted_impact_values = sorted_impact_values[:limit_idx] iteration += 1 if verbose: print(f" Limit_idx(cut-off).: {limit_idx}") print(f" Selected_features..: {selected_features}") if return_shaps: return selected_features, list(reversed(sorted(mean_values)[limit_idx:])) return selected_features
[docs] def find_cluster_change_point(X: List, verbose: bool = False) -> int: """ Given an array of values in increasing or decreasing order, detect what are the elements that belong to the same cluster. The clustering is done using DBSCAN with a distance computed as the max. difference between consecutive elements. Arguments: - X (np.array): the series of values to detect the abrupt change. - verbose (bool): Verbose output. Returns: The position in the array where an abrupt change is produced. If there's no change in consecutive values greater than the tolerance passed then the last element of the array is returned. """ if len(X) <= 1: return None X = np.array(X).reshape(-1, 1) X_safe = X.copy() X_safe[X_safe == 0.0] = 1e-06 pairwise_distances = euclidean_distances(X)[0,] pairwise_distances = np.diff(pairwise_distances) pairwise_distances = np.sort(pairwise_distances)[::-1] # Safety check: if any of the values in pairwise_distances is 0, add 1e-06 pairwise_distances[pairwise_distances == 0.0] = 1e-06 n_clusters_ = 0 while n_clusters_ <= 1 and len(pairwise_distances) > 0: max_distance = pairwise_distances.max() + 1e-06 # Safety check max_distance = 0.001 if max_distance <= 0.0 else max_distance if verbose: print(f" pairwise_distances.: {pairwise_distances}") print(f" max_distance.......: {max_distance:.4f}") db = DBSCAN(eps=max_distance, min_samples=1).fit(X) labels = db.labels_ # Number of clusters in labels, ignoring noise if present. n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0) n_noise_ = list(labels).count(-1) if n_clusters_ <= 1: if verbose: print(" ↳ Only 1 cluster generated. Decreasing max_distance.") pairwise_distances = pairwise_distances[1:] if pairwise_distances.size == 0: print("** No clusters generated") if verbose else None return None if verbose: print(f" Est.clusters/noise.: {n_clusters_}/{n_noise_}") if (len(labels) > 3) and (len(labels) < (X.shape[0]-1)): print( f" Silhouette Coeff...: {metrics.silhouette_score(X, labels):.3f}\n" f" ↳ Labels: {labels}") winner_label = n_clusters_ - 1 samples_in_winner_cluster = np.argwhere(X_safe[labels != winner_label]) return samples_in_winner_cluster[:, 0][-1]+1
[docs] def main(): # Display Options np.set_printoptions(precision=4, linewidth=100) pd.set_option('display.precision', 4) pd.set_option('display.float_format', '{:.4f}'.format) toy_values = np.array( [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.959290610692456] ) # values = np.array( # [0.083, 0.175, 0.353, 0.204, 0.081, 0.116, 0.088, 0.451, 0.152] # ) names = [f"V{i}" for i in range(len(toy_values))] select_features(toy_values, names, verbose=True)
[docs] def test(): values = np.array([0.1, 0.2, 0.3, 0.4, 0.5]) feature_names = ['A', 'B', 'C', 'D', 'E'] result = select_features(values, feature_names) assert result == ['A', 'B', 'C', 'D', 'E']
if __name__ == "__main__": test()