4 changes: 3 additions & 1 deletion gemact/config.py
@@ -60,5 +60,7 @@
'uniform': 'distributions.Uniform',
'multinomial': 'distributions.Multinomial',
'dirichlet multinomial' : 'distributions.Dirichlet_Multinomial',
'negative multinomial' : 'distributions.NegMultinom'
'negative multinomial' : 'distributions.NegMultinom',
'multivariate binomial' : 'distributions.MultivariateBinomial',
'multivariate poisson' : 'distributions.MultivariatePoisson'
}
380 changes: 380 additions & 0 deletions gemact/distributions.py
@@ -6667,4 +6667,384 @@ def mgf(self, t):
"""
exponent = np.sum(self.p * np.exp(t))
return (self.p0 / (1 - exponent)) ** self.x0


class MultivariateBinomial:
"""
Multivariate Binomial distribution.
Implementation based on "Multivariate Binomial and Poisson Distributions", A. S. Krishnamoorthy, Sankhyā, 1951.

:param \\**kwargs:
See below

:Keyword Arguments:
* *n* (``int``) --
Number of trials.
* *p_joint* (``numpy.ndarray``) --
Joint probability matrix where p_joint[i,j] = P(Xi=1,Xj=1)

"""

def __init__(self, **kwargs):

self.n = kwargs['n']
self.p = np.diag(kwargs['p_joint'])
self.k = len(self.p)
self.q = 1 - self.p


self.p_joint = np.asarray(kwargs['p_joint'])
assert self.p_joint.shape == (self.k, self.k), "p_joint must be k x k"

# Calculate dependence measures d_ij
self.d = self.p_joint - np.outer(self.p, self.p)

@property
def n(self):
return self.__n

@n.setter
def n(self,value):
hf.assert_type_value(value, 'n', logger, (float, int), lower_bound=0, lower_close=False)
self.__n = value

@property
def p(self):
return self.__p

@p.setter
def p(self, value):

for element in value:
hf.assert_type_value(element, 'p', logger, (float, np.floating), lower_bound=0, upper_bound=1, lower_close=True, upper_close=True)
value = np.array(value)

if sum(value) >= 1:
raise ValueError("success probabilities must not be greater than 1.")

self.__p = value

@staticmethod
def name():
return 'multivariate binomial'

@staticmethod
def category():
return {''}


def pmf(self, x):
"""
Probability mass function.

Uses G-polynomial expansion for dependence correction.

:param x: Quantile where PMF is evaluated.
:type x: ``numpy.ndarray``
:return: Probability mass function evaluated at x.
:rtype: ``numpy.float64``
"""
x = np.asarray(x)
# binomial coefficient via math.comb (added to gemact/libraries.py in this change)
marg_prod = np.prod([comb(self.n, int(xi)) * (self.p[i]**xi) * (self.q[i]**(self.n-xi))
for i, xi in enumerate(x)])

correction = 1.0
for i in range(self.k):
for j in range(i+1, self.k):
if np.abs(self.d[i,j]) > 1e-10:
g_i = self._G_polynomial(1, x[i], self.p[i])
g_j = self._G_polynomial(1, x[j], self.p[j])
denom = self.n * self.p[i] * self.q[i] * self.p[j] * self.q[j]
correction += self.d[i,j] * g_i * g_j / denom

return marg_prod * correction

def logpmf(self, x):
"""Natural logarithm of the probability mass function evaluated at x."""
return mt.log(self.pmf(x))

def mean(self):
"""
Mean vector of the distribution.

:return: Mean vector.
:rtype: ``numpy.ndarray``
"""
return self.n * self.p

def cov(self):
"""
Covariance matrix of the distribution.

:return: Covariance matrix.
:rtype: ``numpy.ndarray``
"""
cov_mat = np.zeros((self.k, self.k))
for i in range(self.k):
for j in range(self.k):
if i == j:
cov_mat[i,j] = self.n * self.p[i] * self.q[i]
else:
cov_mat[i,j] = self.n * (self.p_joint[i,j] - self.p[i]*self.p[j])
return cov_mat

def var(self):
"""
Variance.

:return: Variance array.
:rtype: ``numpy.ndarray``
"""

return np.diag(self.cov())

def correlation(self):
"""
Correlation matrix of the distribution.

:return: Correlation matrix.
:rtype: ``numpy.ndarray``
"""
cov = self.cov()
std_devs = np.sqrt(np.diag(cov))
return cov / np.outer(std_devs, std_devs)

def _G_polynomial(self, r, x, p):
"""
Compute G_r(x;p) orthogonal polynomial.

:param r: Order of the polynomial
:param x: Evaluation point
:param p: Probability parameter
:return: Polynomial value
:rtype: ``float``
"""
q = 1 - p
if r == 0:
return 1.0
elif r == 1:
return (x - self.n*p) / np.sqrt(self.n*p*q)
else:
return ((x - self.n*p) * self._G_polynomial(r-1, x, p) -
(r-1)*q * self._G_polynomial(r-2, x, p)) / np.sqrt(self.n*p*q)

def rvs(self, size=1, random_state=None):
"""
Random variates generator function.

:param size: Number of random variates to generate (default=1).
:type size: ``int``
:param random_state: Random state for reproducibility.
:type random_state: ``int``, optional
:return: Array of random variates.
:rtype: ``numpy.ndarray``
"""
if random_state is not None:
np.random.seed(random_state)
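# Gaussian-copula approximation (not an exact sampler for the series-expansion model):
# draw n correlated standard normals per replication, map them to uniforms via the normal CDF,
# threshold against p, and count successes across the n trials.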

corr = self.correlation()
np.fill_diagonal(corr, 1.0) # Ensure diagonal=1

uniforms = norm.cdf(np.random.multivariate_normal(
mean=np.zeros(self.k),
cov=corr,
size=(size, self.n)
))

# Convert to binomial
return np.sum(uniforms < self.p, axis=1).astype(int)
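# A minimal usage sketch, kept as a comment (hypothetical parameter values; assumes the
# class is reachable as gemact.distributions.MultivariateBinomial):
#
#     >>> import numpy as np
#     >>> from gemact import distributions
#     >>> p_joint = np.array([[0.30, 0.08],
#     ...                     [0.08, 0.20]])  # diagonal: p_i, off-diagonal: P(Xi=1, Xj=1)
#     >>> mvb = distributions.MultivariateBinomial(n=10, p_joint=p_joint)
#     >>> mvb.mean()
#     array([3., 2.])
#     >>> mvb.pmf(np.array([3, 2]))
#     >>> mvb.rvs(size=5, random_state=1)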


class MultivariatePoisson:

"""
Multivariate Poisson distribution.
Implementation based on "Multivariate Binomial and Poisson Distributions", A. S. Krishnamoorthy, Sankhyā, 1951.

:param \\**kwargs:
See below

:Keyword Arguments:
* *mu* (``numpy.ndarray``) --
Array (k,) of marginal means.
* *mu_joint* (``numpy.ndarray``) --
Matrix (k,k) of Cov(Xi,Xj).

"""

def __init__(self, **kwargs):
required_params = ['mu', 'mu_joint']
for param in required_params:
if param not in kwargs:
raise ValueError(f"Missing required parameter: {param}")

self.mu = np.array(kwargs["mu"])
self.k = len(self.mu)
self.mu_joint = np.asarray(kwargs["mu_joint"])

@property
def mu(self):
return self.__mu

@mu.setter
def mu(self,value):

for element in value:
hf.assert_type_value(element, 'mu', logger, (float, int), lower_bound=0, upper_bound=None, lower_close=True, upper_close=True)
value = np.array(value)

self.__mu = value

@property
def mu_joint(self):
return self.__mu_joint

@mu_joint.setter
def mu_joint(self, value):
value = np.asarray(value)

for element in value.flatten():
hf.assert_type_value(element, 'mu_joint', logger, (float, np.floating, int), lower_bound=None, upper_bound=None, lower_close=True, upper_close=True)

if value.shape != (self.k, self.k):
raise ValueError("mu_joint must be k x k")

np.fill_diagonal(value, 0)
self.__mu_joint = value

@staticmethod
def name():
return 'multivariate poisson'

@staticmethod
def category():
return {''}

def pmf(self, x):
"""
Probability mass function using Charlier polynomials expansion.

:param x: Quantile where PMF is evaluated.
:type x: ``numpy.ndarray``
:return: Probability mass function evaluated at x.
:rtype: ``numpy.float64``
"""
x = np.asarray(x)


marg_prod = np.prod([poisson.pmf(xi, self.mu[i])
for i, xi in enumerate(x)])

correction = 1.0
for i in range(self.k):
for j in range(i+1, self.k):
if np.abs(self.mu_joint[i,j]) > 1e-10:
k_i = self._charlier_poly(x[i], self.mu[i], 1)
k_j = self._charlier_poly(x[j], self.mu[j], 1)
correction += self.mu_joint[i,j] * k_i * k_j / (self.mu[i] * self.mu[j])

return marg_prod * correction

def logpmf(self, x):
"""Natural logarithm of the probability mass function evaluated at x."""
return mt.log(self.pmf(x))

def mean(self):
"""
Mean vector of the distribution.

:return: Mean vector.
:rtype: ``numpy.ndarray``
"""
return self.mu

def cov(self):
"""
Covariance matrix of the distribution.

:return: Covariance matrix.
:rtype: ``numpy.ndarray``
"""
cov_mat = np.diag(self.mu)

for i in range(self.k):
for j in range(i+1, self.k):
cov_mat[i,j] = self.mu_joint[i,j]
cov_mat[j,i] = self.mu_joint[i,j]

return cov_mat

def var(self):
"""
Variance.

:return: Variance array.
:rtype: ``numpy.ndarray``
"""

return np.diag(self.cov())

def correlation(self):
"""
Correlation matrix of the distribution.

:return: Correlation matrix.
:rtype: ``numpy.ndarray``
"""
cov = self.cov()
std_devs = np.sqrt(np.diag(cov))
return cov / np.outer(std_devs, std_devs)

def _charlier_poly(self, x, mu, r):
"""
Charlier polynomial K_r(x; mu).

:param x: Evaluation point
:param mu: Mean parameter
:param r: Order of polynomial
:return: Polynomial value
:rtype: ``float``
"""
if r == 0:
return 1.0
elif r == 1:
return (x - mu) / np.sqrt(mu)
else:
return ((x - mu) * self._charlier_poly(x, mu, r-1) -
(r-1) * mu * self._charlier_poly(x, mu, r-2)) / np.sqrt(mu)

def rvs(self, size=1, random_state=None):
"""
Random variates generator function.

:param size: Number of random variates to generate (default=1).
:type size: ``int``
:param random_state: Random state for reproducibility.
:type random_state: ``int``, optional
:return: Array of random variates.
:rtype: ``numpy.ndarray``
"""
if random_state is not None:
np.random.seed(random_state)
samples = np.zeros((size, self.k), dtype=int)
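# Common-shock construction: X_i = Y_i + sum_j Z_ij, with Z_ij ~ Poisson(mu_joint[i, j]) shared
# by components i and j, and Y_i ~ Poisson(mu_i - sum_j mu_joint[i, j]) independent, so that
# E[X_i] = mu_i and Cov(X_i, X_j) = mu_joint[i, j].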

common_shocks = {}
for i in range(self.k):
for j in range(i+1, self.k):
if self.mu_joint[i,j] > 0:
common_shocks[(i,j)] = np.random.poisson(
self.mu_joint[i,j], size=size)

for i in range(self.k):
# independent component: its rate must be non-negative, i.e. mu[i] >= sum of the common-shock intensities involving i
lam_i = self.mu[i] - np.sum([self.mu_joint[i,j] for j in range(self.k) if j != i])
if lam_i < 0:
raise ValueError("mu_joint entries too large: each mu[i] must be at least the sum of its off-diagonal mu_joint[i, :].")
samples[:,i] = np.random.poisson(lam_i, size=size)

# Shocks
for j in range(self.k):
if i < j and (i,j) in common_shocks:
samples[:,i] += common_shocks[(i,j)]
elif i > j and (j,i) in common_shocks:
samples[:,i] += common_shocks[(j,i)]
return samples.squeeze()
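# A minimal usage sketch, kept as a comment (hypothetical parameter values; assumes the
# class is reachable as gemact.distributions.MultivariatePoisson):
#
#     >>> import numpy as np
#     >>> from gemact import distributions
#     >>> mu = np.array([2.0, 3.0])
#     >>> mu_joint = np.array([[0.0, 0.5],
#     ...                      [0.5, 0.0]])  # off-diagonal: Cov(Xi, Xj)
#     >>> mvp = distributions.MultivariatePoisson(mu=mu, mu_joint=mu_joint)
#     >>> mvp.cov()
#     array([[2. , 0.5],
#            [0.5, 3. ]])
#     >>> mvp.rvs(size=5, random_state=1)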

2 changes: 2 additions & 0 deletions gemact/libraries.py
@@ -15,3 +15,5 @@
from itertools import groupby
from scipy.special import gammaln
import math as mt
from math import comb