# Copyright 2021 Juan L Gamella
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches
"""The main module, containing the implementation of GES, including
the logic for the insert, delete and turn operators. The
implementation is directly based on two papers:
1. The 2002 GES paper by Chickering, "Optimal Structure
Identification With Greedy Search" -
https://www.jmlr.org/papers/volume3/chickering02b/chickering02b.pdf
2. For the turn operator, the 2012 GIES paper by Hauser & Bühlmann,
"Characterization and Greedy Learning of Interventional Markov
Equivalence Classes of Directed Acyclic Graphs" -
https://www.jmlr.org/papers/volume13/hauser12a/hauser12a.pdf
Further credit is given where due.
Additional modules / packages:
- ges.utils contains auxiliary functions and the logic to transform
a PDAG into a CPDAG, used after each application of an operator.
- ges.scores contains the modules with the score classes:
- ges.scores.decomposable_score contains the base class for
decomposable score classes (see that module for more details).
- ges.scores.gauss_obs_l0_pen contains a cached implementation
of the gaussian likelihood BIC score used in the original GES
paper.
- ges.test contains the modules with the unit tests and tests
comparing against the algorithm's implementation in the R package
'pcalg'.
"""
import numpy as np
import pandas as pd
import networkx as nx
from sklearn.discriminant_analysis import StandardScaler
from .scores import GaussObsL0Pen
from . import utils
from ...common.utils import graph_from_adjacency, graph_from_dot_file
from ...metrics.compare_graphs import evaluate_graph
# [docs]  (Sphinx view-source link artifact; not part of the code)
class GES(object):
    """
    The GES (Greedy Equivalence Search) algorithm for learning causal
    graphs from observational data.

    Parameters
    ----------
    name : str
        Name given to this estimator instance (e.g. the dataset name).
    phases : list, optional
        Which phases of the GES procedure are run, and in which order.
        Defaults to `['forward', 'backward', 'turning']`.
    iterate : bool, default=False
        Indicates whether the given phases should be iterated more than once.
    debug : int, optional
        Debug level for printing traces. Higher values correspond to increased
        verbosity.
    verbose : bool, default=False
        Legacy switch from which the debug level is derived in `__init__`.

    Attributes
    ----------
    dag : graph
        The estimated CPDAG (completed partially directed acyclic graph),
        built by `graph_from_adjacency` and exposing `.edges()`.
    ges_adjmat : numpy.ndarray
        The adjacency matrix of the estimated CPDAG returned by `fit_bic`.
    ges_score : float
        The score of the estimated CPDAG.
    phases : list
        The phases of the GES procedure that are run, and in which order.
    iterate : bool
        Indicates whether the given phases should be iterated more than once.

    Methods
    -------
    fit(X, A0=None)
        Fit the GES algorithm to the given DataFrame.
    fit_bic(data, A0=None)
        Run GES on the given data using the Gaussian BIC score.
    forward_step(A, cache, debug=0)
        Score and apply the highest scoring insert operator.
    backward_step(A, cache, debug=0)
        Score and apply the highest scoring delete operator.

    Examples
    --------
    >>> import pandas as pd # doctest: +SKIP
    >>> from causalexplain.estimators.ges import GES # doctest: +SKIP
    >>> data = pd.DataFrame([[3.23125779, 3.24950062, 13.430682, 24.67939513], # doctest: +SKIP
    ...                      [1.90913354, -0.06843781, 6.93957057, 16.10164608],# doctest: +SKIP
    ...                      [2.68547149, 1.88351553, 8.78711076, 17.18557716], # doctest: +SKIP
    ...                      [0.16850822, 1.48067393, 5.35871419, 11.82895779], # doctest: +SKIP
    ...                      [0.07355872, 1.06857039, 2.05006096, 3.07611922]]) # doctest: +SKIP
    >>> ges = GES("example") # doctest: +SKIP
    >>> ges.fit(data) # doctest: +SKIP
    """
    # Class-level defaults; each is overwritten per instance by
    # __init__ / fit / fit_predict.
    dag = None            # estimated graph (set by fit)
    ges_adjmat = None     # adjacency matrix of the estimate (set by fit)
    ges_score = None      # score of the estimate (set by fit)
    phases = None         # phases to run (set in __init__)
    iterate = None        # whether phases are iterated (set in __init__)
    debug = 0             # verbosity level
    is_fitted_ = False    # True once fit() has completed
    feature_names = None  # column names of the training DataFrame
    metrics = None        # comparison metrics vs. a reference graph
# [docs]  (Sphinx view-source link artifact; not part of the code)
def __init__(
self,
name: str,
phases=None,
iterate=False,
debug=0,
verbose=False):
self.name = name
self.phases = [
'forward', 'backward', 'turning'
] if phases is None else phases
self.iterate = iterate
self.debug = int(verbose)
# [docs]  (Sphinx view-source link artifact; not part of the code)
def fit(self, X, A0=None):
    """
    Estimate the CPDAG for the given observational data.

    Parameters
    ----------
    X : pandas.DataFrame
        Observations; columns are variables, rows are samples.
    A0 : numpy.ndarray, optional
        Initial CPDAG adjacency matrix. Defaults to the empty graph.

    Returns
    -------
    self : GES
        The fitted estimator, with `ges_adjmat`, `ges_score` and `dag` set.
    """
    assert isinstance(X, pd.DataFrame), "Data must be a pandas DataFrame"
    self.feature_names = list(X.columns)
    adjmat, score = self.fit_bic(X.values, A0=A0)
    self.ges_adjmat = adjmat
    self.ges_score = score
    self.dag = graph_from_adjacency(adjmat, node_labels=self.feature_names)
    self.is_fitted_ = True
    return self
