# Source code for causalexplain.models.gbt

"""
This module contains the GBTRegressor class, which is a wrapper around the
GradientBoostingRegressor class from the scikit-learn library. The class
implements the fit, predict, and score methods to fit a separate model for
each feature in the dataframe, and predict and score the model for each feature
in the dataframe.

The class also implements a tune method to tune the hyperparameters of the model
using Optuna. The tune method uses the Objective class to define the objective
function for the hyperparameter optimization. The Objective class is a nested
class within the GBTRegressor class, and it defines the objective function for
the hyperparameter optimization. The class is designed to be used with the
Optuna library.

The module also contains a main function that can be used to run the GBTRegressor
class with the tune method. The main function takes the name of the experiment
as an argument, and loads the data and the reference graph for the experiment.
The main function then splits the data into train and test, and runs the tune
method to tune the hyperparameters of the model. The main function can be used
to run the GBTRegressor class with the tune method for any experiment.

The module can be run as a script to run the main function with the tune method
for a specific experiment. The experiment name is passed as an argument to the
script, and the main function is called with the experiment name as an argument.
The script can be used to run the GBTRegressor class with the tune method for
any experiment.

Example:

    $ python gbt.py rex_generated_linear_6

This will run the GBTRegressor class with the tune method for the experiment
'rex_generated_linear_6'.

The module can also be imported and used in other modules or scripts to run the
GBTRegressor class with the tune method for any experiment.

Example:

    from causalexplain.models.gbt import custom_main

    custom_main("rex_generated_linear_6")

"""

# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches
# pylint: disable=W0102:dangerous-default-value


import inspect
from typing import Optional

import numpy as np
import optuna  # type: ignore
import pandas as pd
from mlforge.progbar import ProgBar  # type: ignore
from sklearn.ensemble import (GradientBoostingClassifier,
                              GradientBoostingRegressor)
from sklearn.metrics import f1_score
from sklearn.preprocessing import StandardScaler

from ..common import DEFAULT_HPO_TRIALS, utils
from ..explainability.hierarchies import Hierarchies


