Source code for causalexplain.estimators.cam.cam

"""

(C) Original Code from R implementation of the Causal Additive Model (CAM)

@article{buhlmann2014cam,
  title={CAM: Causal additive models, high-dimensional order search and 
          penalized regression},
  author={B{\"u}hlmann, Peter and Peters, Jonas and Ernest, Jan},
  journal={The Annals of Statistics},
  volume={42},
  number={6},
  pages={2526--2556},
  year={2014},
  publisher={Institute of Mathematical Statistics}
}

- **Imports**: Imported necessary modules and functions. Assumed that
`computeScoreMat`, `updateScoreMat`, `pruning`, `selGamBoost`, and `selGam`
are defined in separate Python files in the same directory.
- **Function Definition**: Translated the R function `CAM` to Python.
- **Variable Initialization**: Initialized variables and handled default values.
- **Variable Selection**: Used `numpy` and `multiprocessing` for parallel 
    processing.
- **Edge Inclusion**: Translated the logic for including edges and updating the
score matrix.
- **Pruning**: Translated the pruning step.
- **Output and Return**: Collected and printed the results.

Make sure the corresponding Python files (`computeScoreMat.py`, 
    `updateScoreMat.py`,
`pruning.py`, `selGamBoost.py`, `selGam.py`) are present in the same directory 
and contain the necessary functions.
"""
# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init
# pylint: disable=C0103:invalid-name, W0221:arguments-differ
# pylint: disable=C0116:missing-function-docstring, W0511:fixme
# pylint: disable=R0913:too-many-arguments, E0401:import-error
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches

import os
import time
from multiprocessing import Pool

import networkx as nx
import numpy as np
import pandas as pd

from ...common import utils
from .computeScoreMat import computeScoreMat
from .pruning import pruning
from .selGam import selGam
from .selGamBoost import selGamBoost
from .updateScoreMat import updateScoreMat
from ...metrics.compare_graphs import evaluate_graph


