From 473926152192ee3bdde646b8d384ba68c5d03514 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 2 Jun 2025 19:27:58 +0200 Subject: [PATCH 1/3] Update distributions.py Multivariate Binomial and Multivariate Poisson added --- gemact/distributions.py | 380 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 380 insertions(+) diff --git a/gemact/distributions.py b/gemact/distributions.py index a3226b1..fcd84d2 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6667,4 +6667,384 @@ def mgf(self, t): """ exponent = np.sum(self.p * np.exp(t)) return (self.p0 / (1 - exponent)) ** self.x0 + + +class MultivariateBinomial: + """ + Multivariate Binomial distribution. + Implementation from "Multivariate Binomial and Poisson Distribution", A.S. Krishnamoorthy, 2013 + + :param \\**kwargs: + See below + + :Keyword Arguments: + * *n* (``int``) -- + Number of trials. + * *p_joint* (``numpy.ndarray``) -- + Joint probability matrix where p_joint[i,j] = P(Xi=1,Xj=1) + + """ + + def __init__(self, **kwargs): + + self.n = kwargs['n'] + self.p = np.diag(kwargs['p_joint']) + self.k = len(self.p) + self.q = 1 - self.p + + + self.p_joint = np.asarray(kwargs['p_joint']) + assert self.p_joint.shape == (self.k, self.k), "p_joint must be k x k" + + # Calculate dependence measures d_ij + self.d = self.p_joint - np.outer(self.p, self.p) + + @property + def n(self): + return self.__n + + @n.setter + def n(self,value): + hf.assert_type_value(value, 'n', logger, (float, int), lower_bound=0, lower_close=False) + self.__n = value + + @property + def p(self): + return self.__p + + @p.setter + def p(self, value): + + for element in value: + hf.assert_type_value(element, 'p', logger, (float, np.floating), lower_bound=0, upper_bound=1, lower_close=True, upper_close=True) + value = np.array(value) + + if sum(value) >= 1: + raise ValueError("success probabilities must not be greater than 1.") + + self.__p = value + + @staticmethod + def name(): + return 'multivariate binomial' + + @staticmethod + def category(): + return {''} + + + def pmf(self, x): + """ + Probability mass function. + + Uses G-polynomial expansion for dependence correction. + + :param x: Quantile where PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + x = np.asarray(x) + marg_prod = np.prod([binom(self.n, xi) * (self.p[i]**xi) * (self.q[i]**(self.n-xi)) + for i, xi in enumerate(x)]) + + correction = 1.0 + for i in range(self.k): + for j in range(i+1, self.k): + if np.abs(self.d[i,j]) > 1e-10: + g_i = self._G_polynomial(1, x[i], self.p[i]) + g_j = self._G_polynomial(1, x[j], self.p[j]) + denom = self.n * self.p[i] * self.q[i] * self.p[j] * self.q[j] + correction += self.d[i,j] * g_i * g_j / denom + + return marg_prod * correction + + def logpmf(self, x): + return mt.log(self.pmf(x)) + + def mean(self): + """ + Mean vector of the distribution. + + :return: Mean vector. + :rtype: ``numpy.ndarray`` + """ + return self.n * self.p + + def cov(self): + """ + Covariance matrix of the distribution. + + :return: Covariance matrix. + :rtype: ``numpy.ndarray`` + """ + cov_mat = np.zeros((self.k, self.k)) + for i in range(self.k): + for j in range(self.k): + if i == j: + cov_mat[i,j] = self.n * self.p[i] * self.q[i] + else: + cov_mat[i,j] = self.n * (self.p_joint[i,j] - self.p[i]*self.p[j]) + return cov_mat + + def var(self): + """ + Variance. + + :return: Variance array. + :rtype: ``numpy.ndarray`` + """ + + return np.diag(self.cov()) + + def correlation(self): + """ + Correlation matrix of the distribution. + + :return: Correlation matrix. + :rtype: ``numpy.ndarray`` + """ + cov = self.cov() + std_devs = np.sqrt(np.diag(cov)) + return cov / np.outer(std_devs, std_devs) + + def _G_polynomial(self, r, x, p): + """ + Compute G_r(x;p) orthogonal polynomial. + + :param r: Order of the polynomial + :param x: Evaluation point + :param p: Probability parameter + :return: Polynomial value + :rtype: ``float`` + """ + q = 1 - p + if r == 0: + return 1.0 + elif r == 1: + return (x - self.n*p) / np.sqrt(self.n*p*q) + else: + return ((x - self.n*p) * self._G_polynomial(r-1, x, p) - + (r-1)*q * self._G_polynomial(r-2, x, p)) / np.sqrt(self.n*p*q) + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: Number of random variates to generate (default=1). + :type size: ``int`` + :param random_state: Random state for reproducibility. + :type random_state: ``int``, optional + :return: Array of random variates. + :rtype: ``numpy.ndarray`` + """ + if random_state is not None: + np.random.seed(random_state) + + corr = self.correlation() + np.fill_diagonal(corr, 1.0) # Ensure diagonal=1 + + uniforms = norm.cdf(np.random.multivariate_normal( + mean=np.zeros(self.k), + cov=corr, + size=(size, self.n) + )) + + # Convert to binomial + return np.sum(uniforms < self.p, axis=1).astype(int) + + +class MultivariatePoisson: + + """ + Multivariate Poisson distribution. + Implementation from "Multivariate Binomial and Poisson Distribution", A.S. Krishnamoorthy, 2013 + + :param \\**kwargs: + See below + + :Keyword Arguments: + * *mu* (``numpy.ndarray``) -- + Array (k,) of marginal means. + * *mu_joint* (``numpy.ndarray``) -- + Matrix (k,k) of Cov(Xi,Xj). + + """ + + def __init__(self, **kwargs): + required_params = ['mu', 'mu_joint'] + for param in required_params: + if param not in kwargs: + raise ValueError(f"Missing required parameter: {param}") + + self.mu = np.array(kwargs["mu"]) + self.k = len(self.mu) + self.mu_joint = np.asarray(kwargs["mu_joint"]) + + @property + def mu(self): + return self.__mu + + @mu.setter + def mu(self,value): + + for element in value: + hf.assert_type_value(element, 'mu', logger, (float, int), lower_bound=None, upper_bound=None, lower_close=True, upper_close=True) + value = np.array(value) + + self.__mu = value + + @property + def mu_joint(self): + return self.__mu_joint + + @mu_joint.setter + def mu_joint(self, value): + value = np.asarray(value) + + for element in value.flatten(): + hf.assert_type_value(element, 'mu_joint', logger, (float, np.floating, int), lower_bound=None, upper_bound=None, lower_close=True, upper_close=True) + + if value.shape != (self.k, self.k): + raise ValueError("mu_joint must be k x k") + + np.fill_diagonal(value, 0) + self.__mu_joint = value + + @staticmethod + def name(): + return 'multivariate poisson' + + @staticmethod + def category(): + return {''} + + def pmf(self, x): + """ + Probability mass function using Charlier polynomials expansion. + + :param x: Quantile where PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + x = np.asarray(x) + + + marg_prod = np.prod([poisson.pmf(xi, self.mu[i]) + for i, xi in enumerate(x)]) + + correction = 1.0 + for i in range(self.k): + for j in range(i+1, self.k): + if np.abs(self.mu_joint[i,j]) > 1e-10: + k_i = self._charlier_poly(x[i], self.mu[i], 1) + k_j = self._charlier_poly(x[j], self.mu[j], 1) + correction += self.mu_joint[i,j] * k_i * k_j / (self.mu[i] * self.mu[j]) + + return marg_prod * correction + + def logpmf(self, x): + return mt.log(self.pmf(x)) + + def mean(self): + """ + Mean vector of the distribution. + + :return: Mean vector. + :rtype: ``numpy.ndarray`` + """ + return self.mu + + def cov(self): + """ + Covariance matrix of the distribution. + + :return: Covariance matrix. + :rtype: ``numpy.ndarray`` + """ + cov_mat = np.diag(self.mu) + + for i in range(self.k): + for j in range(i+1, self.k): + cov_mat[i,j] = self.mu_joint[i,j] + cov_mat[j,i] = self.mu_joint[i,j] + + return cov_mat + + def var(self): + """ + Variance. + + :return: Variance array. + :rtype: ``numpy.ndarray`` + """ + + return np.diag(self.cov()) + + def correlation(self): + """ + Correlation matrix of the distribution. + + :return: Correlation matrix. + :rtype: ``numpy.ndarray`` + """ + cov = self.covariance() + std_devs = np.sqrt(np.diag(cov)) + return cov / np.outer(std_devs, std_devs) + + def _charlier_poly(self, x, mu, r): + """ + Charlier polynomial K_r(x; mu). + + :param x: Evaluation point + :param mu: Mean parameter + :param r: Order of polynomial + :return: Polynomial value + :rtype: ``float`` + """ + if r == 0: + return 1.0 + elif r == 1: + return (x - mu) / np.sqrt(mu) + else: + + return ((x - mu) * self._charlier_poly(x, mu, r-1) - \ + (r-1) * mu * self._charlier_poly(x, mu, r-2)) / np.sqrt(mu) + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: Number of random variates to generate (default=1). + :type size: ``int`` + :param random_state: Random state for reproducibility. + :type random_state: ``int``, optional + :return: Array of random variates. + :rtype: ``numpy.ndarray`` + """ + if random_state is not None: + np.random.seed(random_state) + samples = np.zeros((size, self.k), dtype=int) + + common_shocks = {} + for i in range(self.k): + for j in range(i+1, self.k): + if self.mu_joint[i,j] > 0: + common_shocks[(i,j)] = np.random.poisson( + self.mu_joint[i,j], size=size) + + for i in range(self.k): + # independent component + samples[:,i] = np.random.poisson( + self.mu[i] - np.sum([self.mu_joint[i,j] for j in range(self.k) if j != i]), + size=size) + + # Shocks + for j in range(self.k): + if i < j and (i,j) in common_shocks: + samples[:,i] += common_shocks[(i,j)] + elif i > j and (j,i) in common_shocks: + samples[:,i] += common_shocks[(j,i)] + return samples.squeeze() From bc9e50639464a620129b0c6e761a7bfa82c53306 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 2 Jun 2025 19:30:14 +0200 Subject: [PATCH 2/3] Update config.py dictionary updated (Multivariate Binomial and Multivariate Poisson) --- gemact/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gemact/config.py b/gemact/config.py index 050761c..49a91e8 100644 --- a/gemact/config.py +++ b/gemact/config.py @@ -60,5 +60,7 @@ 'uniform': 'distributions.Uniform', 'multinomial': 'distributions.Multinomial', 'dirichlet multinomial' : 'distributions.Dirichlet_Multinomial', - 'negative multinomial' : 'distributions.NegMultinom' + 'negative multinomial' : 'distributions.NegMultinom', + 'negative multinomial' : 'distributions.MultivariateBinomial', + 'negative multinomial' : 'distributions.MultivariatePoisson' } From 551113125cb1fe40688c356eb030644bf15a70e2 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 2 Jun 2025 19:32:27 +0200 Subject: [PATCH 3/3] Update libraries.py --- gemact/libraries.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gemact/libraries.py b/gemact/libraries.py index cd435b1..f87e8a6 100644 --- a/gemact/libraries.py +++ b/gemact/libraries.py @@ -15,3 +15,5 @@ from itertools import groupby from scipy.special import gammaln import math as mt +from math import comb +