"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/site/sshimizu06/lingam
"""
# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches
import numpy as np
import pandas as pd
import networkx as nx
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import scale
from sklearn.utils import check_array
from ...common import utils
from ...metrics.compare_graphs import evaluate_graph
from .base import _BaseLiNGAM
class DirectLiNGAM(_BaseLiNGAM):
    """Implementation of DirectLiNGAM Algorithm [1]_ [2]_

    References
    ----------
    .. [1] S. Shimizu, T. Inazumi, Y. Sogawa, A. Hyvärinen, Y. Kawahara, T. Washio,
       P. O. Hoyer and K. Bollen.
       DirectLiNGAM: A direct method for learning a linear non-Gaussian structural
       equation model.
       Journal of Machine Learning Research, 12(Apr): 1225--1248, 2011.
    .. [2] A. Hyvärinen and S. M. Smith. Pairwise likelihood ratios for estimation
       of non-Gaussian structural equation models.
       Journal of Machine Learning Research 14:111-152, 2013.
    """

    # Attributes populated by ``fit`` / ``fit_predict``.
    is_fitted_ = False    # True once ``fit`` completed.
    metrics = None        # Graph-comparison metrics when a reference graph is given.
    dag = None            # Estimated DAG (built from the adjacency matrix).
    feature_names = None  # Training-data column names, or None for plain arrays.

    def __init__(
        self,
        name: str,
        random_state=None,
        prior_knowledge=None,
        apply_prior_knowledge_softly=False,
        measure="pwling",
        th=0.0,
        inverse=False,
        absolute_values=False,
        verbose=False
    ):
        """Construct a DirectLiNGAM model.

        Parameters
        ----------
        name : str
            Identifier for this model instance (e.g. the dataset name).
        random_state : int, optional (default=None)
            ``random_state`` is the seed used by the random number generator.
        prior_knowledge : array-like, shape (n_features, n_features),
            optional (default=None)
            Prior knowledge used for causal discovery, where ``n_features`` is the
            number of features.
            The elements of prior knowledge matrix are defined as follows [1]_:
            * ``0`` : :math:`x_i` does not have a directed path to :math:`x_j`
            * ``1`` : :math:`x_i` has a directed path to :math:`x_j`
            * ``-1`` : No prior knowledge is available to know if either of the two
            cases above (0 or 1) is true.
        apply_prior_knowledge_softly : boolean, optional (default=False)
            If True, apply prior knowledge softly.
        measure : {'pwling', 'kernel'}, optional (default='pwling')
            Measure to evaluate independence: 'pwling' [2]_ or 'kernel' [1]_.
        th : float, optional (default=0.0)
            Threshold applied to the adjacency matrix when building the DAG.
        inverse : bool, optional (default=False)
            Passed through to ``utils.graph_from_adjacency``.
        absolute_values : bool, optional (default=False)
            If True, use absolute values of the adjacency weights for the DAG.
        verbose : bool, optional (default=False)
            If True, enable verbose output.
        """
        super().__init__(random_state)
        self.name = name
        self._Aknw = prior_knowledge
        self._apply_prior_knowledge_softly = apply_prior_knowledge_softly
        self._measure = measure
        self.th = th
        self.inverse = inverse
        self.absolute_values = absolute_values
        self.verbose = verbose
        if self._Aknw is not None:
            self._Aknw = check_array(self._Aknw)
            # Encode "-1 = unknown" as NaN so nansum-based checks can skip it.
            self._Aknw = np.where(self._Aknw < 0, np.nan, self._Aknw)
            # Extract all partial orders in prior knowledge matrix
            if not self._apply_prior_knowledge_softly:
                self._partial_orders = self._extract_partial_orders(self._Aknw)

    def fit(self, X):
        """Fit the model to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.

        Returns
        -------
        self : object
            Returns the instance itself.

        Raises
        ------
        ValueError
            If ``prior_knowledge`` was given and its shape does not match
            ``(n_features, n_features)``.
        """
        # Keep column names (if X is a DataFrame) for labeling the DAG nodes.
        self.feature_names = list(X.columns) if hasattr(X, "columns") else None
        # Check parameters
        X = check_array(X)
        n_features = X.shape[1]
        if self._Aknw is not None:
            if (n_features, n_features) != self._Aknw.shape:
                raise ValueError(
                    "The shape of prior knowledge must be (n_features, n_features)"
                )
        # Causal discovery: repeatedly pick the most exogenous variable,
        # append it to the causal order K, and regress it out of the rest.
        U = np.arange(n_features)
        K = []
        X_ = np.copy(X)
        if self._measure == "kernel":
            X_ = scale(X_)
        for _ in range(n_features):
            if self._measure == "kernel":
                m = self._search_causal_order_kernel(X_, U)
            else:
                m = self._search_causal_order(X_, U)
            # Remove the effect of the chosen variable from the remaining ones.
            for i in U:
                if i != m:
                    X_[:, i] = self._residual(X_[:, i], X_[:, m])
            K.append(m)
            U = U[U != m]
            # Update partial orders: m is now placed, so constraints with m as
            # the "from" side are satisfied and can be dropped.
            if (self._Aknw is not None) and (not self._apply_prior_knowledge_softly):
                self._partial_orders = self._partial_orders[
                    self._partial_orders[:, 0] != m
                ]
        self._causal_order = K
        self._estimate_adjacency_matrix(X, prior_knowledge=self._Aknw)
        self.dag = utils.graph_from_adjacency(
            self.adjacency_matrix_,
            node_labels=self.feature_names,
            th=self.th,
            inverse=self.inverse,
            absolute_values=self.absolute_values)
        self.is_fitted_ = True
        return self

    def fit_predict(self, train, test, ref_graph: nx.DiGraph = None):
        """Fit the model to ``train`` and return the estimated DAG.

        Parameters
        ----------
        train : array-like, shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.
        test : array-like
            Unused; kept for interface compatibility with sibling estimators.
        ref_graph : nx.DiGraph, optional (default=None)
            Reference graph; when given, comparison metrics are computed and
            stored in ``self.metrics``.

        Returns
        -------
        nx.DiGraph
            The estimated DAG.
        """
        self.fit(train)
        # NOTE: an empty nx.DiGraph is falsy, so ``if ref_graph:`` would skip
        # metrics for a valid (edge-less) reference graph; test identity instead.
        if ref_graph is not None:
            self.metrics = evaluate_graph(
                ref_graph, self.dag, self.feature_names)
        return self.dag

    def _extract_partial_orders(self, pk):
        """Extract partial orders from prior knowledge."""
        path_pairs = np.array(np.where(pk == 1)).transpose()
        no_path_pairs = np.array(np.where(pk == 0)).transpose()
        # Check for inconsistencies in pairs with path
        check_pairs = np.concatenate([path_pairs, path_pairs[:, [1, 0]]])
        if len(check_pairs) > 0:
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            if len(pairs[counts > 1]) > 0:
                raise ValueError(
                    f"The prior knowledge contains inconsistencies at the following indices: {pairs[counts > 1].tolist()}"
                )
        # Check for inconsistencies in pairs without path.
        # If there are duplicate pairs without path, they cancel out and are not ordered.
        check_pairs = np.concatenate([no_path_pairs, no_path_pairs[:, [1, 0]]])
        if len(check_pairs) > 0:
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            check_pairs = np.concatenate([no_path_pairs, pairs[counts > 1]])
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            no_path_pairs = pairs[counts < 2]
        check_pairs = np.concatenate([path_pairs, no_path_pairs[:, [1, 0]]])
        if len(check_pairs) == 0:
            # If no pairs are extracted from the specified prior knowledge,
            # discard the prior knowledge.
            self._Aknw = None
            return None
        pairs = np.unique(check_pairs, axis=0)
        return pairs[:, [1, 0]]  # [to, from] -> [from, to]

    def _residual(self, xi, xj):
        """The residual when xi is regressed on xj."""
        return xi - (np.cov(xi, xj)[0, 1] / np.var(xj)) * xj

    def _entropy(self, u):
        """Calculate entropy using the maximum entropy approximations.

        Constants k1, k2, gamma follow the approximation in Hyvärinen's
        maximum-entropy contrast function.
        """
        k1 = 79.047
        k2 = 7.4129
        gamma = 0.37457
        return (1 + np.log(2 * np.pi)) / 2 - k1 * (
            np.mean(np.log(np.cosh(u))) - gamma) ** 2 - k2 * (
            np.mean(u * np.exp((-(u ** 2)) / 2))) ** 2

    def _diff_mutual_info(self, xi_std, xj_std, ri_j, rj_i):
        """Calculate the difference of the mutual informations."""
        return (self._entropy(xj_std) + self._entropy(ri_j / np.std(ri_j))) - (
            self._entropy(xi_std) + self._entropy(rj_i / np.std(rj_i))
        )

    def _search_candidate(self, U):
        """Search for candidate features"""
        # If no prior knowledge is specified, nothing to do.
        if self._Aknw is None:
            return U, []
        # Apply prior knowledge in a strong way
        if not self._apply_prior_knowledge_softly:
            Uc = [i for i in U if i not in self._partial_orders[:, 1]]
            return Uc, []
        # Find exogenous features
        Uc = []
        for j in U:
            index = U[U != j]
            if self._Aknw[j][index].sum() == 0:
                Uc.append(j)
        # Find endogenous features, and then find candidate features
        if len(Uc) == 0:
            U_end = []
            for j in U:
                index = U[U != j]
                if np.nansum(self._Aknw[j][index]) > 0:
                    U_end.append(j)
            # Find sink features (original)
            for i in U:
                index = U[U != i]
                if self._Aknw[index, i].sum() == 0:
                    U_end.append(i)
            Uc = [i for i in U if i not in set(U_end)]
        # make V^(j)
        Vj = []
        for i in U:
            if i in Uc:
                continue
            if self._Aknw[i][Uc].sum() == 0:
                Vj.append(i)
        return Uc, Vj

    def _search_causal_order(self, X, U):
        """Search the causal ordering (pairwise likelihood-ratio measure)."""
        Uc, Vj = self._search_candidate(U)
        if len(Uc) == 1:
            return Uc[0]
        M_list = []
        for i in Uc:
            M = 0
            for j in U:
                if i != j:
                    xi_std = (X[:, i] - np.mean(X[:, i])) / np.std(X[:, i])
                    xj_std = (X[:, j] - np.mean(X[:, j])) / np.std(X[:, j])
                    # Skip residualization for pairs already ordered by
                    # prior knowledge (i in Vj, j in Uc).
                    ri_j = (
                        xi_std
                        if i in Vj and j in Uc
                        else self._residual(xi_std, xj_std)
                    )
                    rj_i = (
                        xj_std
                        if j in Vj and i in Uc
                        else self._residual(xj_std, xi_std)
                    )
                    M += np.min(
                        [0, self._diff_mutual_info(xi_std, xj_std, ri_j, rj_i)]) ** 2
            M_list.append(-1.0 * M)
        return Uc[np.argmax(M_list)]

    def _mutual_information(self, x1, x2, param):
        """Calculate the mutual informations (kernel-based estimate)."""
        kappa, sigma = param
        n = len(x1)
        # Gaussian (RBF) Gram matrices for x1 and x2.
        X1 = np.tile(x1, (n, 1))
        K1 = np.exp(-1 / (2 * sigma ** 2) *
                    (X1 ** 2 + X1.T ** 2 - 2 * X1 * X1.T))
        X2 = np.tile(x2, (n, 1))
        K2 = np.exp(-1 / (2 * sigma ** 2) *
                    (X2 ** 2 + X2.T ** 2 - 2 * X2 * X2.T))
        # Regularized blocks; kappa controls the regularization strength.
        tmp1 = K1 + n * kappa * np.identity(n) / 2
        tmp2 = K2 + n * kappa * np.identity(n) / 2
        K_kappa = np.r_[np.c_[tmp1 @ tmp1, K1 @ K2],
                        np.c_[K2 @ K1, tmp2 @ tmp2]]
        D_kappa = np.r_[
            np.c_[tmp1 @ tmp1, np.zeros([n, n])
                  ], np.c_[np.zeros([n, n]), tmp2 @ tmp2]
        ]
        # Log-determinant ratio via singular values.
        sigma_K = np.linalg.svd(K_kappa, compute_uv=False)
        sigma_D = np.linalg.svd(D_kappa, compute_uv=False)
        return (-1 / 2) * (np.sum(np.log(sigma_K)) - np.sum(np.log(sigma_D)))

    def _search_causal_order_kernel(self, X, U):
        """Search the causal ordering by kernel method."""
        Uc, Vj = self._search_candidate(U)
        if len(Uc) == 1:
            return Uc[0]
        # Kernel parameters [kappa, sigma] depend on the sample size.
        if X.shape[0] > 1000:
            param = [2e-3, 0.5]
        else:
            param = [2e-2, 1.0]
        Tkernels = []
        for j in Uc:
            Tkernel = 0
            for i in U:
                if i != j:
                    ri_j = (
                        X[:, i]
                        if j in Vj and i in Uc
                        else self._residual(X[:, i], X[:, j])
                    )
                    Tkernel += self._mutual_information(X[:, j], ri_j, param)
            Tkernels.append(Tkernel)
        return Uc[np.argmin(Tkernels)]
def main(dataset_name,
         threshold=0.0,
         inverse=True,
         absolute_values=True,
         input_path="/Users/renero/phd/data/sachs/",
         output_path="/Users/renero/phd/output/RC4/sachs/compared/",
         save=False):
    """Run DirectLiNGAM on a dataset and print the resulting edges/metrics.

    Parameters
    ----------
    dataset_name : str
        Base name of the ``.csv`` data file and ``.dot`` reference graph.
    threshold : float, optional (default=0.0)
        Threshold passed to the model when building the DAG.
    inverse : bool, optional (default=True)
        Passed to the model (see ``DirectLiNGAM``).
    absolute_values : bool, optional (default=True)
        Passed to the model (see ``DirectLiNGAM``).
    input_path : str, optional
        Directory containing the data and reference-graph files.
    output_path : str, optional
        Currently unused; kept for interface compatibility. TODO: use or drop.
    save : bool, optional (default=False)
        Currently unused; kept for interface compatibility. TODO: use or drop.
    """
    ref_graph = utils.graph_from_dot_file(f"{input_path}{dataset_name}.dot")
    data = pd.read_csv(f"{input_path}{dataset_name}.csv")
    scaler = StandardScaler()
    data = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)
    train = data.sample(frac=0.8, random_state=42)
    test = data.drop(train.index)
    # Bug fix: forward threshold/inverse/absolute_values to the model (they
    # were previously accepted but silently ignored).
    lingam = DirectLiNGAM(
        dataset_name,
        th=threshold,
        inverse=inverse,
        absolute_values=absolute_values)
    # Bug fix: fit on the train split (previously the full ``data`` was fitted,
    # making the computed train/test split dead code).
    lingam.fit_predict(train=train, test=test, ref_graph=ref_graph)
    for edge in lingam.dag.edges():
        print(edge)
    print(lingam.metrics)
# Script entry point: run the demo on the "sachs_long" dataset with raw
# (non-inverted, signed) adjacency weights.
if __name__ == '__main__':
    main("sachs_long", inverse=False, absolute_values=False)