Source code for causalexplain.estimators.notears.notears

"""
NOTEARS

(C) Original code from https://github.com/xunzheng/notears
"""


import networkx as nx
import numpy as np
import pandas as pd
import scipy.linalg
import scipy.optimize

from causalexplain.common import utils
from causalexplain.estimators.notears.loss import (
    least_squares_loss, least_squares_loss_grad)
from causalexplain.metrics.compare_graphs import evaluate_graph


class NOTEARS:

    def __init__(
            self,
            name: str,
            loss=least_squares_loss,
            loss_grad=least_squares_loss_grad,
            c=0.25,
            r=10.0,
            e=1e-8,
            rnd_W_init=False,
            verbose=False):
        self.name = name
        self.loss = loss
        self.loss_grad = loss_grad
        self.c = c
        self.r = r
        self.e = e
        self.rnd_W_init = rnd_W_init
        self.verbose = verbose
    def notears_standard(self, data, return_all_progress=False):
        """
        Runs the NOTEARS algorithm.

        Args:
            data (np.array): n x d data matrix with n samples, d variables
            return_all_progress (bool): return all intermediate values of W,
                rather than just the final value

        The remaining hyperparameters are taken from the instance:
            c (float): minimum rate of progress, c in (0, 1)
            r (float): penalty growth rate, r > 1
            e (float): optimization accuracy, e > 0 (acyclicity stopping
                criterion)
            loss (function): loss function
            loss_grad (function): gradient of the loss function
            rnd_W_init (bool): initialize W to a std. normal random matrix,
                rather than the zero matrix

        Returns:
            dict: {'h': acyclicity of output,
                   'loss': loss of output,
                   'W': resulting optimized adjacency matrix}
        """
        n = np.shape(data)[0]
        d = np.shape(data)[1]
        data = np.array(data).astype(dtype=np.float64)
        cov = np.cov(data.T)
        if self.rnd_W_init:
            W = np.random.randn(d, d)
        else:
            W = np.zeros([d, d])    # initial guess
        W = W.astype(dtype=np.float64)
        a = 0.0                     # initial Lagrange multiplier
        p = 1.0                     # initial penalty
        if return_all_progress:
            ret = []

        def h(W):
            # h(W) = tr exp(W ◦ W) − d
            return np.trace(scipy.linalg.expm(np.multiply(W, W))) - d

        def h_grad(W):
            # ∇h(W) = [exp(W ◦ W)]^T ◦ 2W
            return np.multiply(
                np.transpose(scipy.linalg.expm(np.multiply(W, W))), 2.0 * W)

        def L(W, p, a):
            # Augmented Lagrangian: loss + (p/2) h(W)^2 + a h(W)
            W = np.reshape(W, [d, d]).astype(dtype=np.float64)
            return self.loss(W, data, cov, d, n) \
                + (p / 2.0) * (h(W) ** 2) + a * (h(W))

        def L_grad(W, p, a):
            W = np.reshape(W, [d, d]).astype(dtype=np.float64)
            return np.reshape(
                self.loss_grad(W, data, cov, d, n)
                + h_grad(W) * (a + (p * h(W))),
                [d ** 2]).astype(dtype=np.float64)

        def get_W_star(p, W, a):
            W_flat = W.flatten()
            W_star = scipy.optimize.minimize(
                L, W_flat, args=(p, a), jac=L_grad, method='L-BFGS-B',
                options={'disp': False})
            W_star.x = W_star.x.reshape(W.shape).astype(dtype=np.float64)
            return W_star

        while True:
            W_star = get_W_star(p, W, a)
            W_star = W_star['x']
            h_W_star = h(W_star)
            if h(W) != 0.0:
                # Grow the penalty until acyclicity improves fast enough.
                while h_W_star >= max(self.c * h(W), self.e):
                    p = self.r * p
                    W_star = np.reshape(
                        get_W_star(p, W, a)['x'],
                        [d, d]).astype(dtype=np.float64)
                    h_W_star = h(W_star)
                    if self.verbose:
                        print("Increasing p:\t p = {: .2e}\n\t\t h_W_star = {}".format(
                            p, h_W_star))
            if return_all_progress:
                ret.append({'h': h_W_star,
                            'loss': self.loss(W_star, data, cov, d, n),
                            'a': a, 'W': W_star})
            if h_W_star < self.e:
                if self.verbose:
                    print("Done:\t\t h = {}\n\t\t loss = {}\n\t\t a = {}".format(
                        h_W_star, self.loss(W_star, data, cov, d, n), a))
                if return_all_progress:
                    return ret
                return {'h': h_W_star,
                        'loss': self.loss(W_star, data, cov, d, n),
                        'W': W_star}
            if self.verbose:
                print("Progress:\t h = {}\n\t\t loss = {}\n\t\t a = {}".format(
                    h_W_star, self.loss(W_star, data, cov, d, n), a))
            # Dual-ascent update on the Lagrange multiplier.
            a = a + p * h_W_star
            W = W_star
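    # Sketch of the scheme implemented above, for reference (comments only):
    # each outer iteration solves the unconstrained subproblem
    #
    #     W* = argmin_W  loss(W) + (p / 2) * h(W)**2 + a * h(W)
    #
    # with L-BFGS-B, then applies the dual-ascent update a <- a + p * h(W*).
    # The penalty p is multiplied by r whenever h fails to shrink by at
    # least the factor c, and the loop terminates once h(W*) < e.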
    def fit(self, X: pd.DataFrame, **kwargs):
        self.labels = list(X.columns)
        self.model = self.notears_standard(X)
        if self.verbose:
            print('Acyclicity loss...: {:.4g}'.format(self.model['h']))
            print('Least squares loss: {:.4g}'.format(self.model['loss']))
        return self
    def predict(self, ref_graph: nx.DiGraph = None, threshold: float = 0.1):
        if ref_graph is not None:
            true_adj_mat = utils.graph_to_adjacency(
                ref_graph, labels=self.labels)
            num_nodes = true_adj_mat.shape[0]
            num_edges = len(ref_graph.edges())
        # Binarize the weighted matrix: keep edges above the threshold.
        acyclic_W = np.where(self.model['W'] > threshold, 1, 0).T
        self.dag = utils.graph_from_adjacency(
            adjacency=acyclic_W, node_labels=self.labels)
        self.metrics = evaluate_graph(ref_graph, self.dag)
        return self
    def fit_predict(
            self,
            train: pd.DataFrame,
            test: pd.DataFrame,
            ref_graph: nx.DiGraph = None,
            threshold=0.1,
            **kwargs):
        self.fit(train)
        self.predict(ref_graph, threshold)
        return self.dag
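
# Illustrative sketch (not part of the original module): the acyclicity
# measure used above, h(W) = tr(exp(W ◦ W)) − d, vanishes exactly when W
# encodes a DAG. The hypothetical helper below checks this on a 3-node
# chain versus a graph with a cycle, using only the imports already present.
def _acyclicity_demo():
    d = 3
    # Strictly upper-triangular W encodes the DAG 0 -> 1 -> 2, so h(W) = 0.
    W_dag = np.array([[0.0, 0.9, 0.0],
                      [0.0, 0.0, 0.7],
                      [0.0, 0.0, 0.0]])
    # Adding the edge 2 -> 0 closes a cycle, so h(W) > 0.
    W_cyc = W_dag.copy()
    W_cyc[2, 0] = 0.5

    def h(W):
        return np.trace(scipy.linalg.expm(np.multiply(W, W))) - d

    print(f"h(DAG)   = {h(W_dag):.3e}")   # ≈ 0
    print(f"h(cycle) = {h(W_cyc):.3e}")   # > 0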
def main(dataset_name,
         input_path="/Users/renero/phd/data/",
         output_path="/Users/renero/phd/output/",
         save=False):
    data = pd.read_csv(f"{input_path}{dataset_name}.csv")
    ref_graph = utils.graph_from_dot_file(f"{input_path}{dataset_name}.dot")
    notears = NOTEARS(name="main_run")
    notears.fit_predict(data, test=None, ref_graph=ref_graph)
    print(notears.metrics)
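
# Usage sketch (hypothetical data and helper name, not part of the original
# module): fit NOTEARS on a synthetic linear SEM x -> y -> z and inspect the
# estimated weighted adjacency matrix without needing a reference graph.
def _usage_demo():
    rng = np.random.default_rng(0)
    x = rng.normal(size=1000)
    y = 2.0 * x + rng.normal(size=1000)
    z = -1.5 * y + rng.normal(size=1000)
    df = pd.DataFrame({"x": x, "y": y, "z": z})

    model = NOTEARS(name="demo").fit(df)
    W = model.model['W']
    print(W.round(2))
    # Same binarization predict() applies: threshold, then transpose.
    print(np.where(W > 0.1, 1, 0).T)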
if __name__ == "__main__":
    main("toy_dataset")