import numpy as np
import networkx as nx
from scipy.special import expit as sigmoid
import igraph as ig
import random
def set_random_seed(seed):
random.seed(seed)
np.random.seed(seed)
def is_dag(W):
G = ig.Graph.Weighted_Adjacency(W.tolist())
return G.is_dag()
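# Illustrative sketch (not part of the original module): sanity-check is_dag
# on a hand-built acyclic vs. cyclic adjacency matrix.
def _demo_is_dag():
    acyclic = np.array([[0.0, 1.0], [0.0, 0.0]])  # single edge 0 -> 1
    cyclic = np.array([[0.0, 1.0], [1.0, 0.0]])   # edges 0 -> 1 and 1 -> 0
    assert is_dag(acyclic)
    assert not is_dag(cyclic)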
def simulate_dag(d, s0, graph_type):
"""Simulate random DAG with some expected number of edges.
Args:
d (int): num of nodes
s0 (int): expected num of edges
graph_type (str): ER, SF, BP
Returns:
B (np.ndarray): [d, d] binary adj matrix of DAG
"""
def _random_permutation(M):
# np.random.permutation permutes first axis only
P = np.random.permutation(np.eye(M.shape[0]))
return P.T @ M @ P
def _random_acyclic_orientation(B_und):
return np.tril(_random_permutation(B_und), k=-1)
def _graph_to_adjmat(G):
return np.array(G.get_adjacency().data)
if graph_type == 'ER':
# Erdos-Renyi
G_und = ig.Graph.Erdos_Renyi(n=d, m=s0)
B_und = _graph_to_adjmat(G_und)
B = _random_acyclic_orientation(B_und)
elif graph_type == 'SF':
# Scale-free, Barabasi-Albert
G = ig.Graph.Barabasi(n=d, m=int(round(s0 / d)), directed=True)
B = _graph_to_adjmat(G)
elif graph_type == 'BP':
# Bipartite, Sec 4.1 of (Gu, Fu, Zhou, 2018)
top = int(0.2 * d)
G = ig.Graph.Random_Bipartite(top, d - top, m=s0, directed=True, neimode=ig.OUT)
B = _graph_to_adjmat(G)
else:
raise ValueError('unknown graph type')
B_perm = _random_permutation(B)
assert ig.Graph.Adjacency(B_perm.tolist()).is_dag()
return B_perm
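# Illustrative usage sketch (an assumed entry point, not part of the original
# API): draw a 10-node Erdos-Renyi DAG with 20 expected edges.
def _demo_simulate_dag():
    set_random_seed(0)
    B = simulate_dag(d=10, s0=20, graph_type='ER')
    assert B.shape == (10, 10) and is_dag(B)
    print('sampled edges:', int(B.sum()))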
def simulate_parameter(B, w_ranges=((-2.0, -0.5), (0.5, 2.0))):
"""Simulate SEM parameters for a DAG.
Args:
B (np.ndarray): [d, d] binary adj matrix of DAG
w_ranges (tuple): disjoint weight ranges
Returns:
W (np.ndarray): [d, d] weighted adj matrix of DAG
"""
W = np.zeros(B.shape)
S = np.random.randint(len(w_ranges), size=B.shape) # which range
for i, (low, high) in enumerate(w_ranges):
U = np.random.uniform(low=low, high=high, size=B.shape)
W += B * (S == i) * U
return W
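# Illustrative sketch: weight a sampled DAG and check that every nonzero
# weight lands in one of the two disjoint default ranges.
def _demo_simulate_parameter():
    B = simulate_dag(d=5, s0=8, graph_type='ER')
    W = simulate_parameter(B)
    magnitudes = np.abs(W[W != 0])
    assert ((magnitudes >= 0.5) & (magnitudes <= 2.0)).all()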
def simulate_linear_sem(W, n, sem_type, noise_scale=None):
"""Simulate samples from linear SEM with specified type of noise.
For uniform, noise z ~ uniform(-a, a), where a = noise_scale.
Args:
W (np.ndarray): [d, d] weighted adj matrix of DAG
n (int): num of samples, n=inf mimics population risk
sem_type (str): gauss, exp, gumbel, uniform, logistic, poisson
noise_scale (np.ndarray): scale parameter of additive noise, default all ones
Returns:
X (np.ndarray): [n, d] sample matrix, [d, d] if n=inf
"""
def _simulate_single_equation(X, w, scale):
"""X: [n, num of parents], w: [num of parents], x: [n]"""
if sem_type == 'gauss':
z = np.random.normal(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'exp':
z = np.random.exponential(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'gumbel':
z = np.random.gumbel(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'uniform':
z = np.random.uniform(low=-scale, high=scale, size=n)
x = X @ w + z
elif sem_type == 'logistic':
x = np.random.binomial(1, sigmoid(X @ w)) * 1.0
elif sem_type == 'poisson':
x = np.random.poisson(np.exp(X @ w)) * 1.0
else:
raise ValueError('unknown sem type')
return x
d = W.shape[0]
if noise_scale is None:
scale_vec = np.ones(d)
elif np.isscalar(noise_scale):
scale_vec = noise_scale * np.ones(d)
else:
if len(noise_scale) != d:
            raise ValueError('noise scale must be a scalar or have length d')
scale_vec = noise_scale
if not is_dag(W):
raise ValueError('W must be a DAG')
if np.isinf(n): # population risk for linear gauss SEM
if sem_type == 'gauss':
# make 1/d X'X = true cov
X = np.sqrt(d) * np.diag(scale_vec) @ np.linalg.inv(np.eye(d) - W)
return X
else:
raise ValueError('population risk not available')
# empirical risk
G = ig.Graph.Weighted_Adjacency(W.tolist())
ordered_vertices = G.topological_sorting()
assert len(ordered_vertices) == d
X = np.zeros([n, d])
for j in ordered_vertices:
parents = G.neighbors(j, mode=ig.IN)
X[:, j] = _simulate_single_equation(X[:, parents], W[parents, j], scale_vec[j])
return X
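# Illustrative sketch: draw finite samples from a linear-Gaussian SEM; passing
# n=np.inf with sem_type='gauss' instead returns a [d, d] population matrix.
def _demo_simulate_linear_sem():
    B = simulate_dag(d=5, s0=8, graph_type='ER')
    W = simulate_parameter(B)
    X = simulate_linear_sem(W, n=1000, sem_type='gauss')
    assert X.shape == (1000, 5)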
def simulate_nonlinear_sem(B, n, sem_type, noise_scale=None):
"""Simulate samples from nonlinear SEM.
Args:
B (np.ndarray): [d, d] binary adj matrix of DAG
n (int): num of samples
sem_type (str): mlp, mim, gp, gp-add
noise_scale (np.ndarray): scale parameter of additive noise, default all ones
Returns:
X (np.ndarray): [n, d] sample matrix
"""
def _simulate_single_equation(X, scale):
"""X: [n, num of parents], x: [n]"""
z = np.random.normal(scale=scale, size=n)
pa_size = X.shape[1]
if pa_size == 0:
return z
if sem_type == 'mlp':
hidden = 100
W1 = np.random.uniform(low=0.5, high=2.0, size=[pa_size, hidden])
W1[np.random.rand(*W1.shape) < 0.5] *= -1
W2 = np.random.uniform(low=0.5, high=2.0, size=hidden)
W2[np.random.rand(hidden) < 0.5] *= -1
x = sigmoid(X @ W1) @ W2 + z
elif sem_type == 'mim':
w1 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w1[np.random.rand(pa_size) < 0.5] *= -1
w2 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w2[np.random.rand(pa_size) < 0.5] *= -1
w3 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w3[np.random.rand(pa_size) < 0.5] *= -1
x = np.tanh(X @ w1) + np.cos(X @ w2) + np.sin(X @ w3) + z
elif sem_type == 'gp':
from sklearn.gaussian_process import GaussianProcessRegressor
gp = GaussianProcessRegressor()
x = gp.sample_y(X, random_state=None).flatten() + z
elif sem_type == 'gp-add':
from sklearn.gaussian_process import GaussianProcessRegressor
gp = GaussianProcessRegressor()
x = sum([gp.sample_y(X[:, i, None], random_state=None).flatten()
for i in range(X.shape[1])]) + z
else:
raise ValueError('unknown sem type')
return x
    d = B.shape[0]
    # mirror the linear-SEM handling: noise_scale may be None, a scalar, or length-d
    if noise_scale is None:
        scale_vec = np.ones(d)
    elif np.isscalar(noise_scale):
        scale_vec = noise_scale * np.ones(d)
    else:
        scale_vec = np.asarray(noise_scale)
X = np.zeros([n, d])
G = ig.Graph.Adjacency(B.tolist())
ordered_vertices = G.topological_sorting()
assert len(ordered_vertices) == d
for j in ordered_vertices:
parents = G.neighbors(j, mode=ig.IN)
X[:, j] = _simulate_single_equation(X[:, parents], scale_vec[j])
return X
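# Illustrative sketch: columns of the nonlinear SEM sample are generated in
# topological order from their parents plus additive Gaussian noise.
def _demo_simulate_nonlinear_sem():
    B = simulate_dag(d=5, s0=8, graph_type='ER')
    X = simulate_nonlinear_sem(B, n=200, sem_type='mlp')
    assert X.shape == (200, 5)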
def count_accuracy(B_true, B_est):
"""Compute various accuracy metrics for B_est.
true positive = predicted association exists in condition in correct direction
reverse = predicted association exists in condition in opposite direction
false positive = predicted association does not exist in condition
Args:
B_true (np.ndarray): [d, d] ground truth graph, {0, 1}
B_est (np.ndarray): [d, d] estimate, {0, 1, -1}, -1 is undirected edge in CPDAG
Returns:
fdr: (reverse + false positive) / prediction positive
tpr: (true positive) / condition positive
fpr: (reverse + false positive) / condition negative
shd: undirected extra + undirected missing + reverse
nnz: prediction positive
"""
if (B_est == -1).any(): # cpdag
if not ((B_est == 0) | (B_est == 1) | (B_est == -1)).all():
raise ValueError('B_est should take value in {0,1,-1}')
if ((B_est == -1) & (B_est.T == -1)).any():
raise ValueError('undirected edge should only appear once')
else: # dag
if not ((B_est == 0) | (B_est == 1)).all():
raise ValueError('B_est should take value in {0,1}')
if not is_dag(B_est):
raise ValueError('B_est should be a DAG')
d = B_true.shape[0]
# linear index of nonzeros
pred_und = np.flatnonzero(B_est == -1)
pred = np.flatnonzero(B_est == 1)
cond = np.flatnonzero(B_true)
cond_reversed = np.flatnonzero(B_true.T)
cond_skeleton = np.concatenate([cond, cond_reversed])
# true pos
true_pos = np.intersect1d(pred, cond, assume_unique=True)
# treat undirected edge favorably
true_pos_und = np.intersect1d(pred_und, cond_skeleton, assume_unique=True)
true_pos = np.concatenate([true_pos, true_pos_und])
# false pos
false_pos = np.setdiff1d(pred, cond_skeleton, assume_unique=True)
false_pos_und = np.setdiff1d(pred_und, cond_skeleton, assume_unique=True)
false_pos = np.concatenate([false_pos, false_pos_und])
# reverse
extra = np.setdiff1d(pred, cond, assume_unique=True)
reverse = np.intersect1d(extra, cond_reversed, assume_unique=True)
# compute ratio
pred_size = len(pred) + len(pred_und)
cond_neg_size = 0.5 * d * (d - 1) - len(cond)
fdr = float(len(reverse) + len(false_pos)) / max(pred_size, 1)
tpr = float(len(true_pos)) / max(len(cond), 1)
fpr = float(len(reverse) + len(false_pos)) / max(cond_neg_size, 1)
# structural hamming distance
pred_lower = np.flatnonzero(np.tril(B_est + B_est.T))
cond_lower = np.flatnonzero(np.tril(B_true + B_true.T))
extra_lower = np.setdiff1d(pred_lower, cond_lower, assume_unique=True)
missing_lower = np.setdiff1d(cond_lower, pred_lower, assume_unique=True)
shd = len(extra_lower) + len(missing_lower) + len(reverse)
return {'fdr': fdr, 'tpr': tpr, 'fpr': fpr, 'shd': shd, 'nnz': pred_size}
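# Illustrative sketch: scoring the ground truth against itself should yield a
# perfect tpr and zero structural Hamming distance.
def _demo_count_accuracy():
    B_true = simulate_dag(d=5, s0=6, graph_type='ER')
    stats = count_accuracy(B_true, B_true.copy())
    assert stats['tpr'] == 1.0 and stats['shd'] == 0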
def threshold_output(W, desired_edges=None, verbose=False):
    """Threshold a weighted adjacency matrix W to a binary acyclic graph.

    If desired_edges is given, binary-search the cut threshold toward an acyclic
    graph with that many edges; otherwise return the first acyclic cut found
    over increasing thresholds.
    """
    if desired_edges is not None:
# Implements binary search for acyclic graph with the desired number of edges
ws = sorted([abs(i) for i in np.unique(W)])
best = W
done = False
mid = int(len(ws)/2.0)
floor = 0
ceil = len(ws)-1
while not done:
            cut = (np.abs(W) > ws[mid]).astype(float)  # keep edges strictly above the threshold
g = nx.from_numpy_array(cut, create_using=nx.DiGraph())
try:
nx.find_cycle(g)
floor = mid
mid = int((ceil-mid)/2.0) + mid
if mid == ceil or mid == floor:
done = True
            except nx.NetworkXNoCycle:  # graph is acyclic at this threshold
if nx.number_of_edges(g) == desired_edges:
best = cut
done = True
elif nx.number_of_edges(g) >= desired_edges:
best = cut
floor = mid
mid = int((ceil-mid)/2.0) + mid
if mid == ceil or mid == floor:
done = True
else:
best = cut
ceil = mid
mid = int((floor-mid)/2.0) + floor
if mid == ceil or mid == floor:
done = True
else:
ws = sorted([abs(i) for i in np.unique(W)])
best = None
for w in ws:
            cut = (np.abs(W) > w).astype(float)  # keep edges strictly above the threshold
g = nx.from_numpy_array(cut, create_using=nx.DiGraph())
try:
nx.find_cycle(g)
            except nx.NetworkXNoCycle:  # acyclic: return the first such cut
return cut
return best
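# Illustrative sketch: thresholding a weighted estimate yields a binary matrix
# whose induced digraph is acyclic.
def _demo_threshold_output():
    W = np.array([[0.0, 0.9, 0.1],
                  [0.0, 0.0, 1.2],
                  [0.4, 0.0, 0.0]])
    cut = threshold_output(W)
    g = nx.from_numpy_array(cut, create_using=nx.DiGraph())
    assert nx.is_directed_acyclic_graph(g)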
def generate_random_dag(num_nodes, num_edges, probabilistic=False, edge_coefficient_range=[0.5, 2.0]):
    """Generate a weighted random DAG by orienting edges along a random node ranking."""
    adj_mat = np.zeros([num_nodes, num_nodes])
    ranking = np.arange(num_nodes)
np.random.shuffle(ranking)
if not probabilistic:
for e_i in range(num_edges):
            # candidate slots: empty entries oriented from lower to higher rank
            candidates = np.argwhere(adj_mat == 0)
            candidates = [x for x in candidates if ranking[x[0]] < ranking[x[1]]]
            idx = np.random.choice(len(candidates))
            i, j = candidates[idx]
            adj_mat[i][j] = np.random.uniform(edge_coefficient_range[0], edge_coefficient_range[1]) * np.random.choice([-1.0, 1.0])
else:
        # Bernoulli edge probability chosen so the expected edge count is approximately num_edges
        p = num_edges / ((num_nodes ** 2) / 2.0)
for i in range(num_nodes):
for j in range(num_nodes):
if ranking[i] < ranking[j]:
if np.random.rand() < p:
adj_mat[i][j] = np.random.uniform(edge_coefficient_range[0], edge_coefficient_range[1]) * np.random.choice([-1.0, 1.0])
G_true = nx.from_numpy_array(adj_mat, create_using=nx.DiGraph())
return adj_mat, G_true
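# Illustrative sketch: both generation modes return an acyclic weighted
# adjacency matrix; the probabilistic mode matches num_edges only in expectation.
def _demo_generate_random_dag():
    adj_mat, G_true = generate_random_dag(num_nodes=6, num_edges=7)
    assert nx.is_directed_acyclic_graph(G_true)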
def simulate_from_dag_lg(tam, n_sample, mean=0, variance=1):
    """Simulate n_sample draws from a linear-Gaussian SEM given a weighted adj matrix tam."""
    num_nodes = len(tam)
def get_value(i, e):
        if values[i] is None:
val = e[i]
for j in range(num_nodes):
if tam[j][i] != 0.0:
val += get_value(j, e) * tam[j][i]
values[i] = val
return val
else:
return values[i]
simulation_data = []
    for _ in range(n_sample):
        # np.random.normal takes the standard deviation, so convert the variance
        errors = np.random.normal(mean, np.sqrt(variance), num_nodes)
        values = [None for _ in range(num_nodes)]
        for i in range(num_nodes):
            values[i] = get_value(i, errors)
        simulation_data.append(values)
return simulation_data
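# Illustrative sketch: simulate linear-Gaussian data from a fixed weighted
# DAG; the result is n_sample rows of num_nodes values each.
def _demo_simulate_from_dag_lg():
    tam = [[0.0, 1.5], [0.0, 0.0]]  # single edge 0 -> 1 with weight 1.5
    data = np.array(simulate_from_dag_lg(tam, n_sample=100))
    assert data.shape == (100, 2)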
def compare_graphs_undirected(true_graph, estimated_graph):
    """Compare the undirected skeletons of two weighted graphs; returns [tp, fp, tn, fn]."""
    num_edges = np.count_nonzero(true_graph)
    tam = (np.asarray(true_graph) != 0.0).astype(int)
    tam_undir = np.triu(((tam + tam.T) != 0).astype(int))
    eam = (np.asarray(estimated_graph) != 0.0).astype(int)
    eam_undir = np.triu(((eam + eam.T) != 0).astype(int))
tp = len(np.argwhere((tam_undir + eam_undir) == 2))
fp = len(np.argwhere((tam_undir - eam_undir) < 0))
tn = len(np.argwhere((tam_undir + eam_undir) == 0))
fn = num_edges - tp
return [tp, fp, tn, fn]
def compare_graphs_precision(x):
    """Precision tp / (tp + fp) given counts x = [tp, fp, tn, fn]."""
    if x[0] + x[1] == 0:
        return 0
    return float(x[0]) / float(x[0] + x[1])
def compare_graphs_recall(x):
    """Recall tp / (tp + fn) given counts x = [tp, fp, tn, fn]."""
    if x[0] + x[3] == 0:
        return 0
    return float(x[0]) / float(x[0] + x[3])
def compare_graphs_specificity(x):
    """Specificity tn / (tn + fp) given counts x = [tp, fp, tn, fn]."""
    if x[2] + x[1] == 0:
        return 0
    return float(x[2]) / float(x[2] + x[1])
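# Illustrative sketch: the [tp, fp, tn, fn] counts from
# compare_graphs_undirected feed directly into the metric helpers above.
def _demo_compare_graphs():
    true_graph = np.array([[0.0, 1.0], [0.0, 0.0]])
    counts = compare_graphs_undirected(true_graph, true_graph.copy())
    print('precision:', compare_graphs_precision(counts),
          'recall:', compare_graphs_recall(counts),
          'specificity:', compare_graphs_specificity(counts))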