# [docs]  (Sphinx view-source link artifact; not part of the code)
def fit_predict(self, train, test, ref_graph: nx.DiGraph = None, A0=None):
    """
    Fit GES on `train` and return the estimated graph.

    Parameters
    ----------
    train : pandas.DataFrame
        Training observations; columns are variables, rows are samples.
    test : object
        Unused; present for interface compatibility with other estimators.
    ref_graph : nx.DiGraph, optional
        Ground-truth graph. When given (and non-empty), comparison metrics
        are computed and stored in `self.metrics`.
    A0 : numpy.ndarray, optional
        Initial CPDAG on which GES will run, where `A0[i,j] != 0` implies
        the edge `i -> j` and `A0[i,j] != 0 & A0[j,i] != 0` implies the
        edge `i - j`. Defaults to the empty graph.

    Returns
    -------
    graph
        The estimated graph (`self.dag`).
    """
    self.fit(train, A0)
    if ref_graph:
        self.metrics = evaluate_graph(
            ref_graph, self.dag, self.feature_names)
    return self.dag
# [docs]  (Sphinx view-source link artifact; not part of the code)
def fit_bic(self, data, A0=None):
    """
    Run GES on the given data using the Gaussian BIC score
    (l0-penalized Gaussian likelihood). The data is not assumed to be
    centered, i.e. an intercept is fitted.

    To use a custom score, see `_fit`.

    Parameters
    ----------
    data : numpy.ndarray
        The n x p array containing the observations, where columns
        correspond to variables and rows to observations.
    A0 : numpy.ndarray, optional
        The initial CPDAG on which GES will run, where `A0[i,j] != 0`
        implies the edge `i -> j` and `A0[i,j] != 0 & A0[j,i] != 0`
        implies the edge `i - j`. Defaults to the empty graph
        (i.e. matrix of zeros).

    Returns
    -------
    estimate : numpy.ndarray
        The adjacency matrix of the estimated CPDAG.
    total_score : float
        The score of the estimate.

    Raises
    ------
    TypeError:
        If the type of some of the parameters was not expected,
        e.g. if data is not a numpy array.
    ValueError:
        If the value of some of the parameters is not appropriate,
        e.g. a wrong phase is specified.
    """
    # The score class precomputes the scatter matrices and acts as a cache.
    score_cache = GaussObsL0Pen(data)
    # Start from the empty graph unless an initial CPDAG was given.
    start = np.zeros((score_cache.p, score_cache.p)) if A0 is None else A0
    return self._fit(score_cache, None, start)
def _fit(
        self,
        score_class,
        completion_algorithm=None,
        A0=None):
    """
    Run GES using a user defined score.

    Parameters
    ----------
    score_class : ges.DecomposableScore
        an instance of a class which inherits from
        ges.decomposable_score.DecomposableScore (or defines a
        local_score function and a p attribute, see
        ges.decomposable_score for more info).
    completion_algorithm : function, optional
        the "completion algorithm" used to go from PDAG to CPDAG after
        the application of each operator. If `None`, the algorithm
        used in the original GES paper is used.
    A0 : np.array, optional
        the initial CPDAG on which GES will run, where A0[i,j] != 0
        implies i -> j and A[i,j] != 0 & A[j,i] != 0 implies i - j.
        Defaults to the empty graph.

    Returns
    -------
    estimate : np.array
        the adjacency matrix of the estimated CPDAG
    total_score : float
        the score of the estimate

    Raises
    ------
    ValueError
        If no phase is specified, or an invalid phase name is given.
    """
    if len(self.phases) == 0:
        raise ValueError("Must specify at least one phase")
    # Set the completion algorithm (original GES completion by default).
    if completion_algorithm is None:
        completion_algorithm = utils.pdag_to_cpdag
    # Unless indicated otherwise, initialize to the empty graph.
    A0 = np.zeros((score_class.p, score_class.p)) if A0 is None else A0
    # Dispatch table for the three GES phases.
    phase_steps = {'forward': self.forward_step,
                   'backward': self.backward_step,
                   'turning': self.turning_step}
    # GES procedure
    total_score = 0
    A = A0
    while True:
        last_total_score = total_score
        for phase in self.phases:
            if phase not in phase_steps:
                raise ValueError(f'Invalid phase "{phase}" specified')
            fun = phase_steps[phase]
            if self.debug:
                print(f"\nGES {phase} phase start\n"
                      f"-------------------------")
            # Greedily apply the phase's operator while it improves the
            # score; complete the PDAG into a CPDAG after each step.
            while True:
                # Fix: the sentinel previously used np.Inf, which was
                # removed in NumPy 2.0 (use np.inf / drop the unused init).
                score_change, new_A = fun(
                    A, score_class, max(0, self.debug - 1))
                if score_change > 0:
                    A = completion_algorithm(new_A)
                    total_score += score_change
                else:
                    break
            if self.debug:
                print("-----------------------")
                print("GES %s phase end" % phase)
                print("Total score: %0.4f" % total_score)
                for row in A:
                    print(row)
        # Iterate the phases only when requested and still improving.
        if total_score <= last_total_score or not self.iterate:
            break
    return A, total_score
