import itertools
import networkx as nx
import numpy as np
from .independencies import Independencies
[docs]
class DAG(nx.DiGraph):
"""
Base class for all Directed Graphical Models.
Each node in the graph can represent either a random variable, `Factor`,
or a cluster of random variables. Edges in the graph represent the
dependencies between these.
Parameters
----------
data: input graph
Data to initialize graph. If data=None (default) an empty graph is
created. The data can be an edge list or any Networkx graph object.
"""
[docs]
def __init__(self, ebunch=None, latents=set()):
super(DAG, self).__init__(ebunch)
self.latents = set(latents)
cycles = []
try:
cycles = list(nx.find_cycle(self))
except nx.NetworkXNoCycle:
pass
else:
out_str = "Cycles are not allowed in a DAG."
out_str += "\nEdges indicating the path taken for a loop: "
out_str += "".join([f"({u},{v}) " for (u, v) in cycles])
raise ValueError(out_str)
[docs]
def add_node(self, node, weight=None, latent=False):
"""
Adds a single node to the Graph.
Parameters
----------
node: str, int, or any hashable python object.
The node to add to the graph.
weight: int, float
The weight of the node.
latent: boolean (default: False)
Specifies whether the variable is latent or not.
"""
# Check for networkx 2.0 syntax
if isinstance(node, tuple) and len(node) == 2 and isinstance(node[1], dict):
node, attrs = node
if attrs.get("weight", None) is not None:
attrs["weight"] = weight
else:
attrs = {"weight": weight}
if latent:
self.latents.add(node)
super(DAG, self).add_node(node, weight=weight)
[docs]
def add_nodes_from(self, nodes, weights=None, latent=False):
"""
Add multiple nodes to the Graph.
**The behviour of adding weights is different than in networkx.
Parameters
----------
nodes: iterable container
A container of nodes (list, dict, set, or any hashable python
object).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the variable at index i.
latent: list, tuple (default=False)
A container of boolean. The value at index i tells whether the
node at index i is latent or not.
"""
nodes = list(nodes)
if isinstance(latent, bool):
latent = [latent] * len(nodes)
if weights:
if len(nodes) != len(weights):
raise ValueError(
"The number of elements in nodes and weights" "should be equal."
)
for index in range(len(nodes)):
self.add_node(
node=nodes[index], weight=weights[index], latent=latent[index]
)
else:
for index in range(len(nodes)):
self.add_node(node=nodes[index], latent=latent[index])
[docs]
def add_edge(self, u, v, weight=None):
"""
Add an edge between u and v.
The nodes u and v will be automatically added if they are
not already in the graph.
Parameters
----------
u, v : nodes
Nodes can be any hashable Python object.
weight: int, float (default=None)
The weight of the edge
"""
super(DAG, self).add_edge(u, v, weight=weight)
[docs]
def add_edges_from(self, ebunch, weights=None):
"""
Add all the edges in ebunch.
If nodes referred in the ebunch are not already present, they
will be automatically added. Node names can be any hashable python
object.
**The behavior of adding weights is different than networkx.
Parameters
----------
ebunch : container of edges
Each edge given in the container will be added to the graph.
The edges must be given as 2-tuples (u, v).
weights: list, tuple (default=None)
A container of weights (int, float). The weight value at index i
is associated with the edge at index i.
"""
ebunch = list(ebunch)
if weights:
if len(ebunch) != len(weights):
raise ValueError(
"The number of elements in ebunch and weights" "should be equal"
)
for index in range(len(ebunch)):
self.add_edge(ebunch[index][0], ebunch[index]
[1], weight=weights[index])
else:
for edge in ebunch:
self.add_edge(edge[0], edge[1])
[docs]
def get_parents(self, node):
"""
Returns a list of parents of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose parents would be returned.
"""
return list(self.predecessors(node))
[docs]
def moralize(self):
"""
Removes all the immoralities in the DAG and creates a moral
graph (UndirectedGraph).
A v-structure X->Z<-Y is an immorality if there is no directed edge
between X and Y.
"""
moral_graph = UndirectedGraph()
moral_graph.add_nodes_from(self.nodes())
moral_graph.add_edges_from(self.to_undirected().edges())
for node in self.nodes():
moral_graph.add_edges_from(
itertools.combinations(self.get_parents(node), 2)
)
return moral_graph
[docs]
def get_leaves(self):
"""
Returns a list of leaves of the graph.
"""
return [node for node, out_degree in self.out_degree_iter() if out_degree == 0]
[docs]
def out_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).out_degree_iter(nbunch, weight)
else:
return iter(self.out_degree(nbunch, weight))
[docs]
def in_degree_iter(self, nbunch=None, weight=None):
if nx.__version__.startswith("1"):
return super(DAG, self).in_degree_iter(nbunch, weight)
else:
return iter(self.in_degree(nbunch, weight))
[docs]
def get_roots(self):
"""
Returns a list of roots of the graph.
"""
return [
node for node, in_degree in dict(self.in_degree()).items() if in_degree == 0
]
[docs]
def get_children(self, node):
"""
Returns a list of children of node.
Throws an error if the node is not present in the graph.
Parameters
----------
node: string, int or any hashable python object.
The node whose children would be returned.
"""
return list(self.successors(node))
[docs]
def get_independencies(self, latex=False, include_latents=False):
"""
Computes independencies in the DAG, by checking d-seperation.
Parameters
----------
latex: boolean
If latex=True then latex string of the independence assertion
would be created.
include_latents: boolean
If True, includes latent variables in the independencies. Otherwise,
only generates independencies on observed variables.
"""
nodes = set(self.nodes())
if not include_latents:
nodes = set(self.nodes()) - self.latents
independencies = Independencies()
for start in nodes:
if not include_latents:
rest = set(self.nodes()) - {start} - self.latents
else:
rest = set(self.nodes()) - {start}
for r in range(len(rest)):
for observed in itertools.combinations(rest, r):
d_seperated_variables = (
rest
- set(observed)
- set(
self.active_trail_nodes(
start,
observed=observed,
include_latents=include_latents,
)[start]
)
)
if d_seperated_variables:
independencies.add_assertions(
[start, d_seperated_variables, observed]
)
independencies.reduce()
if not latex:
return independencies
else:
return independencies.latex_string()
[docs]
def local_independencies(self, variables):
"""
Returns an instance of Independencies containing the local independencies
of each of the variables.
Parameters
----------
variables: str or array like
variables whose local independencies are to be found.
"""
independencies = Independencies()
for variable in (
variables if isinstance(variables, (list, tuple)) else [variables]
):
non_descendents = (
set(self.nodes())
- {variable}
- set(nx.dfs_preorder_nodes(self, variable))
)
parents = set(self.get_parents(variable))
if non_descendents - parents:
independencies.add_assertions(
[variable, non_descendents - parents, parents]
)
return independencies
[docs]
def is_iequivalent(self, model):
"""
Checks whether the given model is I-equivalent
Two graphs G1 and G2 are said to be I-equivalent if they have same skeleton
and have same set of immoralities.
Parameters
----------
model : A DAG object, for which you want to check I-equivalence
Returns
--------
boolean : True if both are I-equivalent, False otherwise
"""
if not isinstance(model, DAG):
raise TypeError(
f"Model must be an instance of DAG. Got type: {type(model)}"
)
if (self.to_undirected().edges() == model.to_undirected().edges()) and (
self.get_immoralities() == model.get_immoralities()
):
return True
return False
[docs]
def get_immoralities(self):
"""
Finds all the immoralities in the model
A v-structure X -> Z <- Y is an immorality if there is no direct edge between X and Y .
Returns
-------
set: A set of all the immoralities in the model
"""
immoralities = set()
for node in self.nodes():
for parents in itertools.combinations(self.predecessors(node), 2):
if not self.has_edge(parents[0], parents[1]) and not self.has_edge(
parents[1], parents[0]
):
immoralities.add(tuple(sorted(parents)))
return immoralities
[docs]
def is_dconnected(self, start, end, observed=None):
"""
Returns True if there is an active trail (i.e. d-connection) between
`start` and `end` node given that `observed` is observed.
Parameters
----------
start, end : int, str, any hashable python object.
The nodes in the DAG between which to check the d-connection/active trail.
observed : list, array-like (optional)
If given the active trail would be computed assuming these nodes to
be observed.
"""
if end in self.active_trail_nodes(start, observed)[start]:
return True
else:
return False
[docs]
def minimal_dseparator(self, start, end):
"""
Finds the minimal d-separating set for `start` and `end`.
Parameters
----------
start: node
The first node.
end: node
The second node.
References
----------
[1] Algorithm 4, Page 10: Tian, Jin, Azaria Paz, and Judea Pearl. Finding
minimal d-separators. Computer Science Department, University of California,
1998.
"""
if (end in self.neighbors(start)) or (start in self.neighbors(end)):
raise ValueError(
"No possible separators because start and end are adjacent"
)
an_graph = self.get_ancestral_graph([start, end])
separator = set(
itertools.chain(self.predecessors(start), self.predecessors(end))
)
# If any of the parents were latents, take the latent's parent
while len(separator.intersection(self.latents)) != 0:
separator_copy = separator.copy()
for u in separator:
if u in self.latents:
separator_copy.remove(u)
separator_copy.update(set(self.predecessors(u)))
separator = separator_copy
# Remove the start and end nodes in case it reaches there while removing latents.
separator.difference_update({start, end})
# If the initial set is not able to d-separate, no d-separator is possible.
if an_graph.is_dconnected(start, end, observed=separator):
return None
# Go through the separator set, remove one element and check if it remains
# a dseparating set.
minimal_separator = separator.copy()
for u in separator:
if not an_graph.is_dconnected(start, end, observed=minimal_separator - {u}):
minimal_separator.remove(u)
return minimal_separator
[docs]
def get_markov_blanket(self, node):
"""
Returns a markov blanket for a random variable. In the case
of Bayesian Networks, the markov blanket is the set of
node's parents, its children and its children's other parents.
Returns
-------
list(blanket_nodes): List of nodes contained in Markov Blanket
Parameters
----------
node: string, int or any hashable python object.
The node whose markov blanket would be returned.
"""
children = self.get_children(node)
parents = self.get_parents(node)
blanket_nodes = children + parents
for child_node in children:
blanket_nodes.extend(self.get_parents(child_node))
blanket_nodes = set(blanket_nodes)
blanket_nodes.discard(node)
return list(blanket_nodes)
[docs]
def active_trail_nodes(self, variables, observed=None, include_latents=False):
"""
Returns a dictionary with the given variables as keys and all the nodes reachable
from that respective variable as values.
Parameters
----------
variables: str or array like
variables whose active trails are to be found.
observed : List of nodes (optional)
If given the active trails would be computed assuming these nodes to be
observed.
include_latents: boolean (default: False)
Whether to include the latent variables in the returned active trail nodes.
References
----------
Details of the algorithm can be found in 'Probabilistic Graphical Model
Principles and Techniques' - Koller and Friedman
Page 75 Algorithm 3.1
"""
if observed:
if isinstance(observed, set):
observed = list(observed)
observed_list = (
observed if isinstance(observed, (list, tuple)) else [observed]
)
else:
observed_list = []
ancestors_list = self._get_ancestors_of(observed_list)
# Direction of flow of information
# up -> from parent to child
# down -> from child to parent
active_trails = {}
for start in variables if isinstance(variables, (list, tuple)) else [variables]:
visit_list = set()
visit_list.add((start, "up"))
traversed_list = set()
active_nodes = set()
while visit_list:
node, direction = visit_list.pop()
if (node, direction) not in traversed_list:
if node not in observed_list:
active_nodes.add(node)
traversed_list.add((node, direction))
if direction == "up" and node not in observed_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
for child in self.successors(node):
visit_list.add((child, "down"))
elif direction == "down":
if node not in observed_list:
for child in self.successors(node):
visit_list.add((child, "down"))
if node in ancestors_list:
for parent in self.predecessors(node):
visit_list.add((parent, "up"))
if include_latents:
active_trails[start] = active_nodes
else:
active_trails[start] = active_nodes - self.latents
return active_trails
def _get_ancestors_of(self, nodes):
"""
Returns a dictionary of all ancestors of all the observed nodes including the
node itself.
Parameters
----------
nodes: string, list-type
name of all the observed nodes
"""
if not isinstance(nodes, (list, tuple)):
nodes = [nodes]
for node in nodes:
if node not in self.nodes():
raise ValueError(f"Node {node} not in not in graph")
ancestors_list = set()
nodes_list = set(nodes)
while nodes_list:
node = nodes_list.pop()
if node not in ancestors_list:
nodes_list.update(self.predecessors(node))
ancestors_list.add(node)
return ancestors_list
[docs]
def to_pdag(self):
"""
Returns the PDAG (the equivalence class of DAG; also known as CPDAG) of the DAG.
Returns
-------
PDAG: An instance of pgmpy.base.PDAG.
Examples
--------
"""
pass
[docs]
def do(self, nodes, inplace=False):
"""
Applies the do operator to the graph and returns a new DAG with the
transformed graph.
The do-operator, do(X = x) has the effect of removing all edges from
the parents of X and setting X to the given value x.
Parameters
----------
nodes : list, array-like
The names of the nodes to apply the do-operator for.
inplace: boolean (default: False)
If inplace=True, makes the changes to the current object,
otherwise returns a new instance.
Returns
-------
pgmpy.base.DAG: A new instance of DAG modified by the do-operator
References
----------
Causality: Models, Reasoning, and Inference, Judea Pearl (2000). p.70.
"""
dag = self if inplace else self.copy()
if isinstance(nodes, (str, int)):
nodes = [nodes]
else:
nodes = list(nodes)
if not set(nodes).issubset(set(self.nodes())):
raise ValueError(
f"Nodes not found in the model: {set(nodes) - set(self.nodes)}"
)
for node in nodes:
parents = list(dag.predecessors(node))
for parent in parents:
dag.remove_edge(parent, node)
return dag
[docs]
def get_ancestral_graph(self, nodes):
"""
Returns the ancestral graph of the given `nodes`. The ancestral graph only
contains the nodes which are ancestors of atleast one of the variables in
node.
Parameters
----------
node: iterable
List of nodes whose ancestral graph needs to be computed.
Returns
-------
pgmpy.base.DAG instance: The ancestral graph.
"""
return self.subgraph(nodes=self._get_ancestors_of(nodes=nodes))
[docs]
def to_daft(
self,
node_pos="circular",
latex=True,
pgm_params={},
edge_params={},
node_params={},
):
"""
Returns a daft (https://docs.daft-pgm.org/en/latest/) object which can be rendered for
publication quality plots. The returned object's render method can be called to see the plots.
Parameters
----------
node_pos: str or dict (default: circular)
If str: Must be one of the following: circular, kamada_kawai, planar, random, shell, sprint,
spectral, spiral. Please refer: https://networkx.org/documentation/stable//reference/drawing.html#module-networkx.drawing.layout for details on these layouts.
If dict should be of the form {node: (x coordinate, y coordinate)} describing the x and y coordinate of each
node.
If no argument is provided uses circular layout.
latex: boolean
Whether to use latex for rendering the node names.
pgm_params: dict (optional)
Any additional parameters that need to be passed to `daft.PGM` initializer.
Should be of the form: {param_name: param_value}
edge_params: dict (optional)
Any additional edge parameters that need to be passed to `daft.add_edge` method.
Should be of the form: {(u1, v1): {param_name: param_value}, (u2, v2): {...} }
node_params: dict (optional)
Any additional node parameters that need to be passed to `daft.add_node` method.
Should be of the form: {node1: {param_name: param_value}, node2: {...} }
Returns
-------
daft.PGM object: A plot of the DAG.
"""
try:
from daft import PGM
except ImportError as e:
raise ImportError(
"Package daft required. Please visit: https://docs.daft-pgm.org/en/latest/ for installation instructions."
)
if isinstance(node_pos, str):
supported_layouts = {
"circular": nx.circular_layout,
"kamada_kawai": nx.kamada_kawai_layout,
"planar": nx.planar_layout,
"random": nx.random_layout,
"shell": nx.shell_layout,
"spring": nx.spring_layout,
"spectral": nx.spectral_layout,
"spiral": nx.spiral_layout,
}
if node_pos not in supported_layouts.keys():
raise ValueError(
"Unknown node_pos argument. Please refer docstring for accepted values"
)
else:
node_pos = supported_layouts[node_pos](self)
elif isinstance(node_pos, dict):
for node in self.nodes():
if node not in node_pos.keys():
raise ValueError(f"No position specified for {node}.")
else:
raise ValueError(
"Argument node_pos not valid. Please refer to the docstring."
)
daft_pgm = PGM(**pgm_params)
for node in self.nodes():
try:
extra_params = node_params[node]
except KeyError:
extra_params = dict()
if latex:
daft_pgm.add_node(
node,
fr"${node}$",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
else:
daft_pgm.add_node(
node,
f"{node}",
node_pos[node][0],
node_pos[node][1],
observed=True,
**extra_params,
)
for u, v in self.edges():
try:
extra_params = edge_params[(u, v)]
except KeyError:
extra_params = dict()
daft_pgm.add_edge(u, v, **extra_params)
return daft_pgm
[docs]
@staticmethod
def get_random(n_nodes=5, edge_prob=0.5, latents=False):
"""
Returns a randomly generated DAG with `n_nodes` number of nodes with
edge probability being `edge_prob`.
Parameters
----------
n_nodes: int
The number of nodes in the randomly generated DAG.
edge_prob: float
The probability of edge between any two nodes in the topologically
sorted DAG.
latents: bool (default: False)
If True, includes latent variables in the generated DAG.
Returns
-------
pgmpy.base.DAG instance: The randomly generated DAG.
"""
# Step 1: Generate a matrix of 0 and 1. Prob of choosing 1 = edge_prob
adj_mat = np.random.choice(
[0, 1], size=(n_nodes, n_nodes), p=[1 - edge_prob, edge_prob]
)
# Step 2: Use the upper triangular part of the matrix as adjacency.
nodes = list(range(n_nodes))
edges = nx.convert_matrix.from_numpy_array(
np.triu(adj_mat, k=1), create_using=nx.DiGraph
).edges()
dag = DAG(edges)
dag.add_nodes_from(nodes)
if latents:
dag.latents = set(
np.random.choice(
dag.nodes(), np.random.randint(low=0, high=len(dag.nodes()))
)
)
return dag