"""
Defining a set of classes that represent causal functions/ mechanisms.
"""
import random
import numpy as np
from scipy.stats import bernoulli
from sklearn.mixture import GaussianMixture as GMM
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.gaussian_process import GaussianProcessRegressor
[docs]
def gmm_cause(n, k=4, p1=2, p2=2, verbose=False):
g = GMM(k)
g.means_ = p1 * np.random.randn(k, 1)
g.covars_ = np.power(abs(p2 * np.random.randn(k, 1) + 1), 2)
g.weights_ = abs(np.random.rand(k, 1))
g.weights_ = g.weights_ / sum(g.weights_)
if(verbose):
print(f" GMM cause initialized with {k} components and {n} points")
return np.random.uniform(-1, 1, n)
[docs]
class LinearMechanism(object):
"""Linear mechanism, where Effect = alpha*Cause + Noise."""
[docs]
def __init__(self, ncauses, points, d=4, noise_coeff=.7, verbose=False):
"""Init the mechanism."""
super(LinearMechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.coefflist = []
for i in range(ncauses):
self.coefflist.append(random.random())
self.noise = np.random.randn(points, 1)
self.d = d
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
# Additive only, for now
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
for par in range(causes.shape[1]):
effect[:, 0] = effect[:, 0] + self.coefflist[par]*causes[:, par]
effect[:, 0] = effect[:, 0] + self.noise[:, 0]
return effect
[docs]
class Polynomial_Mechanism(object):
[docs]
def __init__(self, ncauses, points, d=2, noise_coeff=.7, verbose=False):
"""Init the mechanism."""
super(Polynomial_Mechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.d = d
self.polycause = []
self.verbose = verbose
for c in range(ncauses):
self.coefflist = []
for j in range(self.d + 1):
self.coefflist.append(random.random())
self.polycause.append(self.coefflist)
self.ber = bernoulli.rvs(0.5)
self.noise = 0.1*np.random.randn(points, 1)
if self.verbose:
print(f"Polynomial Mechanism Initialized w {ncauses} causes + {points} p")
[docs]
def mechanism(self, x, par):
list_coeff = self.polycause[par]
result = np.zeros((self.points, 1))
for i in range(self.points):
for j in range(self.d+1):
result[i, 0] += list_coeff[j]*np.power(x[i], j)
# XXX
# ARGH!!!!!!!!!!!!!!!
# result[i, 0] = min(result[i, 0], 1)
# result[i, 0] = max(result[i, 0], -1)
if self.verbose:
print(f" Coeffs: ", end="", sep="")
for j in range(self.d+1):
print(f"{list_coeff[j]:.2f}, ", end="", sep="")
print("")
print(f" {par}th cause = ", end="", sep="")
for j in range(self.d+1):
print(f"{list_coeff[j]:.2f}*x^{j} + ", end="", sep="")
print(f"noise")
return result
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
if verbose:
print(" Polynomial Mechanism Called")
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
for par in range(causes.shape[1]):
effect[:, 0] = effect[:, 0] + self.mechanism(causes[:, par], par)[:, 0]
# I remove the multiplicative noise since it destroys the effect
# if(self.ber > 0 and causes.shape[1] > 0):
# effect[:, 0] = effect[:, 0] * self.noise[:, 0]
# if verbose:
# print(" Noise **multiplied** to effect")
# else:
effect[:, 0] = effect[:, 0] + self.noise[:, 0]
if verbose:
print(f" Noise added: [", end="")
for i in range(self.noise.shape[0]):
print(f"{self.noise[i, 0]:.2f}, ", end="")
print("]")
return effect
[docs]
class SigmoidAM_Mechanism(object):
[docs]
def __init__(self, ncauses, points, d=4, noise_coeff=.7, verbose=False):
"""Init the mechanism."""
super(SigmoidAM_Mechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.a = np.random.exponential(1/4) + 1
ber = bernoulli.rvs(0.5)
self.b = ber * np.random.uniform(-2, -0.5) + (1-ber)*np.random.uniform(0.5, 2)
self.c = np.random.uniform(-2, 2)
self.noise = 0.1*np.random.randn(points, 1)
[docs]
def mechanism(self, x):
result = np.\
zeros((self.points, 1))
for i in range(self.points):
result[i, 0] = self.a * self.b * (x[i] + self.c) / (1 + abs(self.b * (x[i] + self.c)))
return result
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
# Additive only
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
for par in range(causes.shape[1]):
effect[:, 0] = effect[:, 0] + self.mechanism(causes[:, par])[:, 0]
effect[:, 0] = effect[:, 0] + self.noise[:, 0]
return effect
[docs]
class SigmoidMix_Mechanism(object):
[docs]
def __init__(self, ncauses, points, d=4, noise_coeff=.7, verbose=False):
"""Init the mechanism."""
super(SigmoidMix_Mechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.a = np.random.exponential(1/4) + 1
ber = bernoulli.rvs(0.5)
self.b = ber * np.random.uniform(-2, -0.5) + (1-ber)*np.random.uniform(0.5, 2)
self.c = np.random.uniform(-2, 2)
self.noise = 0.1*np.random.randn(points, 1)
[docs]
def mechanism(self, causes):
result = np.zeros((self.points, 1))
for i in range(self.points):
pre_add_effect = 0
for c in range(causes.shape[1]):
pre_add_effect += causes[i, c]
pre_add_effect += self.noise[i]
result[i, 0] = self.a * self.b * \
(pre_add_effect + self.c)/(1 + abs(self.b*(pre_add_effect + self.c)))
return result
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
effect[:, 0] = self.mechanism(causes)[:, 0]
return effect
[docs]
def computeGaussKernel(x):
xnorm = np.power(euclidean_distances(x, x), 2)
return np.exp(-xnorm / (2.0))
[docs]
class GaussianProcessAdd_Mechanism(object):
[docs]
def __init__(self, ncauses, points, verbose=False):
"""Init the mechanism."""
super(GaussianProcessAdd_Mechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.noise = 0.1*np.random.randn(points, 1)
self.nb_step = 0
[docs]
def mechanism(self, x):
"""Run the mechanism."""
self.nb_step += 1
x = np.reshape(x, (x.shape[0], 1))
if(self.nb_step < 5):
cov = computeGaussKernel(x)
mean = np.zeros((1, self.points))[0, :]
y = np.random.multivariate_normal(mean, cov)
elif(self.nb_step == 5):
cov = computeGaussKernel(x)
mean = np.zeros((1, self.points))[0, :]
y = np.random.multivariate_normal(mean, cov)
self.gpr = GaussianProcessRegressor()
self.gpr.fit(x, y)
y = self.gpr.predict(x)
else:
y = self.gpr.predict(x)
return y
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
# Additive only
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
for par in range(causes.shape[1]):
effect[:, 0] = effect[:, 0] + self.mechanism(causes[:, par])
effect[:, 0] = effect[:, 0] + self.noise[:, 0]
return effect
[docs]
class GaussianProcessMix_Mechanism(object):
[docs]
def __init__(self, ncauses, points, verbose=False):
"""Init the mechanism."""
super(GaussianProcessMix_Mechanism, self).__init__()
self.n_causes = ncauses
self.points = points
self.noise = 0.1*np.random.randn(points, 1)
self.nb_step = 0
[docs]
def mechanism(self, x):
self.nb_step += 1
x = np.reshape(x, (x.shape[0], x.shape[1]))
if(self.nb_step < 2):
cov = computeGaussKernel(x)
mean = np.zeros((1, self.points))[0, :]
y = np.random.multivariate_normal(mean, cov)
elif(self.nb_step == 2):
cov = computeGaussKernel(x)
mean = np.zeros((1, self.points))[0, :]
y = np.random.multivariate_normal(mean, cov)
self.gpr = GaussianProcessRegressor()
self.gpr.fit(x, y)
y = self.gpr.predict(x)
else:
y = self.gpr.predict(x)
return y
[docs]
def __call__(self, causes, verbose=False):
"""Run the mechanism."""
effect = np.zeros((self.points, 1))
# Compute each cause's contribution
if(causes.shape[1] > 0):
mix = np.hstack((causes, self.noise))
effect[:, 0] = self.mechanism(mix)
else:
effect[:, 0] = self.mechanism(self.noise)
return effect
[docs]
def gaussian_cause(n):
return np.random.randn(n, 1)[:, 0]
[docs]
def noise(n, v):
return v * np.random.rand(1) * np.random.randn(n, 1) + random.sample([2, -2], 1)