# [docs]  (Sphinx view-source link artifact; not part of the code)
def forward_step(self, A, cache, debug=0):
    """
    Score all valid insert operators that can be applied to the current
    CPDAG A, and apply the highest scoring one.

    Parameters
    ----------
    A : np.array
        the adjacency matrix of a CPDAG, where A[i,j] != 0 => i -> j
        and A[i,j] != 0 & A[j,i] != 0 => i - j.
    cache : DecomposableScore
        an instance of the score class, which computes the change in
        score and acts as cache.
    debug : int, optional
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity.

    Returns
    -------
    score : float
        the change in score resulting from applying the highest
        scoring operator (note, can be smaller than 0).
    new_A : np.array
        the adjacency matrix of the PDAG resulting from applying the
        operator (not yet a CPDAG).
    """
    # Candidate edges are ordered pairs of distinct, non-adjacent nodes.
    non_adjacent = (A + A.T + np.eye(len(A))) == 0
    fro, to = np.where(non_adjacent)
    edge_candidates = list(zip(fro, to))
    if debug > 1:
        print(" %d candidate edges" %
              len(edge_candidates))
    # Enumerate and score every valid insert operator for each candidate.
    valid_operators = []
    for (x, y) in edge_candidates:
        valid_operators.extend(self.score_valid_insert_operators(
            x, y, A, cache, debug=max(0, debug - 1)))
    if not valid_operators:
        if debug:
            print(" No valid insert operators remain")
        return 0, A
    # Apply the operator with the highest score change.
    all_scores = [op[0] for op in valid_operators]
    score, new_A, x, y, T = valid_operators[int(np.argmax(all_scores))]
    if debug:
        print(
            f" Best operator: insert({x}, {y}, {T}) -> ({score:.4f})")
    return score, new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def backward_step(self, A, cache, debug=0):
    """
    Score all valid delete operators that can be applied to the current
    CPDAG A, and apply the highest scoring one.

    Parameters
    ----------
    A : np.array
        the adjacency matrix of a CPDAG, where A[i,j] != 0 => i -> j
        and A[i,j] != 0 & A[j,i] != 0 => i - j.
    cache : DecomposableScore
        an instance of the score class, which computes the change in
        score and acts as cache.
    debug : int
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity.

    Returns
    -------
    score : float
        the change in score resulting from applying the highest
        scoring operator (note, can be smaller than 0).
    new_A : np.array
        the adjacency matrix of the PDAG resulting from applying the
        operator (not yet a CPDAG).
    """
    # Candidates: all directed edges plus each undirected edge once.
    fro, to = np.where(utils.only_directed(A))
    edge_candidates = list(zip(fro, to))
    fro, to = np.where(utils.only_undirected(A))
    edge_candidates += [(i, j) for (i, j) in zip(fro, to) if i > j]
    # Sanity check: exactly one candidate per skeleton edge.
    assert len(edge_candidates) == utils.skeleton(A).sum() / 2
    if debug > 1:
        print(" %d candidate edges" %
              len(edge_candidates))
    # Enumerate and score every valid delete operator for each candidate.
    valid_operators = []
    for (x, y) in edge_candidates:
        valid_operators.extend(self.score_valid_delete_operators(
            x, y, A, cache, debug=max(0, debug - 1)))
    if not valid_operators:
        if debug:
            print(" No valid delete operators remain")
        return 0, A
    # Apply the operator with the highest score change.
    all_scores = [op[0] for op in valid_operators]
    score, new_A, x, y, H = valid_operators[int(np.argmax(all_scores))]
    if debug:
        print(
            f" Best operator: delete({x}, {y}, {H}) -> ({score:.4f})")
    return score, new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def turning_step(self, A, cache, debug=0):
    """
    Score all valid turn operators that can be applied to the current
    CPDAG A, and apply the highest scoring one.

    Parameters
    ----------
    A : np.array
        the adjacency matrix of a CPDAG, where A[i,j] != 0 => i -> j
        and A[i,j] != 0 & A[j,i] != 0 => i - j.
    cache : DecomposableScore
        an instance of the score class, which computes the change in
        score and acts as cache.
    debug : int
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity.

    Returns
    -------
    score : float
        the change in score resulting from applying the highest
        scoring operator (note, can be smaller than 0).
    new_A : np.array
        the adjacency matrix of the PDAG resulting from applying the
        operator (not yet a CPDAG).
    """
    # Candidates: every existing edge taken in the reversed direction
    # (covers reversed directed edges and both orientations of
    # undirected edges).
    fro, to = np.where(A != 0)
    edge_candidates = [(j, i) for (i, j) in zip(fro, to)]
    if debug > 1:
        print(f" {len(edge_candidates)} candidate edges")
    # Enumerate and score every valid turn operator for each candidate.
    valid_operators = []
    for (x, y) in edge_candidates:
        valid_operators.extend(self.score_valid_turn_operators(
            x, y, A, cache, debug=max(0, debug - 1)))
    if not valid_operators:
        if debug:
            print(" No valid turn operators remain")
        return 0, A
    # Apply the operator with the highest score change.
    all_scores = [op[0] for op in valid_operators]
    score, new_A, x, y, C = valid_operators[int(np.argmax(all_scores))]
    if debug:
        print(f" Best operator: turn({x}, {y}, {C}) -> ({score:.4f})")
    return score, new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def insert(self, x, y, T, A):
    """
    Apply the insert operator:
      1) add the edge x -> y,
      2) for all t in T, orient the previously undirected edge t - y
         as t -> y.

    Parameters
    ----------
    x : int
        the origin node (i.e. x -> y)
    y : int
        the target node
    T : iterable of ints
        a subset of the neighbors of y which are not adjacent to x
    A : np.array
        the current adjacency matrix

    Returns
    -------
    new_A : np.array
        the adjacency matrix resulting from applying the operator

    Raises
    ------
    ValueError
        if x and y are already connected, if some t in T is not a
        neighbor of y, or if some t in T is adjacent to x.
    """
    T = sorted(T)
    # Validate: x and y must be non-adjacent.
    if A[x, y] != 0 or A[y, x] != 0:
        raise ValueError("x=%d and y=%d are already connected" % (x, y))
    if len(T) > 0:
        # Validate: every t in T is an (undirected) neighbor of y...
        if not (A[T, y].all() and A[y, T].all()):
            raise ValueError(
                "Not all nodes in T=%s are neighbors of y=%d" % (T, y))
        # ...and none of them is adjacent to x.
        if A[T, x].any() or A[x, T].any():
            raise ValueError(
                "Some nodes in T=%s are adjacent to x=%d" % (T, x))
    new_A = A.copy()
    # Add the directed edge x -> y.
    new_A[x, y] = 1
    # Orient the undirected edges t - y as t -> y, for every t in T.
    new_A[T, y] = 1
    new_A[y, T] = 0
    return new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def score_valid_insert_operators(self, x, y, A, cache, debug=0):
    """Generate and score all valid insert(x,y,T) operators involving the edge
    x-> y, and all possible subsets T of neighbors of y which
    are NOT adjacent to x.

    Parameters
    ----------
    x : int
        the origin node (i.e. x -> y)
    y : int
        the target node
    A : np.array
        the current adjacency matrix
    cache : instance of ges.scores.DecomposableScore
        the score cache to compute the score of the
        operators that are valid
    debug : int
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity

    Returns
    -------
    valid_operators : list of tuples
        a list of tuples, each containing the score change of a valid
        operator, the resulting connectivity matrix, and the operator's
        arguments (x, y, T)
    """
    p = len(A)
    if A[x, y] != 0 or A[y, x] != 0:
        raise ValueError("x=%d and y=%d are already connected" % (x, y))
    # One-hot encode all subsets of T0, plus one extra column to mark
    # if they pass validity condition 2 (see below). Each row of
    # `subsets` is a boolean membership vector over the p nodes.
    T0 = sorted(utils.neighbors(y, A) - utils.adj(x, A))
    if len(T0) == 0:
        # Only the empty set T = {} is possible.
        subsets = np.zeros((1, p + 1), dtype=bool)
    else:
        subsets = np.zeros((2 ** len(T0), p + 1), dtype=bool)
        subsets[:, T0] = utils.cartesian([np.array([False, True])] * len(T0),
                                         dtype=bool)
    valid_operators = []
    print(" insert(%d,%d) T0=" % (x, y), set(T0)) if debug > 1 else None
    # Process subsets one at a time, pruning/annotating the remaining
    # rows based on the validity conditions observed so far.
    while len(subsets) > 0:
        print(" len(subsets)=%d, len(valid_operators)=%d" %
              (len(subsets), len(valid_operators))) if debug > 1 else None
        # Access the next subset (first p columns = membership; last
        # column = "a subset of T already passed condition 2").
        T = np.where(subsets[0, :-1])[0]
        passed_cond_2 = subsets[0, -1]
        subsets = subsets[1:]
        # Check that the validity conditions hold for T
        na_yxT = utils.na(y, x, A) | set(T)
        # Condition 1: Test that NA_yx U T is a clique
        cond_1 = utils.is_clique(na_yxT, A)
        if not cond_1:
            # Remove from consideration all other sets T' which
            # contain T, as the clique condition will also not hold
            supersets = subsets[:, T].all(axis=1)
            subsets = utils.delete(subsets, supersets, axis=0)
        # Condition 2: Test that all semi-directed paths from y to x contain a
        # member from NA_yx U T
        if passed_cond_2:
            # If a subset of T satisfied condition 2, so does T
            cond_2 = True
        else:
            # Check condition 2
            cond_2 = True
            for path in utils.semi_directed_paths(y, x, A):
                if len(na_yxT & set(path)) == 0:
                    cond_2 = False
                    break
            if cond_2:
                # If condition 2 holds for NA_yx U T, then it holds for
                # all supersets of T; mark them so the path search is
                # not repeated for them.
                supersets = subsets[:, T].all(axis=1)
                subsets[supersets, -1] = True
        print(" insert(%d,%d,%s)" % (x, y, T), "na_yx U T = ",
              na_yxT, "validity:", cond_1, cond_2) if debug > 1 else None
        # If both conditions hold, apply operator and compute its score
        if cond_1 and cond_2:
            # Apply operator
            new_A = self.insert(x, y, T, A)
            # Compute the change in score: only y's local score changes
            # (its parent set gains x), per score decomposability.
            aux = na_yxT | utils.pa(y, A)
            old_score = cache.local_score(y, aux)
            new_score = cache.local_score(y, aux | {x})
            print(" new: s(%d, %s) = %0.6f old: s(%d, %s) = %0.6f" %
                  (y, aux | {x}, new_score, y, aux, old_score)) if debug > 1 else None
            # Add to the list of valid operators
            valid_operators.append((new_score - old_score, new_A, x, y, T))
            print(" insert(%d,%d,%s) -> %0.16f" %
                  (x, y, T, new_score - old_score)) if debug else None
    # Return all the valid operators
    return valid_operators