class GBTRegressor(GradientBoostingRegressor):
    """
    Multi-output wrapper around scikit-learn's gradient boosted trees.

    ``fit`` trains one model per column of the input dataframe, using the
    remaining columns as predictors: a ``GradientBoostingClassifier`` for
    categorical/binary columns and a ``GradientBoostingRegressor`` otherwise.
    ``predict`` and ``score`` apply the same per-column scheme, and ``tune``
    searches the hyperparameters with Optuna.
    """

    # Class-level default seed; also read by the HPO ``Objective`` helper
    # through ``GBTRegressor.random_state``.
    random_state = 42

    def __init__(
            self,
            loss='squared_error',
            learning_rate=0.1,
            n_estimators=100,
            subsample=1.0,
            criterion='friedman_mse',
            min_samples_split=2,
            min_samples_leaf=1,
            min_weight_fraction_leaf=0.0,
            max_depth=3,
            min_impurity_decrease=0.0,
            init=None,
            random_state=42,
            max_features=None,
            # alpha=0.9,
            max_leaf_nodes=None,
            warm_start=False,
            validation_fraction=0.1,
            n_iter_no_change=None,
            tol=0.0001,
            ccp_alpha=0.0,
            correlation_th: Optional[float] = None,
            verbose=False,
            silent=False,
            prog_bar=True,
            optuna_prog_bar=False):
        """
        Store all hyperparameters on the instance (scikit-learn convention:
        ``__init__`` only records arguments, it does no work).

        Most arguments mirror ``GradientBoostingRegressor``'s constructor and
        are forwarded verbatim to every per-column model. The extras are:

        Args:
            correlation_th: if not None, features whose correlation with the
                target exceeds this threshold are dropped before fitting each
                per-target model (see ``fit``).
            verbose: print diagnostic output while fitting/tuning.
            silent: suppress all output, including tuning summaries.
            prog_bar: show a progress bar during ``fit``.
            optuna_prog_bar: show Optuna's own progress bar during ``tune``.
        """
        # --- scikit-learn hyperparameters, forwarded to each sub-model ---
        self.loss = loss
        self.learning_rate = learning_rate
        self.n_estimators = n_estimators
        self.subsample = subsample
        self.criterion = criterion
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_weight_fraction_leaf = min_weight_fraction_leaf
        self.max_depth = max_depth
        self.min_impurity_decrease = min_impurity_decrease
        self.init = init
        self.random_state = random_state
        self.max_features = max_features
        # self.alpha = alpha
        self.max_leaf_nodes = max_leaf_nodes
        self.warm_start = warm_start
        self.validation_fraction = validation_fraction
        self.n_iter_no_change = n_iter_no_change
        self.tol = tol
        self.ccp_alpha = ccp_alpha
        # --- wrapper-specific configuration ---
        self.correlation_th = correlation_th
        self.verbose = verbose
        self.silent = silent
        self.prog_bar = prog_bar
        self.optuna_prog_bar = optuna_prog_bar
        # Populated by ``fit`` as a dict: target column name -> fitted model.
        self.regressor = None
        self._estimator_name = 'gbt'
        self._estimator_class = GradientBoostingRegressor
        self._fit_desc = "Training GBTs"
    def fit(self, X):
        """
        Call the fit method of the parent class with every feature from the
        "X" dataframe as a target variable. This will fit a separate model for
        each feature in the dataframe.

        Args:
            X: dataframe whose every column is used, in turn, as a target;
                categorical columns are cast to int before training.

        Returns:
            self, with ``self.regressor`` mapping each column name to its
            fitted model and ``self.is_fitted_`` set to True.
        """
        self.n_features_in_ = X.shape[1]
        self.feature_names = utils.get_feature_names(X)
        self.feature_types = utils.get_feature_types(X)
        # Models are trained on integer-encoded categoricals.
        X = utils.cast_categoricals_to_int(X)
        self.regressor = dict()
        if self.correlation_th:
            # Precompute, per target, which features exceed the correlation
            # threshold so they can be dropped inside the training loop.
            self.corr_matrix = Hierarchies.compute_correlation_matrix(X)
            self.correlated_features = Hierarchies.compute_correlated_features(
                self.corr_matrix, self.correlation_th, self.feature_names,
                verbose=self.verbose)
        X_original = X.copy()

        # Who is calling me? Used only to label the progress bar; when fit()
        # is invoked through Optuna's Objective.__call__ the label is "HPO".
        try:
            curframe = inspect.currentframe()
            calframe = inspect.getouterframes(curframe, 2)
            caller_name = calframe[1][3]
            if caller_name == "__call__":
                caller_name = "HPO"
        except Exception:  # pylint: disable=broad-except
            caller_name = "unknown"

        if self.prog_bar and not self.verbose:
            pbar_name = f"({caller_name}) GBT_fit"
            pbar = ProgBar().start_subtask(pbar_name, len(self.feature_names))
        else:
            pbar = None

        for target_idx, target_name in enumerate(self.feature_names):
            # if correlation_th is not None then, remove features that are
            # highly correlated with the target, at each step of the loop
            if self.correlation_th is not None:
                X = X_original.copy()
                if len(self.correlated_features[target_name]) > 0:
                    X = X.drop(self.correlated_features[target_name], axis=1)
                    if self.verbose:
                        print("REMOVED CORRELATED FEATURES: ",
                              self.correlated_features[target_name])
            # Pick a classifier for discrete targets, a regressor otherwise.
            # NOTE: this also mutates self.loss on every iteration, so after
            # fit() self.loss reflects the LAST target's type.
            if self.feature_types[target_name] == 'categorical' or \
                    self.feature_types[target_name] == 'binary':
                self.loss = 'log_loss'
                gbt_model = GradientBoostingClassifier
            else:
                self.loss = 'squared_error'
                gbt_model = GradientBoostingRegressor
            self.regressor[target_name] = gbt_model(
                loss=self.loss,
                learning_rate=self.learning_rate,
                n_estimators=self.n_estimators,
                subsample=self.subsample,
                criterion=self.criterion,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                min_weight_fraction_leaf=self.min_weight_fraction_leaf,
                max_depth=self.max_depth,
                min_impurity_decrease=self.min_impurity_decrease,
                init=self.init,
                random_state=self.random_state,
                max_features=self.max_features,
                # alpha=self.alpha,
                verbose=False,
                max_leaf_nodes=self.max_leaf_nodes,
                warm_start=self.warm_start,
                validation_fraction=self.validation_fraction,
                n_iter_no_change=self.n_iter_no_change,
                tol=self.tol,
                ccp_alpha=self.ccp_alpha
            )
            # Train on all remaining columns as predictors.
            self.regressor[target_name].fit(
                X.drop(target_name, axis=1), X[target_name])
            pbar.update_subtask(pbar_name, target_idx+1) if pbar else None

        pbar.remove(pbar_name) if pbar else None
        self.is_fitted_ = True
        return self