[docs] class CAM:
[docs] def __init__( self, name:str, scoreName="SEMGAM", parsScore=None, numCores=1, maxNumParents=None, verbose=False, variableSel=False, variableSelMethod=selGamBoost, variableSelMethodPars=None, pruning=True, pruneMethod=selGam, pruneMethodPars={"cutOffPVal": 0.05, "numBasisFcts": 10}, intervData=False, intervMat=None): self.name = name self.scoreName = scoreName self.parsScore = parsScore self.numCores = numCores self.maxNumParents = maxNumParents self.verbose = verbose self.variableSel = variableSel self.variableSelMethod = variableSelMethod self.variableSelMethodPars = variableSelMethodPars self.pruning = pruning self.pruneMethod = pruneMethod self.pruneMethodPars = pruneMethodPars self.intervData = intervData self.intervMat = intervMat self.is_fitted_ = False self.metrics = None self.dag = None if parsScore is None: self.parsScore = {"numBasisFcts": 10} if variableSelMethodPars is None: self.variableSelMethodPars = { "atLeastThatMuchSelected": 0.02, "atMostThatManyNeighbors": 10} if pruneMethodPars is None: self.pruneMethodPars = {"cutOffPVal": 0.001, "numBasisFcts": 10} self.maxNumParents = maxNumParents
[docs] def fit(self, X): """ This method implements the entire CAM algorithm. Translated from the R code. Parameters ---------- X : np.array Observational data Returns ------- edgeList : list List of edges scoreVec : list List of scores """ if self.verbose: print(f"number of cores: {self.numCores}") if self.maxNumParents is None: self.maxNumParents = min(X.shape[1] - 1, round(X.shape[0] / 20)) timeCycle = 0 timeUpdate = 0 timeScoreMat = 0 timeSel = 0 timePrune = 0 timeMax = 0 scoreVec = [] edgeList = [] counterUpdate = 0 self.feature_names = list(X.columns) X = X.values p = X.shape[1] if self.variableSel: start_time = time.time() if self.intervData: X2 = X[np.sum(self.intervMat, axis=1) == 0, :] if self.verbose: print("The preliminary neighbourhood selection is done with the " + "observational data only.") else: X2 = X if self.numCores == 1: selMat = np.array( [self.variableSelMethod(X2, self.variableSelMethodPars, self.verbose) for _ in range(p)]) else: with Pool(self.numCores) as pool: selMat = np.array(pool.starmap(self.variableSelMethod, [ (X2, self.variableSelMethodPars, self.verbose) for _ in range(p)])) cou = sum(2 ** np.sum(selMat[:, jk]) for jk in range(p)) if self.verbose: print( f"Instead of p2^(p-1) -Sillander- {p * 2 ** (p - 1)} we have {cou}") print( f"Greedy, on the other hand, is computing {np.sum(selMat)} entries.") timeSel += time.time() - start_time else: selMat = np.ones((p, p), dtype=bool) print('NO variable selection') if self.verbose else None if self.variableSel and self.verbose: if p < 30: print("This is the matrix of possible parents after the first step.") print(selMat) start_time = time.time() computeScoreMatTmp = computeScoreMat( X, score_name=self.scoreName, num_parents=1, num_cores=self.numCores, verbose=self.verbose, sel_mat=selMat, pars_score=self.parsScore, interv_mat=self.intervMat, interv_data=self.intervData) timeScoreMat += time.time() - start_time pathMatrix = np.eye(p, dtype=int) self.adj = np.zeros((p, p), dtype=int) scoreNodes = computeScoreMatTmp['scoreEmptyNodes'] if self.verbose: print(f"scoreNodes: {scoreNodes}") print("Contents of computeScoreMat@CAM()") for i in range(computeScoreMatTmp["scoreMat"].shape[0]): for j in range(computeScoreMatTmp["scoreMat"].shape[1]): if computeScoreMatTmp['scoreMat'][i, j] == -np.inf: print(" -inf ", end="") else: print( f"{computeScoreMatTmp['scoreMat'][i, j]:+.4f} ", end="") print("") print("Pre-WHILE condition: ", np.sum(computeScoreMatTmp['scoreMat'] != -np.Inf)) while np.sum(computeScoreMatTmp['scoreMat'] != -np.inf) > 0: start_time = time.time() ix_max = np.unravel_index(np.argmax( computeScoreMatTmp['scoreMat']), computeScoreMatTmp['scoreMat'].shape) timeMax += time.time() - start_time self.adj[ix_max] = 1 scoreNodes[ix_max[1]] += computeScoreMatTmp['scoreMat'][ix_max] if self.verbose: print(f"ix_max: {ix_max}") print(f"\nIncluded edge (from, to) {ix_max}") computeScoreMatTmp['scoreMat'][ix_max] = -np.inf start_time = time.time() pathMatrix[ix_max[0], ix_max[1]] = 1 DescOfNewChild = np.where(pathMatrix[ix_max[1], :] == 1)[0] AncOfNewParent = np.where(pathMatrix[:, ix_max[0]] == 1)[0] pathMatrix[np.ix_(AncOfNewParent, DescOfNewChild)] = 1 computeScoreMatTmp['scoreMat'][pathMatrix.T == 1] = -np.inf computeScoreMatTmp['scoreMat'][ix_max[1], ix_max[0]] = -np.inf timeCycle += time.time() - start_time scoreVec.append(np.sum(scoreNodes)) edgeList.append(ix_max) start_time = time.time() computeScoreMatTmp['scoreMat'] = updateScoreMat( computeScoreMatTmp['scoreMat'], X, score_name=self.scoreName, i=ix_max[0], j=ix_max[1], score_nodes=scoreNodes, adj=self.adj, num_cores=self.numCores, verbose=self.verbose, max_num_parents=self.maxNumParents, pars_score=self.parsScore, interv_mat=self.intervMat, interv_data=self.intervData) timeUpdate += time.time() - start_time counterUpdate += 1 if self.verbose: print("\n--------------------------") print("Finished step 2 ----------") print("--------------------------") # Step 3 if self.pruning: if self.intervData: X2 = X[np.sum(self.intervMat, axis=1) == 0, :] print("The preliminary neighbourhood selection is done with the " + "observational data only.") else: X2 = X if self.verbose: print("\n Performing pruning ... \n") start_time = time.time() self.adj = pruning(X=X2, G=self.adj, prune_method=self.pruneMethod, prune_method_pars=self.pruneMethodPars, verbose=self.verbose) timePrune += time.time() - start_time timeTotal = timeSel + timeScoreMat + timeCycle + timeUpdate + timeMax + timePrune if self.verbose: print("\nTiming:") print(f" Time for variable selection: {timeSel:.2f}") print(f" Time computing the initial scoreMat: {timeScoreMat:.2f}") print(f" Time checking for cycles: {timeCycle:.2f}") print( f" Time computing updates for the scoreMat: {timeUpdate:.2f}, " f"doing {counterUpdate} updates.") print(f" Time for pruning: {timePrune:.2f}") print(f" Time for finding maximum: {timeMax:.2f}") print(f" Time in total: {timeTotal:.2f}") result = { "Adj": self.adj, "Score": np.sum(scoreNodes), "timesVec": [timeSel, timeScoreMat, timeCycle, timeUpdate, timePrune, timeMax, timeTotal], "scoreVec": scoreVec, "edgeList": edgeList } self.is_fitted_ = True return self
[docs] def predict(self, ref_graph: nx.DiGraph = None): if not self.is_fitted_: raise ValueError( f"This {self.__class__.__name__} instance is not fitted yet." f"Call 'fit' with appropriate arguments before using this method.") self.dag = utils.graph_from_adjacency(self.adj, self.feature_names) if ref_graph is not None: self.metrics = evaluate_graph( ref_graph, self.dag, self.feature_names) else: self.metrics = None return self.dag
[docs] def fit_predict( self, train_data:pd.DataFrame, test_data:pd.DataFrame = None, ref_graph: nx.DiGraph = None): self.fit(train_data) self.predict(ref_graph) return self.dag
[docs] def main(dataset_name, input_path="/Users/renero/phd/data/sachs", output_path="/Users/renero/phd/output/", save=False): data = pd.read_csv(os.path.join(input_path, dataset_name) + ".csv") ref_graph = utils.graph_from_dot_file( os.path.join(input_path, dataset_name) + ".dot") cam = CAM(name="main_run", pruning=True, pruneMethodPars={"cutOffPVal": 0.05, "numBasisFcts": 10}, verbose=False) cam.fit_predict(data, ref_graph=ref_graph) print(cam.metrics)
if __name__ == "__main__": main("sachs")