# [docs]  (Sphinx view-source link artifact; not part of the code)
def delete_operator(self, x, y, H, A):
    """
    Apply the delete operator:
      1) delete the edge x -> y or x - y,
      2) for every node h in H:
         * orient the edge y - h as y -> h,
         * if the edge with x is undirected, orient it as x -> h.

    Note that H must be a subset of the neighbors of y which are
    adjacent to x. A ValueError exception is thrown otherwise.

    Parameters
    ----------
    x : int
        the "origin" node (i.e. x -> y or x - y)
    y : int
        the "target" node
    H : iterable of ints
        a subset of the neighbors of y which are adjacent to x
    A : np.array
        the current adjacency matrix

    Returns
    -------
    new_A : np.array
        the adjacency matrix resulting from applying the operator
    """
    H = set(H)
    # Validate: there must be an edge to delete.
    if A[x, y] == 0:
        raise ValueError(
            "There is no (un)directed edge from x=%d to y=%d" % (x, y))
    # Validate: H must lie within NA_yx (neighbors of y adjacent to x).
    na_yx = utils.na(y, x, A)
    if not H <= na_yx:
        raise ValueError(
            "The given set H is not valid, H=%s is not a subset of NA_yx=%s" % (
                H, na_yx))
    new_A = A.copy()
    # Remove the edge between x and y entirely.
    new_A[x, y] = 0
    new_A[y, x] = 0
    # Orient the undirected edges between y and H towards H.
    members = list(H)
    new_A[members, y] = 0
    # Orient any undirected edges between x and H towards H.
    new_A[list(H & utils.neighbors(x, A)), x] = 0
    return new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def score_valid_delete_operators(self, x, y, A, cache, debug=0):
    """Generate and score all valid delete(x,y,H) operators involving the edge
    x -> y or x - y, and all possible subsets H of neighbors of y which
    are adjacent to x.

    Parameters
    ----------
    x : int
        the "origin" node (i.e. x -> y or x - y)
    y : int
        the "target" node
    A : np.array
        the current adjacency matrix
    cache : instance of ges.scores.DecomposableScore
        the score cache to compute the score of the
        operators that are valid
    debug : int
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity

    Returns
    -------
    valid_operators : list of tuples
        a list of tuples, each containing the score change of a valid
        operator, the resulting connectivity matrix, and the operator's
        arguments (x, y, H)
    """
    # Check inputs
    if A[x, y] == 0:
        raise ValueError(
            "There is no (un)directed edge from x=%d to y=%d" % (x, y))
    # One-hot encode all subsets of H0, plus one column to mark if
    # they have already passed the validity condition (clique check).
    na_yx = utils.na(y, x, A)
    H0 = sorted(na_yx)
    p = len(A)
    if len(H0) == 0:
        # Only the empty set H = {} is possible.
        subsets = np.zeros((1, (p + 1)), dtype=bool)
    else:
        subsets = np.zeros((2 ** len(H0), (p + 1)), dtype=bool)
        subsets[:, H0] = utils.cartesian([np.array([False, True])] * len(H0),
                                         dtype=bool)
    valid_operators = []
    print(" delete(%d,%d) H0=" % (x, y), set(H0)) if debug > 1 else None
    # Process subsets one at a time, annotating the remaining rows when
    # the validity condition is known to hold for their supersets.
    while len(subsets) > 0:
        print(" len(subsets)=%d, len(valid_operators)=%d" %
              (len(subsets), len(valid_operators))) if debug > 1 else None
        # Access the next subset (first p columns = membership; last
        # column = validity already established via a subset of H).
        H = np.where(subsets[0, :-1])[0]
        cond_1 = subsets[0, -1]
        subsets = subsets[1:]
        # Check if the validity condition holds for H, i.e. that
        # NA_yx \ H is a clique.
        # If it has not been tested previously for a subset of H,
        # check it now
        if not cond_1 and utils.is_clique(na_yx - set(H), A):
            cond_1 = True
            # For all supersets H' of H, the validity condition will also hold
            supersets = subsets[:, H].all(axis=1)
            subsets[supersets, -1] = True
        # If the validity condition holds, apply operator and compute its score
        print(" delete(%d,%d,%s)" % (x, y, H), "na_yx - H = ",
              na_yx - set(H), "validity:", cond_1) if debug > 1 else None
        if cond_1:
            # Apply operator
            new_A = self.delete_operator(x, y, H, A)
            # Compute the change in score: only y's local score changes
            # (x leaves y's parent set), per score decomposability.
            aux = (na_yx - set(H)) | utils.pa(y, A) | {x}
            # print(x,y,H,"na_yx:",na_yx,"old:",aux,"new:", aux - {x})
            old_score = cache.local_score(y, aux)
            new_score = cache.local_score(y, aux - {x})
            print(" new: s(%d, %s) = %0.6f old: s(%d, %s) = %0.6f" %
                  (y, aux - {x}, new_score, y, aux, old_score)) if debug > 1 else None
            # Add to the list of valid operators
            valid_operators.append((new_score - old_score, new_A, x, y, H))
            print(" delete(%d,%d,%s) -> %0.16f" %
                  (x, y, H, new_score - old_score)) if debug else None
    # Return all the valid operators
    return valid_operators