[docs] def predict(self, X): """ Call the predict method of the parent class with every feature from the "X" dataframe as a target variable. This will predict a separate value for each feature in the dataframe. """ if not self.is_fitted_: raise ValueError( f"This {self.__class__.__name__} instance is not fitted yet." f"Call 'fit' with appropriate arguments before using this method.") y_pred = list() if self.correlation_th is not None: X_original = X.copy() for target_name in self.feature_names: if self.correlation_th is not None: X = X_original.drop( self.correlated_features[target_name], axis=1) y_pred.append( self.regressor[target_name].predict(X.drop(target_name, axis=1))) return np.array(y_pred)
[docs] def score(self, X): """ Call the score method of the parent class with every feature from the "X" dataframe as a target variable. This will score a separate model for each feature in the dataframe. """ if not self.is_fitted_: raise ValueError( f"This {self.__class__.__name__} instance is not fitted yet." f"Call 'fit' with appropriate arguments before using this method.") if self.correlation_th is not None: X_original = X.copy() scores = list() X_eval = utils.cast_categoricals_to_int(X) for target_name in self.feature_names: if self.correlation_th is not None: X_eval = X_original.drop( self.correlated_features[target_name], axis=1) R2 = self.regressor[target_name].score( X_eval.drop(target_name, axis=1), X_eval[target_name]) # Append 1.0 if R2 is negative, or 1.0 - R2 otherwise since we're # in the minimization mode of the error function. scores.append(1.0) if R2 < 0.0 else scores.append(1.0 - R2) self.scoring = np.array(scores) return self.scoring
    def tune(
            self,
            training_data: pd.DataFrame,
            test_data: pd.DataFrame,
            study_name: str = None,
            min_loss: float = 0.05,
            storage: str = "sqlite:///rex_tuning.db",
            load_if_exists: bool = True,
            n_trials: int = DEFAULT_HPO_TRIALS
    ):
        """
        Tune the hyperparameters of the model using Optuna.

        Args:
            training_data: dataframe used to fit each trial's models.
            test_data: dataframe used to evaluate each trial.
            study_name: name of the Optuna study (shared via ``storage``).
            min_loss: stop the study early once a trial reaches this loss.
            storage: Optuna storage URL (SQLite by default).
            load_if_exists: resume an existing study with the same name.
            n_trials: maximum number of trials to run.

        Returns:
            dict with the best hyperparameters found, keyed by the
            constructor argument names of ``GBTRegressor``.
        """
        class Objective:
            """
            A class to define the objective function for the hyperparameter
            optimization.

            Some of the parameters for NNRegressor have been taken to default
            values to reduce the number of hyperparameters to optimize.

            Include this class in the hyperparameter optimization as follows:

            >>> study = optuna.create_study(direction='minimize',
            >>>                             study_name='study_name_here',
            >>>                             storage='sqlite:///db.sqlite3',
            >>>                             load_if_exists=True)
            >>> study.optimize(Objective(train_data, test_data), n_trials=100)

            The only dependency is you need to pass the train and test data
            to the class constructor. The class will build the data loaders
            for them from the dataframes.
            """

            def __init__(
                    self, train_data, test_data, device='cpu', prog_bar=True,
                    verbose=False):
                self.train_data = train_data
                self.test_data = test_data
                self.device = device
                # Fixed seed shared with the outer class for reproducibility.
                self.random_state = GBTRegressor.random_state
                self.prog_bar = prog_bar
                self.verbose = verbose

            def __call__(self, trial):
                """
                This method is called by Optuna to evaluate the objective
                function.

                Returns the median per-feature loss (1 - goodness-of-fit)
                over all columns, measured on the test data.
                """
                # Define the model hyperparameters (held fixed per trial).
                self.n_iter_no_change = 5
                self.tol = 0.0001
                # Define the hyperparameters to optimize
                self.learning_rate = trial.suggest_float(
                    "learning_rate", 0.001, 0.2)
                self.n_estimators = trial.suggest_int("n_estimators", 10, 1000)
                self.subsample = trial.suggest_float("subsample", 0.1, 1.0)
                self.min_samples_split = trial.suggest_int(
                    "min_samples_split", 2, 10)
                self.min_samples_leaf = trial.suggest_int(
                    "min_samples_leaf", 1, 10)
                self.min_weight_fraction_leaf = trial.suggest_float(
                    "min_weight_fraction_leaf", 0.0, 0.5)
                self.max_depth = trial.suggest_int("max_depth", 3, 20)
                self.max_leaf_nodes = trial.suggest_int(
                    "max_leaf_nodes", 10, 1000)
                self.min_impurity_decrease = trial.suggest_float(
                    "min_impurity_decrease", 0.0, 0.5)

                self.models = GBTRegressor(
                    learning_rate=self.learning_rate,
                    n_estimators=self.n_estimators,
                    subsample=self.subsample,
                    min_samples_split=self.min_samples_split,
                    min_samples_leaf=self.min_samples_leaf,
                    min_weight_fraction_leaf=self.min_weight_fraction_leaf,
                    max_depth=self.max_depth,
                    min_impurity_decrease=self.min_impurity_decrease,
                    random_state=self.random_state,
                    verbose=False,
                    max_leaf_nodes=self.max_leaf_nodes,
                    n_iter_no_change=self.n_iter_no_change,
                    tol=self.tol,
                    prog_bar=True & (not self.verbose) & (self.prog_bar),
                    silent=True)
                self.models.fit(self.train_data)

                # Now, measure the performance of the model with the test
                # data.
                loss = []
                X_test = utils.cast_categoricals_to_int(self.test_data)
                for target_name in list(self.train_data.columns):
                    model = self.models.regressor[target_name]
                    # For regressors, this is R2, for classifiers this is
                    # accuracy
                    if model.__class__.__name__ == "GradientBoostingClassifier":
                        # Get the F1 score of the model
                        goodness_of_fit = f1_score(
                            X_test[target_name],
                            model.predict(X_test.drop(target_name, axis=1)))
                    elif model.__class__.__name__ == "GradientBoostingRegressor":
                        goodness_of_fit = model.score(
                            X_test.drop(target_name, axis=1),
                            X_test[target_name])
                    else:
                        raise ValueError(
                            f"Model {model.__class__.__name__} is not supported."
                            f"Only GradientBoostingClassifier and"
                            f"GradientBoostingRegressor are supported.")
                    # Append 1.0 if R2 is negative, or 1.0 - R2 otherwise
                    # since we're in the minimization mode of the error
                    # function.
                    loss.append(1.0) if goodness_of_fit < 0.0 else loss.append(
                        1.0 - goodness_of_fit)
                return np.median(loss)

        # Callback function to stop the study if the loss is below a given
        # threshold
        def callback(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
            if trial.value < min_loss or study.best_value < min_loss:
                study.stop()

        if self.verbose is False:
            optuna.logging.set_verbosity(optuna.logging.WARNING)

        # Create and run the HPO study
        study = optuna.create_study(
            direction='minimize',
            study_name=study_name,
            storage=storage,
            load_if_exists=load_if_exists)
        study.optimize(
            Objective(
                training_data, test_data,
                prog_bar=self.prog_bar, verbose=self.verbose),
            n_trials=n_trials,
            gc_after_trial=True,
            show_progress_bar=(self.optuna_prog_bar & (
                not self.silent) & (not self.verbose)),
            callbacks=[callback])

        # Capture the best hyperparameters and the minimum loss
        best_trials = sorted(study.best_trials, key=lambda x: x.values[0])
        self.best_params = best_trials[0].params
        self.min_tunned_loss = best_trials[0].values[0]
        if self.verbose and not self.silent:
            print(f"Best params (min loss:{self.min_tunned_loss:.6f}):")
            for k, v in self.best_params.items():
                print(f"\t{k:<15s}: {v}")

        # Re-key the winning trial's params as GBTRegressor constructor args.
        regressor_args = {
            'learning_rate': self.best_params['learning_rate'],
            'n_estimators': self.best_params['n_estimators'],
            'subsample': self.best_params['subsample'],
            'min_samples_split': self.best_params['min_samples_split'],
            'min_samples_leaf': self.best_params['min_samples_leaf'],
            'min_weight_fraction_leaf':
                self.best_params['min_weight_fraction_leaf'],
            'max_depth': self.best_params['max_depth'],
            'max_leaf_nodes': self.best_params['max_leaf_nodes'],
            'min_impurity_decrease': self.best_params['min_impurity_decrease']
        }
        return regressor_args
[docs] def tune_fit( self, X: pd.DataFrame, hpo_study_name: str = None, hpo_min_loss: float = 0.05, hpo_storage: str = 'sqlite:///rex_tuning.db', hpo_load_if_exists: bool = True, hpo_n_trials: int = DEFAULT_HPO_TRIALS): """ Tune the hyperparameters of the model using Optuna, and the fit the model with the best parameters. """ # split X into train and test train_data = X.sample(frac=0.9, random_state=self.random_state) test_data = X.drop(train_data.index) # tune the model regressor_args = self.tune( train_data, test_data, n_trials=hpo_n_trials, study_name=hpo_study_name, min_loss=hpo_min_loss, storage=hpo_storage, load_if_exists=hpo_load_if_exists) if self.verbose and not self.silent: print(f"Best params (min loss:{self.min_tunned_loss:.6f}):") for k, v in regressor_args.items(): print(f"\t{k:<15s}: {v}") # Set the object parameters to the best parameters found. for k, v in regressor_args.items(): setattr(self, k, v) # Fit the model with the best parameters. self.fit(train_data)
#
# Main function
#
def custom_main(experiment_name='custom_rex',
                score: bool = False,
                tune: bool = False):
    """
    Load an experiment's data, then either score a saved experiment or
    tune-and-fit a fresh GBTRegressor on it.

    NOTE(review): paths are hard-coded to a local machine; adjust before
    running elsewhere.
    """
    from causalexplain.common import utils

    path = "/Users/renero/phd/data/RC3/"
    output_path = "/Users/renero/phd/output/RC3/"

    # The reference graph is loaded for its side effect / availability check;
    # it is not used further in this function.
    ref_graph = utils.graph_from_dot_file(f"{path}{experiment_name}.dot")

    # Load and standardize the experiment data.
    raw = pd.read_csv(f"{path}{experiment_name}.csv")
    scaler = StandardScaler()
    data = pd.DataFrame(scaler.fit_transform(raw), columns=raw.columns)

    # Split the dataframe into train and test
    train = data.sample(frac=0.9, random_state=42)
    test = data.drop(train.index)

    if score:
        # Score a previously-saved experiment on the held-out split.
        rex = utils.load_experiment(f"{experiment_name}", output_path)
        rex.is_fitted_ = True
        print(f"Loaded experiment {experiment_name}")
        rex.models.score(test)
    elif tune:
        # Tune hyperparameters, fit, and report the test loss.
        gbt = GBTRegressor(silent=True, prog_bar=False)  # verbose=True)
        gbt.tune_fit(train, hpo_study_name=experiment_name, hpo_n_trials=100)
        print(gbt.score(test))
# Script entry point: tune-and-fit on a sample experiment.
if __name__ == "__main__":
    custom_main("rex_generated_linear_6", tune=True)