Source code for causalexplain.estimators.notears.utils

import numpy as np
import networkx as nx
from scipy.special import expit as sigmoid
import igraph as ig
import random


def set_random_seed(seed):
    random.seed(seed)
    np.random.seed(seed)

def is_dag(W):
    G = ig.Graph.Weighted_Adjacency(W.tolist())
    return G.is_dag()

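# Usage sketch (illustrative; not part of the original module): the two
# helpers above in action on a hand-built 2-node graph.
#
#     set_random_seed(0)
#     W = np.array([[0.0, 1.5], [0.0, 0.0]])   # single edge 0 -> 1
#     assert is_dag(W)
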
def simulate_dag(d, s0, graph_type):
    """Simulate random DAG with some expected number of edges.

    Args:
        d (int): num of nodes
        s0 (int): expected num of edges
        graph_type (str): ER, SF, BP

    Returns:
        B (np.ndarray): [d, d] binary adj matrix of DAG
    """
    def _random_permutation(M):
        # np.random.permutation permutes first axis only
        P = np.random.permutation(np.eye(M.shape[0]))
        return P.T @ M @ P

    def _random_acyclic_orientation(B_und):
        return np.tril(_random_permutation(B_und), k=-1)

    def _graph_to_adjmat(G):
        return np.array(G.get_adjacency().data)

    if graph_type == 'ER':
        # Erdos-Renyi
        G_und = ig.Graph.Erdos_Renyi(n=d, m=s0)
        B_und = _graph_to_adjmat(G_und)
        B = _random_acyclic_orientation(B_und)
    elif graph_type == 'SF':
        # Scale-free, Barabasi-Albert
        G = ig.Graph.Barabasi(n=d, m=int(round(s0 / d)), directed=True)
        B = _graph_to_adjmat(G)
    elif graph_type == 'BP':
        # Bipartite, Sec 4.1 of (Gu, Fu, Zhou, 2018)
        top = int(0.2 * d)
        G = ig.Graph.Random_Bipartite(top, d - top, m=s0, directed=True, neimode=ig.OUT)
        B = _graph_to_adjmat(G)
    else:
        raise ValueError('unknown graph type')
    B_perm = _random_permutation(B)
    assert ig.Graph.Adjacency(B_perm.tolist()).is_dag()
    return B_perm

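# Usage sketch (illustrative; not part of the original module): draw a
# 10-node Erdos-Renyi DAG with 20 expected edges.
#
#     B = simulate_dag(d=10, s0=20, graph_type='ER')
#     assert B.shape == (10, 10) and is_dag(B)
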
def simulate_parameter(B, w_ranges=((-2.0, -0.5), (0.5, 2.0))):
    """Simulate SEM parameters for a DAG.

    Args:
        B (np.ndarray): [d, d] binary adj matrix of DAG
        w_ranges (tuple): disjoint weight ranges

    Returns:
        W (np.ndarray): [d, d] weighted adj matrix of DAG
    """
    W = np.zeros(B.shape)
    S = np.random.randint(len(w_ranges), size=B.shape)  # which range
    for i, (low, high) in enumerate(w_ranges):
        U = np.random.uniform(low=low, high=high, size=B.shape)
        W += B * (S == i) * U
    return W

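# Usage sketch (continuing the one above; illustrative only): attach edge
# weights from the default disjoint ranges to the binary graph B.
#
#     W = simulate_parameter(B)
#     assert (W != 0).sum() == B.sum()   # one weight per edge
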
def simulate_linear_sem(W, n, sem_type, noise_scale=None):
    """Simulate samples from linear SEM with specified type of noise.

    For uniform, noise z ~ uniform(-a, a), where a = noise_scale.

    Args:
        W (np.ndarray): [d, d] weighted adj matrix of DAG
        n (int): num of samples, n=inf mimics population risk
        sem_type (str): gauss, exp, gumbel, uniform, logistic, poisson
        noise_scale (np.ndarray): scale parameter of additive noise, default all ones

    Returns:
        X (np.ndarray): [n, d] sample matrix, [d, d] if n=inf
    """
    def _simulate_single_equation(X, w, scale):
        """X: [n, num of parents], w: [num of parents], x: [n]"""
        if sem_type == 'gauss':
            z = np.random.normal(scale=scale, size=n)
            x = X @ w + z
        elif sem_type == 'exp':
            z = np.random.exponential(scale=scale, size=n)
            x = X @ w + z
        elif sem_type == 'gumbel':
            z = np.random.gumbel(scale=scale, size=n)
            x = X @ w + z
        elif sem_type == 'uniform':
            z = np.random.uniform(low=-scale, high=scale, size=n)
            x = X @ w + z
        elif sem_type == 'logistic':
            x = np.random.binomial(1, sigmoid(X @ w)) * 1.0
        elif sem_type == 'poisson':
            x = np.random.poisson(np.exp(X @ w)) * 1.0
        else:
            raise ValueError('unknown sem type')
        return x

    d = W.shape[0]
    if noise_scale is None:
        scale_vec = np.ones(d)
    elif np.isscalar(noise_scale):
        scale_vec = noise_scale * np.ones(d)
    else:
        if len(noise_scale) != d:
            raise ValueError('noise scale must be a scalar or have length d')
        scale_vec = noise_scale
    if not is_dag(W):
        raise ValueError('W must be a DAG')
    if np.isinf(n):  # population risk for linear gauss SEM
        if sem_type == 'gauss':
            # make 1/d X'X = true cov
            X = np.sqrt(d) * np.diag(scale_vec) @ np.linalg.inv(np.eye(d) - W)
            return X
        else:
            raise ValueError('population risk not available')
    # empirical risk
    G = ig.Graph.Weighted_Adjacency(W.tolist())
    ordered_vertices = G.topological_sorting()
    assert len(ordered_vertices) == d
    X = np.zeros([n, d])
    for j in ordered_vertices:
        parents = G.neighbors(j, mode=ig.IN)
        X[:, j] = _simulate_single_equation(X[:, parents], W[parents, j], scale_vec[j])
    return X

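# Usage sketch (continuing the one above; illustrative only): sample
# n = 1000 observations from a linear-Gaussian SEM over W.
#
#     X = simulate_linear_sem(W, n=1000, sem_type='gauss')
#     assert X.shape == (1000, 10)
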
def simulate_nonlinear_sem(B, n, sem_type, noise_scale=None):
    """Simulate samples from nonlinear SEM.

    Args:
        B (np.ndarray): [d, d] binary adj matrix of DAG
        n (int): num of samples
        sem_type (str): mlp, mim, gp, gp-add
        noise_scale (np.ndarray): scale parameter of additive noise, default all ones

    Returns:
        X (np.ndarray): [n, d] sample matrix
    """
    def _simulate_single_equation(X, scale):
        """X: [n, num of parents], x: [n]"""
        z = np.random.normal(scale=scale, size=n)
        pa_size = X.shape[1]
        if pa_size == 0:
            return z
        if sem_type == 'mlp':
            hidden = 100
            W1 = np.random.uniform(low=0.5, high=2.0, size=[pa_size, hidden])
            W1[np.random.rand(*W1.shape) < 0.5] *= -1
            W2 = np.random.uniform(low=0.5, high=2.0, size=hidden)
            W2[np.random.rand(hidden) < 0.5] *= -1
            x = sigmoid(X @ W1) @ W2 + z
        elif sem_type == 'mim':
            w1 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
            w1[np.random.rand(pa_size) < 0.5] *= -1
            w2 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
            w2[np.random.rand(pa_size) < 0.5] *= -1
            w3 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
            w3[np.random.rand(pa_size) < 0.5] *= -1
            x = np.tanh(X @ w1) + np.cos(X @ w2) + np.sin(X @ w3) + z
        elif sem_type == 'gp':
            from sklearn.gaussian_process import GaussianProcessRegressor
            gp = GaussianProcessRegressor()
            x = gp.sample_y(X, random_state=None).flatten() + z
        elif sem_type == 'gp-add':
            from sklearn.gaussian_process import GaussianProcessRegressor
            gp = GaussianProcessRegressor()
            x = sum([gp.sample_y(X[:, i, None], random_state=None).flatten()
                     for i in range(X.shape[1])]) + z
        else:
            raise ValueError('unknown sem type')
        return x

    d = B.shape[0]
    # `is not None` avoids an ambiguous truth value when noise_scale is an array
    scale_vec = noise_scale if noise_scale is not None else np.ones(d)
    X = np.zeros([n, d])
    G = ig.Graph.Adjacency(B.tolist())
    ordered_vertices = G.topological_sorting()
    assert len(ordered_vertices) == d
    for j in ordered_vertices:
        parents = G.neighbors(j, mode=ig.IN)
        X[:, j] = _simulate_single_equation(X[:, parents], scale_vec[j])
    return X

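# Usage sketch (illustrative only): sample from a nonlinear MLP SEM on the
# binary graph B; the 'gp' and 'gp-add' types additionally need scikit-learn.
#
#     X = simulate_nonlinear_sem(B, n=500, sem_type='mlp')
#     assert X.shape == (500, 10)
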
def count_accuracy(B_true, B_est):
    """Compute various accuracy metrics for B_est.

    true positive = predicted association exists in condition in correct direction
    reverse = predicted association exists in condition in opposite direction
    false positive = predicted association does not exist in condition

    Args:
        B_true (np.ndarray): [d, d] ground truth graph, {0, 1}
        B_est (np.ndarray): [d, d] estimate, {0, 1, -1}, -1 is undirected edge in CPDAG

    Returns:
        fdr: (reverse + false positive) / prediction positive
        tpr: (true positive) / condition positive
        fpr: (reverse + false positive) / condition negative
        shd: undirected extra + undirected missing + reverse
        nnz: prediction positive
    """
    if (B_est == -1).any():  # cpdag
        if not ((B_est == 0) | (B_est == 1) | (B_est == -1)).all():
            raise ValueError('B_est should take value in {0,1,-1}')
        if ((B_est == -1) & (B_est.T == -1)).any():
            raise ValueError('undirected edge should only appear once')
    else:  # dag
        if not ((B_est == 0) | (B_est == 1)).all():
            raise ValueError('B_est should take value in {0,1}')
        if not is_dag(B_est):
            raise ValueError('B_est should be a DAG')
    d = B_true.shape[0]
    # linear index of nonzeros
    pred_und = np.flatnonzero(B_est == -1)
    pred = np.flatnonzero(B_est == 1)
    cond = np.flatnonzero(B_true)
    cond_reversed = np.flatnonzero(B_true.T)
    cond_skeleton = np.concatenate([cond, cond_reversed])
    # true pos
    true_pos = np.intersect1d(pred, cond, assume_unique=True)
    # treat undirected edge favorably
    true_pos_und = np.intersect1d(pred_und, cond_skeleton, assume_unique=True)
    true_pos = np.concatenate([true_pos, true_pos_und])
    # false pos
    false_pos = np.setdiff1d(pred, cond_skeleton, assume_unique=True)
    false_pos_und = np.setdiff1d(pred_und, cond_skeleton, assume_unique=True)
    false_pos = np.concatenate([false_pos, false_pos_und])
    # reverse
    extra = np.setdiff1d(pred, cond, assume_unique=True)
    reverse = np.intersect1d(extra, cond_reversed, assume_unique=True)
    # compute ratio
    pred_size = len(pred) + len(pred_und)
    cond_neg_size = 0.5 * d * (d - 1) - len(cond)
    fdr = float(len(reverse) + len(false_pos)) / max(pred_size, 1)
    tpr = float(len(true_pos)) / max(len(cond), 1)
    fpr = float(len(reverse) + len(false_pos)) / max(cond_neg_size, 1)
    # structural hamming distance
    pred_lower = np.flatnonzero(np.tril(B_est + B_est.T))
    cond_lower = np.flatnonzero(np.tril(B_true + B_true.T))
    extra_lower = np.setdiff1d(pred_lower, cond_lower, assume_unique=True)
    missing_lower = np.setdiff1d(cond_lower, pred_lower, assume_unique=True)
    shd = len(extra_lower) + len(missing_lower) + len(reverse)
    return {'fdr': fdr, 'tpr': tpr, 'fpr': fpr, 'shd': shd, 'nnz': pred_size}

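# Usage sketch (illustrative only; W_est stands for some estimated weight
# matrix, e.g. a NOTEARS solution, and the thresholded graph is assumed to
# be acyclic so the DAG validation passes):
#
#     B_est = (np.abs(W_est) > 0.3).astype(int)
#     metrics = count_accuracy(B, B_est)
#     print(metrics['shd'], metrics['tpr'], metrics['fdr'])
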
def threshold_output(W, desired_edges=None, verbose=False):
    """Threshold the weight matrix W to obtain a binary acyclic graph."""
    if desired_edges is not None:
        # Binary search over the sorted absolute weights for an acyclic
        # graph with the desired number of edges.
        ws = sorted([abs(i) for i in np.unique(W)])
        best = W
        done = False
        mid = int(len(ws) / 2.0)
        floor = 0
        ceil = len(ws) - 1
        while not done:
            cut = np.array([[0.0 if abs(i) <= ws[mid] else 1.0 for i in j] for j in W])
            g = nx.from_numpy_array(cut, create_using=nx.DiGraph())
            try:
                nx.find_cycle(g)
                # still cyclic: raise the threshold
                floor = mid
                mid = int((ceil - mid) / 2.0) + mid
                if mid == ceil or mid == floor:
                    done = True
            except nx.NetworkXNoCycle:
                if nx.number_of_edges(g) == desired_edges:
                    best = cut
                    done = True
                elif nx.number_of_edges(g) >= desired_edges:
                    # acyclic but too many edges: raise the threshold
                    best = cut
                    floor = mid
                    mid = int((ceil - mid) / 2.0) + mid
                    if mid == ceil or mid == floor:
                        done = True
                else:
                    # acyclic but too few edges: lower the threshold
                    best = cut
                    ceil = mid
                    mid = int((floor - mid) / 2.0) + floor
                    if mid == ceil or mid == floor:
                        done = True
    else:
        # Sweep thresholds upward and return the first acyclic cut.
        ws = sorted([abs(i) for i in np.unique(W)])
        best = None
        for w in ws:
            cut = np.array([[0.0 if abs(i) <= w else 1.0 for i in j] for j in W])
            g = nx.from_numpy_array(cut, create_using=nx.DiGraph())
            try:
                nx.find_cycle(g)
            except nx.NetworkXNoCycle:
                return cut
    return best

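# Usage sketch (illustrative only; W_est is again a hypothetical estimate):
#
#     cut = threshold_output(W_est)                      # first acyclic cut
#     cut20 = threshold_output(W_est, desired_edges=20)  # binary search for ~20 edges
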
def generate_random_dag(num_nodes, num_edges, probabilistic=False,
                        edge_coefficient_range=[0.5, 2.0]):
    """Generate a random weighted DAG via a random topological ranking."""
    adj_mat = np.zeros([num_nodes, num_nodes])
    ranking = np.arange(num_nodes)
    np.random.shuffle(ranking)

    if not probabilistic:
        # Add exactly num_edges edges, one at a time.
        for e_i in range(num_edges):
            # candidate cells that are still empty and respect the ranking
            open_slots = np.argwhere(adj_mat == 0)
            open_slots = list(filter(lambda x: ranking[x[0]] < ranking[x[1]], open_slots))
            choice = np.random.choice(len(open_slots))
            i, j = open_slots[choice]
            adj_mat[i][j] = np.random.uniform(
                edge_coefficient_range[0], edge_coefficient_range[1]
            ) * np.random.choice([-1.0, 1.0])
    else:
        # Each admissible cell becomes an edge with probability p,
        # so num_edges is only the expected edge count.
        p = num_edges / ((num_nodes ** 2) / 2.0)
        for i in range(num_nodes):
            for j in range(num_nodes):
                if ranking[i] < ranking[j] and np.random.rand() < p:
                    adj_mat[i][j] = np.random.uniform(
                        edge_coefficient_range[0], edge_coefficient_range[1]
                    ) * np.random.choice([-1.0, 1.0])

    G_true = nx.from_numpy_array(adj_mat, create_using=nx.DiGraph())
    return adj_mat, G_true

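# Usage sketch (illustrative only): a 10-node DAG with exactly 15 weighted
# edges, together with its networkx view.
#
#     adj, G_true = generate_random_dag(num_nodes=10, num_edges=15)
#     assert nx.is_directed_acyclic_graph(G_true)
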
def simulate_from_dag_lg(tam, n_sample, mean=0, variance=1):
    """Simulate linear-Gaussian samples from a weighted adjacency matrix.

    Note: `variance` is passed to np.random.normal as its scale argument,
    which NumPy interprets as the standard deviation.
    """
    num_nodes = len(tam)

    def get_value(i, e):
        # Recursively evaluate node i: its noise term plus the weighted
        # contributions of its parents (memoized in `values`).
        if values[i] is None:
            val = e[i]
            for j in range(num_nodes):
                if tam[j][i] != 0.0:
                    val += get_value(j, e) * tam[j][i]
            values[i] = val
            return val
        else:
            return values[i]

    simulation_data = []
    for _ in range(n_sample):
        errors = np.random.normal(mean, variance, num_nodes)
        values = [None for _ in range(num_nodes)]
        for i in range(num_nodes):
            values[i] = get_value(i, errors)
        simulation_data.append(values)
    return simulation_data

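# Usage sketch (continuing the one above; illustrative only): draw 100
# linear-Gaussian samples from the weighted matrix adj.
#
#     data = simulate_from_dag_lg(adj, n_sample=100)
#     assert len(data) == 100 and len(data[0]) == 10
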
def compare_graphs_undirected(true_graph, estimated_graph):
    """Compare the undirected skeletons of two graphs; returns [tp, fp, tn, fn]."""
    num_edges = len(true_graph[np.where(true_graph != 0.0)])

    # Binarize and symmetrize both graphs, then keep the upper triangle so
    # each undirected edge is counted once.
    tam = np.array([[1 if x != 0.0 else 0 for x in y] for y in true_graph])
    tam_undir = tam + tam.T
    tam_undir = np.array([[1 if x != 0 else 0 for x in y] for y in tam_undir])
    tam_undir = np.triu(tam_undir)

    eam = np.array([[1 if x != 0.0 else 0 for x in y] for y in estimated_graph])
    eam_undir = eam + eam.T
    eam_undir = np.array([[1 if x > 0 else 0 for x in y] for y in eam_undir])
    eam_undir = np.triu(eam_undir)

    tp = len(np.argwhere((tam_undir + eam_undir) == 2))
    fp = len(np.argwhere((tam_undir - eam_undir) < 0))
    tn = len(np.argwhere((tam_undir + eam_undir) == 0))
    fn = num_edges - tp
    return [tp, fp, tn, fn]

def compare_graphs_precision(x):
    """Precision tp / (tp + fp) from a [tp, fp, tn, fn] list."""
    if x[0] + x[1] == 0:
        return 0
    return float(x[0]) / float(x[0] + x[1])

def compare_graphs_recall(x):
    """Recall tp / (tp + fn) from a [tp, fp, tn, fn] list."""
    if x[0] + x[3] == 0:
        return 0
    return float(x[0]) / float(x[0] + x[3])

def compare_graphs_specificity(x):
    """Specificity tn / (tn + fp) from a [tp, fp, tn, fn] list."""
    if x[2] + x[1] == 0:
        return 0
    return float(x[2]) / float(x[2] + x[1])

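# Usage sketch (illustrative only; est_adj stands for a hypothetical
# estimated adjacency matrix): skeleton comparison feeding the three
# confusion-based metrics above.
#
#     counts = compare_graphs_undirected(adj, est_adj)   # [tp, fp, tn, fn]
#     print(compare_graphs_precision(counts),
#           compare_graphs_recall(counts),
#           compare_graphs_specificity(counts))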