# [docs]  (Sphinx view-source link artifact; not part of the code)
def turn(self, x, y, C, A):
    """
    Apply the turning operator: for an edge x - y or x <- y,
      1) orient the edge as x -> y,
      2) for all c in C, orient the previously undirected edge c - y
         as c -> y.

    Parameters
    ----------
    x : int
        the origin node (i.e. x -> y)
    y : int
        the target node
    C : iterable of ints
        a subset of the neighbors of y (not containing x or y)
    A : np.array
        the current adjacency matrix

    Returns
    -------
    new_A : np.array
        the adjacency matrix resulting from applying the operator

    Raises
    ------
    ValueError
        if x -> y already exists, if x and y are not connected, if some
        c in C is not a neighbor of y, or if C contains x or y.
    """
    # Robustness: accept any iterable for C (the subset comparison
    # below requires a set).
    C = set(C)
    # Check inputs
    if A[x, y] != 0 and A[y, x] == 0:
        # Fix: grammatically broken message ("is already exists"),
        # now consistent with score_valid_turn_operators.
        raise ValueError("The edge %d -> %d already exists" % (x, y))
    if A[x, y] == 0 and A[y, x] == 0:
        raise ValueError("x=%d and y=%d are not connected" % (x, y))
    if not C <= utils.neighbors(y, A):
        raise ValueError(
            "Not all nodes in C=%s are neighbors of y=%d" % (C, y))
    if len({x, y} & C) > 0:
        raise ValueError("C should not contain x or y")
    # Apply operator
    new_A = A.copy()
    # Turn the edge into x -> y
    new_A[y, x] = 0
    new_A[x, y] = 1
    # Orient edges c -> y for c in C
    new_A[y, list(C)] = 0
    return new_A
