"""
A class to train DFF networks for all variables in data. Each network will be trained to
predict one of the variables in the data, using the rest as predictors plus one
source of random noise.
(C) 2022,2023,2024, J. Renero
"""
# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches
# pylint: disable=W0102:dangerous-default-value
import inspect
import types
import warnings
from typing import Dict, List, Tuple, Union, Any
import numpy as np
import optuna
import pandas as pd
import torch
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader
from mlforge.progbar import ProgBar # type: ignore
from ..common import DEFAULT_HPO_TRIALS, utils
from ..explainability.hierarchies import Hierarchies
from ._columnar import ColumnsDataset
from ._models import MLPModel
warnings.filterwarnings("ignore")
class NNRegressor(BaseEstimator):
"""
A class to train DFF networks for all variables in data. Each network will be
trained to predict one of the variables in the data, using the rest as predictors
plus one source of random noise.
Attributes:
-----------
hidden_dim (int): The dimension(s) of the hidden layer(s). This value
can be a single integer for DFF or an array with the dimension of
each hidden layer for the MLP case.
activation (str): The activation function to use, either 'relu' or 'selu'.
Default is 'relu'.
learning_rate (float): The learning rate for the optimizer.
dropout (float): The dropout rate for the dropout layer.
batch_size (int): The batch size for the optimizer.
num_epochs (int): The number of epochs for the optimizer.
loss_fn (str): The loss function to use. Default is "mse".
device (str): The device to use, either "cpu", "cuda" or "mps". Default
is "cpu".
test_size (float): The proportion of the data to use for testing. Default
is 0.1.
early_stop (bool): Whether to use early stopping. Default is False.
patience (int): The patience for early stopping. Default is 10.
min_delta (float): The minimum delta for early stopping. Default is 0.001.
correlation_th (float): If set, features whose correlation with the target
exceeds this threshold are dropped before training each network.
Default is None.
random_state (int): The seed for the random number generator. Default
is 1234.
verbose (bool): Whether to print verbose output. Default is False.
prog_bar (bool): Whether to enable the progress bar. Default is True.
silent (bool): Whether to suppress console output. Default is False.
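Example:
--------
A minimal usage sketch (assumes ``df`` is a pandas DataFrame of continuous
variables; the hyperparameter values shown are illustrative only):

>>> nn = NNRegressor(hidden_dim=[64, 16], num_epochs=20, prog_bar=False)
>>> nn.fit(df)                 # one network trained per column of df
>>> preds = nn.predict(df)     # one row of predictions per target variable
>>> losses = nn.score(df)      # one loss value per target variable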
"""
def __init__(
self,
hidden_dim: Union[int, List[int]] = [75, 17],
activation: str = 'relu',
learning_rate: float = 0.0046,
dropout: float = 0.001,
batch_size: int = 44,
num_epochs: int = 40,
loss_fn: str = 'mse',
device: Union[int, str] = "cpu",
test_size: float = 0.1,
early_stop: bool = False,
patience: int = 10,
min_delta: float = 0.001,
correlation_th: float = None,
random_state: int = 1234,
verbose: bool = False,
prog_bar: bool = True,
silent: bool = False,
optuna_prog_bar: bool = False):
"""
Train DFF networks for all variables in data. Each network will be trained to
predict one of the variables in the data, using the rest as predictors plus one
source of random noise.
Args:
hidden_dim (Union[int, List[int]]): The dimension(s) of the hidden layer(s).
This value can be a single integer for a DFF or a list with the
dimension of each hidden layer for the MLP case.
activation (str): The activation function to use, either 'relu' or 'selu'.
Default is 'relu'.
learning_rate (float): The learning rate for the optimizer.
dropout (float): The dropout rate for the dropout layer.
batch_size (int): The batch size for the optimizer.
num_epochs (int): The number of epochs for the optimizer.
loss_fn (str): The loss function to use. Default is "mse".
device (str): The device to use, either "cpu", "cuda" or "mps". Default
is "cpu".
test_size (float): The proportion of the data to use for testing. Default
is 0.1.
early_stop (bool): Whether to use early stopping. Default is False.
patience (int): The patience for early stopping. Default is 10.
min_delta (float): The minimum delta for early stopping. Default is 0.001.
correlation_th (float): Correlation threshold above which features are
dropped for each target. Default is None.
random_state (int): The seed for the random number generator. Default
is 1234.
verbose (bool): Whether to print verbose output. Default is False.
prog_bar (bool): Whether to enable the progress bar. Default is True.
silent (bool): Whether to suppress console output. Default is False.
optuna_prog_bar (bool): Whether to show Optuna's progress bar during
hyperparameter tuning. Default is False.
"""
self.hidden_dim = hidden_dim
self.activation = activation
self.learning_rate = learning_rate
self.dropout = dropout
self.batch_size = batch_size
self.num_epochs = num_epochs
self.loss_fn = loss_fn
self.device = device
self.test_size = test_size
self.early_stop = early_stop
self.patience = patience
self.min_delta = min_delta
self.correlation_th = correlation_th
self.random_state = random_state
self.verbose = verbose
self.prog_bar = prog_bar
self.silent = silent
self.optuna_prog_bar = optuna_prog_bar
self.regressor = None
self._fit_desc = "Training NNs"
if self.verbose:
self.prog_bar = False
def fit(self, X):
"""A reference implementation of a fitting function.
Parameters
----------
X : {array-like, sparse matrix}, shape (n_samples, n_features)
The training input samples.
y : array-like, shape (n_samples,) or (n_samples, n_outputs)
The target values (class labels in classification, real numbers in
regression).
Returns
-------
self : object
Returns self.
"""
# X, y = check_X_y(X, y)
self.n_features_in_ = X.shape[1]
self.feature_names = utils.get_feature_names(X)
self.feature_types = utils.get_feature_types(X)
self.regressor = {}
# Who is calling me?
try:
curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2)
caller_name = calframe[1][3]
if caller_name == "__call__":
caller_name = "HPO"
except Exception: # pylint: disable=broad-except
caller_name = "unknown"
if self.correlation_th:
self.corr_matrix = Hierarchies.compute_correlation_matrix(X)
self.correlated_features = Hierarchies.compute_correlated_features(
self.corr_matrix, self.correlation_th, self.feature_names,
verbose=self.verbose)
X_original = X.copy()
if self.prog_bar and not self.verbose:
pbar_name = f"({caller_name}) DNN_fit"
pbar = ProgBar().start_subtask(pbar_name, len(self.feature_names))
else:
pbar = None
for target_idx, target_name in enumerate(self.feature_names):
# if correlation_th is not None then, remove features that are highly
# correlated with the target, at each step of the loop
if self.correlation_th is not None:
X = X_original.copy()
if len(self.correlated_features[target_name]) > 0:
X = X.drop(self.correlated_features[target_name], axis=1)
if self.verbose:
print("REMOVED CORRELATED FEATURES: ",
self.correlated_features[target_name])
# Determine Loss function based on the type of target variable
if self.feature_types[target_name] == 'categorical':
loss_fn = 'crossentropy'
elif self.feature_types[target_name] == 'binary':
loss_fn = 'binary_crossentropy'
else:
loss_fn = self.loss_fn
self.regressor[target_name] = MLPModel(
target=target_name,
input_size=X.shape[1],
activation=self.activation,
hidden_dim=self.hidden_dim,
learning_rate=self.learning_rate,
batch_size=self.batch_size,
loss_fn=loss_fn,
dropout=self.dropout,
num_epochs=self.num_epochs,
dataframe=X,
test_size=self.test_size,
device=self.device,
seed=self.random_state,
early_stop=self.early_stop,
patience=self.patience,
min_delta=self.min_delta,
prog_bar=self.prog_bar)
self.regressor[target_name].train()
pbar.update_subtask(pbar_name, target_idx+1) if pbar else None
pbar.remove(pbar_name) if pbar else None
self.is_fitted_ = True
return self
def predict(self, X: pd.DataFrame) -> np.ndarray:
"""Predicts the values for each target variable.
Parameters
----------
X : pd.DataFrame
The input data to make predictions on.
Returns
-------
np.ndarray
The predictions for each target variable, arranged with one row per target.
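
Example (a minimal sketch; assumes a fitted estimator ``nn`` and a DataFrame
``new_df`` with the same columns used for fitting):

>>> preds = nn.predict(new_df)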
"""
assert X.shape[1] == self.n_features_in_, \
f"X has {X.shape[1]} features, expected {self.n_features_in_}"
# X = check_array(X, accept_sparse=True)
if not self.is_fitted_:
raise ValueError("This Rex instance is not fitted yet. \
Call 'fit' with appropriate arguments before using this estimator.")
if self.correlation_th is not None:
X_original = X.copy()
prediction = pd.DataFrame(columns=self.feature_names)
for target in self.feature_names:
# Remove those features from the loader that are highly correlated to target
if self.correlation_th is not None:
X = X_original.drop(self.correlated_features[target], axis=1)
# Create a data loader for the target variable. The ColumnsDataset will
# return the target variable as the second element of the tuple, and
# will drop the target from the features.
loader = DataLoader(
ColumnsDataset(target, X), batch_size=self.batch_size,
shuffle=False)
model = self.regressor[target].model
# Obtain the predictions for the target variable
preds = np.empty((0,), dtype=np.float16)
for (tensor_X, _) in loader:
tensor_X = tensor_X.to(self.device)
y_hat = model.forward(tensor_X)
preds = np.append(preds, y_hat.detach().cpu().numpy().flatten())
prediction[target] = preds
# Concatenate the numpy arrays for all the batches
np_preds = prediction.values
final_preds = []
if np_preds.ndim > 1 and np_preds.shape[0] > 1:
for i in range(len(self.feature_names)):
column = np_preds[:, i]
if column.ndim == 1:
final_preds.append(column)
else:
final_preds.append(np.concatenate(column))
final_preds = np.array(final_preds)
else:
final_preds = np_preds
# If final_preds is still 1D, reshape it to 2D
if final_preds.ndim == 1:
final_preds = final_preds.reshape(1, -1)
if len(final_preds) == 0:
final_preds = np_preds
return np.array(final_preds)
def score(self, X):
"""
Scores the model using the loss function. It returns the list of losses
for each target variable.
"""
assert X.shape[1] == self.n_features_in_, \
f"X has {X.shape[1]} features, expected {self.n_features_in_}"
if not self.is_fitted_:
raise ValueError("This Rex instance is not fitted yet. \
Call 'fit' with appropriate arguments before using this estimator.")
# Call the class method to predict the values for each target variable
y_hat = self.predict(X)
# Handle the case where the prediction returned by the model is not a
# numpy array but a numpy object type
if isinstance(y_hat, np.ndarray) and y_hat.dtype == np.object_:
y_hat = np.vstack(y_hat[:, :].flatten()).astype('float')
y_hat = torch.Tensor(y_hat)
scores = []
for i, target in enumerate(self.feature_names):
y_preds = torch.Tensor(y_hat[i])
y = torch.Tensor(X[target].values)
scores.append(self.regressor[target].model.loss_fn(y_preds, y))
self.scoring = np.array(scores)
return self.scoring
def __repr__(self):
forbidden_attrs = [
'fit', 'predict', 'score', 'get_params', 'set_params']
ret = f"REX object attributes\n"
ret += f"{'-'*80}\n"
for attr in dir(self):
if attr.startswith('_') or \
attr in forbidden_attrs or \
isinstance(getattr(self, attr), types.MethodType):
continue
elif attr == "X" or attr == "y":
if isinstance(getattr(self, attr), pd.DataFrame):
ret += f"{attr:25} {getattr(self, attr).shape}\n"
continue
elif isinstance(getattr(self, attr), pd.DataFrame):
ret += f"{attr:25} DataFrame {getattr(self, attr).shape}\n"
else:
ret += f"{attr:25} {getattr(self, attr)}\n"
return ret
def tune(
self,
training_data: pd.DataFrame,
test_data: pd.DataFrame,
study_name: str = None,
min_loss: float = 0.05,
storage: str = 'sqlite:///rex_tuning.db',
load_if_exists: bool = True,
n_trials: int = DEFAULT_HPO_TRIALS) -> Dict[str, Any]:
"""
Tune the hyperparameters of the model using Optuna and return a dict with
the best NNRegressor arguments found.
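
A minimal sketch (assumes ``train_df`` and ``test_df`` are pandas DataFrames
with the same columns; the study name and trial count are illustrative):

>>> nn = NNRegressor(prog_bar=False)
>>> best = nn.tune(train_df, test_df, study_name='demo', n_trials=5)
>>> nn.set_params(**best)   # returned keys map onto NNRegressor arguments
>>> nn.fit(train_df)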
"""
class Objective:
"""
A class to define the objective function for the hyperparameter optimization
Some of the parameters for NNRegressor have been taken to default values to
reduce the number of hyperparameters to optimize.
Include this class in the hyperparameter optimization as follows:
>>> study = optuna.create_study(direction='minimize',
...                             study_name='study_name_here',
...                             storage='sqlite:///db.sqlite3',
...                             load_if_exists=True)
>>> study.optimize(Objective(train_data, test_data), n_trials=100)
The only requirement is that you pass the train and test data to the class
constructor; the class builds the data loaders from these dataframes.
"""
def __init__(
self,
train_data,
test_data,
device='cpu',
prog_bar=True,
verbose=False):
self.train_data = train_data
self.test_data = test_data
self.device = device
self.n_layers = None
self.activation = None
self.learning_rate = None
self.dropout = None
self.batch_size = None
self.num_epochs = None
self.models = None
self.prog_bar = prog_bar
self.verbose = verbose
def __call__(self, trial):
"""
This method is called by Optuna to evaluate the objective function.
"""
self.n_layers = trial.suggest_int('n_layers', 1, 6)
self.layers = []
for i in range(self.n_layers):
self.layers.append(
trial.suggest_int(f'n_units_l{i}', 1, 182))
self.activation = trial.suggest_categorical(
'activation', ['relu', 'selu', 'linear'])
self.learning_rate = trial.suggest_float(
'learning_rate', 1e-5, 1e-1, log=True)
self.dropout = trial.suggest_float('dropout', 0.0, 0.5)
self.batch_size = trial.suggest_int('batch_size', 8, 128)
self.num_epochs = trial.suggest_int('num_epochs', 10, 80)
self.models = NNRegressor(
hidden_dim=self.layers,
activation=self.activation,
learning_rate=self.learning_rate,
dropout=self.dropout,
batch_size=self.batch_size,
num_epochs=self.num_epochs,
loss_fn='mse',
device=self.device,
random_state=42,
verbose=self.verbose,
prog_bar=(self.prog_bar and not self.verbose),
silent=True)
self.models.fit(self.train_data)
# Now, measure the performance of the model with the test data.
loss = []
for target in list(self.train_data.columns):
model = self.models.regressor[target].model
loader = DataLoader(
ColumnsDataset(target, self.test_data),
batch_size=self.batch_size,
shuffle=False)
avg_loss, _, _ = self.compute_loss(model, loader)
loss.append(avg_loss)
return np.median(loss)
def compute_loss(
self,
model: torch.nn.Module,
dataloader: torch.utils.data.DataLoader,
n_repeats: int = 10) -> Tuple[float, float, np.ndarray]:
"""
Computes the average MSE loss for a given model and dataloader.
Parameters:
-----------
model: torch.nn.Module
The model to compute the loss for.
dataloader: torch.utils.data.DataLoader
The dataloader to use for computing the loss.
n_repeats: int
The number of passes over the dataloader used to average the loss.
Returns:
--------
avg_loss: float
The average MSE loss.
std_loss: float
The standard deviation of the MSE loss.
losses: np.ndarray
The MSE loss for each batch.
"""
mse = np.array([])
num_batches = 0
for _ in range(n_repeats):
loss = []
for _, (X, y) in enumerate(dataloader):
X = X.to(self.device)
y = y.to(self.device)
yhat = model.forward(X)
loss.append(model.loss_fn(yhat, y).item())
num_batches += 1
if len(mse) == 0:
mse = np.array(loss)
else:
mse = np.vstack((mse, [loss]))
return np.mean(mse), np.std(mse), mse
# Callback to stop the study if the MSE is below a threshold.
def callback(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
if trial.value < min_loss or study.best_value < min_loss:
study.stop()
if self.verbose is False:
optuna.logging.set_verbosity(optuna.logging.WARNING)
# Create and run the HPO study.
study = optuna.create_study(
direction='minimize', study_name=study_name, storage=storage,
load_if_exists=load_if_exists)
study.optimize(
Objective(
training_data, test_data, prog_bar=self.prog_bar, verbose=self.verbose),
n_trials=n_trials,
show_progress_bar=(
self.optuna_prog_bar and not self.silent and not self.verbose),
callbacks=[callback]
)
# Capture the best parameters and the minimum loss.
best_trials = sorted(study.best_trials, key=lambda x: x.values[0])
self.best_params = best_trials[0].params
self.min_tunned_loss = best_trials[0].values[0]
if self.verbose and not self.silent:
print(f"Best params (min loss:{self.min_tunned_loss:.6f}):")
for k, v in self.best_params.items():
print(f"\t{k:<15s}: {v}")
regressor_args = {
'hidden_dim': [self.best_params[f'n_units_l{i}']
for i in range(self.best_params['n_layers'])],
'activation': self.best_params['activation'],
'learning_rate': self.best_params['learning_rate'],
'dropout': self.best_params['dropout'],
'batch_size': self.best_params['batch_size'],
'num_epochs': self.best_params['num_epochs']}
return regressor_args
def tune_fit(
self,
X: pd.DataFrame,
hpo_study_name: str = None,
hpo_min_loss: float = 0.05,
hpo_storage: str = 'sqlite:///rex_tuning.db',
hpo_load_if_exists: bool = True,
hpo_n_trials: int = DEFAULT_HPO_TRIALS):
"""
Tune the hyperparameters of the model using Optuna, and then fit the model
with the best parameters.
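
A minimal sketch (assumes ``df`` is a pandas DataFrame of continuous
variables; the study name and trial count are illustrative):

>>> nn = NNRegressor(prog_bar=False)
>>> nn.tune_fit(df, hpo_study_name='demo', hpo_n_trials=5)
>>> preds = nn.predict(df)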
"""
# split X into train and test
train_data = X.sample(frac=0.9, random_state=self.random_state)
test_data = X.drop(train_data.index)
# tune the model
regressor_args = self.tune(
train_data, test_data, n_trials=hpo_n_trials, study_name=hpo_study_name,
min_loss=hpo_min_loss, storage=hpo_storage,
load_if_exists=hpo_load_if_exists)
if self.verbose and not self.silent:
print(f"Best params (min loss:{self.min_tunned_loss:.6f}):")
for k, v in regressor_args.items():
print(f"\t{k:<15s}: {v}")
# Set the object parameters to the best parameters found.
for k, v in regressor_args.items():
setattr(self, k, v)
# Fit the model with the best parameters.
self.fit(X)
#
# Main function
#
def custom_main(score: bool = False, tune: bool = False):
import os
from causalexplain.common import utils
path = "/Users/renero/phd/data/RC4/risks"
output_path = "/Users/renero/phd/output/RC4/"
experiment_name = 'transformed_data'
# ref_graph = utils.graph_from_dot_file(f"{path}{experiment_name}.dot")
data = pd.read_csv(f"{os.path.join(path, experiment_name)}.csv")
scaler = StandardScaler()
data = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)
# Split the dataframe into train and test
train = data.sample(frac=0.9, random_state=42)
test = data.drop(train.index)
if score:
rex = utils.load_experiment(f"{experiment_name}", output_path)
rex.is_fitted_ = True
print(f"Loaded experiment {experiment_name}")
rex.models.score(test)
elif tune:
nn = NNRegressor()
nn.tune_fit(data, hpo_n_trials=10)
if __name__ == "__main__":
custom_main(tune=True)