# [docs]  (Sphinx view-source link artifact; not part of the code)
def score_valid_turn_operators(self, x, y, A, cache, debug=0):
    """Generate and score all valid turn(x,y,C) operators that can be
    applied to the edge x <- y or x - y, iterating through the valid
    subsets C of neighbors of y.

    Parameters
    ----------
    x : int
        the origin node (i.e. orient x -> y)
    y : int
        the target node
    A : np.array
        the current adjacency matrix
    cache : instance of ges.scores.DecomposableScore
        the score cache to compute the score of the
        operators that are valid
    debug : int
        if larger than 0, debug traces are printed. Higher values
        correspond to increased verbosity

    Returns
    -------
    valid_operators : list of tuples
        a list of tuples, each containing a valid operator, its score
        and the resulting connectivity matrix
    """
    if A[x, y] != 0 and A[y, x] == 0:
        raise ValueError(f"The edge {x} -> {y} already exists")
    if A[x, y] == 0 and A[y, x] == 0:
        raise ValueError(f"x={x} and y={y} are not connected")
    # Dispatch on the kind of edge being turned: undirected (x - y)
    # vs. directed (x <- y) — they need different validation/scoring.
    edge_is_undirected = A[x, y] != 0 and A[y, x] != 0
    if edge_is_undirected:
        return self.score_valid_turn_operators_undir(
            x, y, A, cache, debug=debug)
    return self.score_valid_turn_operators_dir(
        x, y, A, cache, debug=debug)
# [docs]  (Sphinx view-source link artifact; not part of the code)
def score_valid_turn_operators_dir(self, x, y, A, cache, debug=0):
    """Logic for finding and scoring the valid turn operators that can be
    applied to the edge x <- y.

    Parameters
    ----------
    x : int
        the origin node (i.e. x -> y)
    y : int
        the target node
    A : np.array
        the current adjacency matrix
    cache : instance of ges.scores.DecomposableScore
        the score cache to compute the score of the
        operators that are valid
    debug : bool or string
        if debug traces should be printed (True/False). If a non-empty
        string is passed, traces are printed with the given string as
        prefix (useful for indenting the prints from a calling
        function)

    Returns
    -------
    valid_operators : list of tuples
        a list of tuples, each containing the score change of a valid
        operator, the resulting connectivity matrix, and the operator's
        arguments (x, y, C)
    """
    # One-hot encode all subsets of T0, plus one extra column to mark
    # if they pass validity condition 2 (see below). The set C passed
    # to the turn operator will be C = NAyx U T.
    p = len(A)
    T0 = sorted(utils.neighbors(y, A) - utils.adj(x, A))
    if len(T0) == 0:
        # Only the empty set T = {} is possible.
        subsets = np.zeros((1, p + 1), dtype=bool)
    else:
        subsets = np.zeros((2 ** len(T0), p + 1), dtype=bool)
        subsets[:, T0] = utils.cartesian([np.array([False, True])] * len(T0),
                                         dtype=bool)
    valid_operators = []
    print(" turn(%d,%d) T0=" % (x, y), set(T0)) if debug > 1 else None
    # Process subsets one at a time, pruning/annotating the remaining
    # rows based on the validity conditions observed so far.
    while len(subsets) > 0:
        print(" len(subsets)=%d, len(valid_operators)=%d" %
              (len(subsets), len(valid_operators))) if debug > 1 else None
        # Access the next subset
        T = np.where(subsets[0, :-1])[0]
        passed_cond_2 = subsets[0, -1]
        subsets = subsets[1:]  # update the list of remaining subsets
        # Check that the validity conditions hold for T
        C = utils.na(y, x, A) | set(T)
        # Condition 1: Test that C = NA_yx U T is a clique
        cond_1 = utils.is_clique(C, A)
        if not cond_1:
            # Remove from consideration all other sets T' which
            # contain T, as the clique condition will also not hold
            supersets = subsets[:, T].all(axis=1)
            subsets = utils.delete(subsets, supersets, axis=0)
        # Condition 2: Test that all semi-directed paths from y to x contain a
        # member from C U neighbors(x)
        if passed_cond_2:
            # If a subset of T satisfied condition 2, so does T
            cond_2 = True
        else:
            # otherwise, check condition 2; the trivial path [y, x]
            # (the edge being turned itself) is deliberately skipped.
            cond_2 = True
            for path in utils.semi_directed_paths(y, x, A):
                if path == [y, x]:
                    pass
                elif len((C | utils.neighbors(x, A)) & set(path)) == 0:
                    cond_2 = False
                    break
            if cond_2:
                # If condition 2 holds for C U neighbors(x), that is,
                # for C = NAyx U T U neighbors(x), then it holds for
                # all supersets of T
                supersets = subsets[:, T].all(axis=1)
                subsets[supersets, -1] = True
        # If both conditions hold, apply operator and compute its score
        print(" turn(%d,%d,%s)" % (x, y, C), "na_yx =", utils.na(y, x, A),
              "T =", T, "validity:", cond_1, cond_2) if debug > 1 else None
        if cond_1 and cond_2:
            # Apply operator
            new_A = self.turn(x, y, C, A)
            # Compute the change in score: both x's and y's local
            # scores change (y gains parent x; x loses parent y).
            new_score = cache.local_score(y, utils.pa(
                y, A) | C | {x}) + cache.local_score(x, utils.pa(x, A) - {y})
            old_score = cache.local_score(y, utils.pa(y, A) | C) + \
                cache.local_score(x, utils.pa(x, A))
            print(" new score = %0.6f, old score = %0.6f, y=%d, C=%s" %
                  (new_score, old_score, y, C)) if debug > 1 else None
            # Add to the list of valid operators
            valid_operators.append((new_score - old_score, new_A, x, y, C))
            print(" turn(%d,%d,%s) -> %0.16f" %
                  (x, y, C, new_score - old_score)) if debug else None
    # Return all the valid operators
    return valid_operators
# [docs]  (Sphinx view-source link artifact; not part of the code)
def score_valid_turn_operators_undir(self, x, y, A, cache, debug=0):
    """Logic for finding and scoring the valid turn operators that can be
    applied to the edge x - y.

    Parameters
    ----------
    x : int
        the origin node (i.e. x -> y)
    y : int
        the target node
    A : np.array
        the current adjacency matrix
    cache : instance of ges.scores.DecomposableScore
        the score cache to compute the score of the
        operators that are valid
    debug : bool or string
        if debug traces should be printed (True/False). If a non-empty
        string is passed, traces are printed with the given string as
        prefix (useful for indenting the prints from a calling
        function)

    Returns
    -------
    valid_operators : list of tuples
        a list of tuples, each containing the score change of a valid
        operator, the resulting connectivity matrix, and the operator's
        arguments (x, y, C)
    """
    # Proposition 31, condition (ii) in GIES paper (Hauser & Bühlmann
    # 2012) is violated if:
    # 1. all neighbors of y are adjacent to x, or
    # 2. y has no neighbors (besides u)
    # then there are no valid operators.
    non_adjacents = list(utils.neighbors(y, A) - utils.adj(x, A) - {x})
    if len(non_adjacents) == 0:
        print(" turn(%d,%d) : ne(y) \\ adj(x) = Ø => stopping" % (
            x, y)) if debug > 1 else None
        return []
    # Otherwise, construct all the possible subsets which will satisfy
    # condition (ii), i.e. all subsets of neighbors of y with at least
    # one which is not adjacent to x. (C0 is non-empty here, since
    # non_adjacents ⊆ C0.)
    p = len(A)
    C0 = sorted(utils.neighbors(y, A) - {x})
    subsets = np.zeros((2 ** len(C0), p + 1), dtype=bool)
    subsets[:, C0] = utils.cartesian(
        [np.array([False, True])] * len(C0), dtype=bool)
    # Remove all subsets which do not contain at least one non-adjacent node to x
    to_remove = (subsets[:, non_adjacents] == False).all(axis=1)
    subsets = utils.delete(subsets, to_remove, axis=0)
    # With condition (ii) guaranteed, we now check conditions (i,iii)
    # for each subset
    valid_operators = []
    print(" turn(%d,%d) C0=" % (x, y), set(C0)) if debug > 1 else None
    while len(subsets) > 0:
        print(" len(subsets)=%d, len(valid_operators)=%d" %
              (len(subsets), len(valid_operators))) if debug > 1 else None
        # Access the next subset
        C = set(np.where(subsets[0, :])[0])
        subsets = subsets[1:]
        # Condition (i): C is a clique in the subgraph induced by the
        # chain component of y. Because C is composed of neighbors of
        # y, this is equivalent to C being a clique in A. NOTE: This
        # is also how it is described in Alg. 5 of the paper
        cond_1 = utils.is_clique(C, A)
        if not cond_1:
            # Remove from consideration all other sets C' which
            # contain C, as the clique condition will also not hold
            supersets = subsets[:, list(C)].all(axis=1)
            subsets = utils.delete(subsets, supersets, axis=0)
            continue
        # Condition (iii): Note that condition (iii) from proposition
        # 31 appears to be wrong in the GIES paper; instead we use the
        # definition of condition (iii) from Alg. 5 of the paper:
        # Let na_yx (N in the GIES paper) be the neighbors of Y which
        # are adjacent to X. Then, {x,y} must separate C and na_yx \ C
        # in the subgraph induced by the chain component of y,
        # i.e. all the simple paths from one set to the other contain
        # a node in {x,y}.
        subgraph = utils.induced_subgraph(utils.chain_component(y, A), A)
        na_yx = utils.na(y, x, A)
        if not utils.separates({x, y}, C, na_yx - C, subgraph):
            continue
        # At this point C passes both conditions
        # Apply operator
        new_A = self.turn(x, y, C, A)
        # Compute the change in score: both x's and y's local scores
        # change (y gains parent x; x loses parent y, keeps C ∩ na_yx).
        new_score = cache.local_score(y, utils.pa(
            y, A) | C | {x}) + cache.local_score(x, utils.pa(x, A) | (C & na_yx))
        old_score = cache.local_score(y, utils.pa(y, A) | C) + \
            cache.local_score(x, utils.pa(x, A) | (C & na_yx) | {y})
        print(" new score = %0.6f, old score = %0.6f, y=%d, C=%s" %
              (new_score, old_score, y, C)) if debug > 1 else None
        # Add to the list of valid operators
        valid_operators.append((new_score - old_score, new_A, x, y, C))
        print(" turn(%d,%d,%s) -> %0.16f" % (
            x, y, C, new_score - old_score)) if debug else None
    # Return all valid operators
    return valid_operators
# [docs]  (Sphinx view-source link artifact; not part of the code)
def main(dataset_name,
         input_path="/Users/renero/phd/data/sachs/",
         output_path="/Users/renero/phd/output/RC4/sachs/compared",
         save=False):
    """
    Small driver: load a dataset and its reference graph, standardize
    the data, run GES and print the estimated edges and the comparison
    metrics against the reference graph.

    Parameters
    ----------
    dataset_name : str
        Base name of the `<dataset_name>.csv` / `<dataset_name>.dot`
        files inside `input_path`.
    input_path : str, optional
        Directory holding the data and reference-graph files.
    output_path : str, optional
        Where results would be saved (currently unused; see the
        commented-out code below).
    save : bool, optional
        Kept for future use; saving is currently disabled.
    """
    # Fix: StandardScaler's canonical home is sklearn.preprocessing —
    # the module-level import from sklearn.discriminant_analysis only
    # works because that module happens to import it internally.
    from sklearn.preprocessing import StandardScaler

    ref_graph = graph_from_dot_file(f"{input_path}{dataset_name}.dot")
    data = pd.read_csv(f"{input_path}{dataset_name}.csv")
    # Standardize each column to zero mean / unit variance before GES.
    scaler = StandardScaler()
    data = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)
    ges = GES(dataset_name, phases=["forward", "backward", "turning"], debug=0)
    ges.fit_predict(train=data, test=None, ref_graph=ref_graph)
    for edge in ges.dag.edges():
        print(edge)
    print(ges.metrics)
    # if save:
    #     where_to = utils.save_experiment(rex.name, output_path, rex)
    #     print(f"Saved '{rex.name}' to '{where_to}'")


if __name__ == "__main__":
    main("sachs_long")