Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions pyrecest/distributions/hypertorus/hypertoroidal_grid_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import numpy as np
from .abstract_hypertoroidal_distribution import AbstractHypertoroidalDistribution
from ..abstract_grid_distribution import AbstractGridDistribution
from scipy.fftpack import fftn, ifftn, fftshift, ifftshift
from .hypertoroidal_fourier_distribution import HypertoroidalFourierDistribution
from .hypertoroidal_dirac_distribution import HypertoroidalDiracDistribution

class HypertoroidalGridDistribution(AbstractGridDistribution, AbstractHypertoroidalDistribution):
def __init__(self, grid_values, grid_type = "custom", grid = None, enforce_pdf_nonnegative=True, dim=None):
# Constructor
if dim is None and grid is None or grid.ndim<=1:
dim = 1
elif dim is None:
dim = grid.shape[1]

AbstractHypertoroidalDistribution.__init__(self, dim)
AbstractGridDistribution.__init__(self, grid_values, grid_type, grid, dim)
self.enforce_pdf_nonnegative = enforce_pdf_nonnegative
# Check if normalized. If not: Normalize
self.normalize_in_place()

def get_closest_point(self, xs):
min_error = np.inf # Initialize with infinity
closest_point = None

for point in self.grid:
error = AbstractHypertoroidalDistribution.angular_error(x, point)
if error < min_error:
min_error = error
closest_point = point

return closest_point

def get_manifold_size(self):
return AbstractHypertoroidalDistribution.get_manifold_size(self)

@staticmethod
def generate_cartesian_product_grid(n_grid_points):
grid_individual_axis = [np.linspace(0, 2 * np.pi - 2 * np.pi / n, n) for n in n_grid_points]
meshgrids = np.meshgrid(*grid_individual_axis, indexing='ij')
grid = np.column_stack([grid.ravel() for grid in meshgrids])
return grid

def multiply(self, other):
assert np.all(self.grid == other.grid), 'Multiply:IncompatibleGrid: Can only multiply for equal grids.'
return super().multiply(other)

def pdf(self, xs):
assert self.grid_type == 'CartesianProd', 'pdf is not defined. Can only interpolate for certain grid types.'
print('Warning: PDF:UseInterpolated: pdf is not defined. Using interpolation with Fourier series.')

transformation = 'sqrt' if self.enforce_pdf_nonnegative else 'identity'

print('Warning: Cannot interpolate if number of grid points are not specified. Assuming equidistant grid')
fd = HypertoroidalFourierDistribution.from_function_values(
np.reshape(self.grid_values, self.grid_density_description["n_grid_values"]),
self.grid_density_description["n_grid_values"] + (self.grid_density_description["n_grid_values"] % 2 == 0),
transformation
)
return fd.pdf(xs)

def get_grid(self):
if self.grid.size > 0:
return self.grid
elif self.grid_type == 'CartesianProd':
print('Warning: Grid:GenerateDuringRunTime: Generating grid anew on call to .getGrid(). If you require the grid frequently, store it in the class.')
return np.squeeze(self.generate_cartesian_product_grid(self.n_grid_points))
else:
raise ValueError('Grid:UnknownGrid: Grid was not provided and is thus unavailable')

def pdf_unnormalized(self, xs):
assert self.grid_type == 'CartesianProd', 'pdf is not defined. Can only interpolate for certain grid types.'
p = self.integrate() * self.pdf(xs)
return p

def shift(self, shift_by):
if np.linalg.norm(shift_by) == 0:
return self

assert self.grid_type == 'CartesianProd'

# Initialize with some uniform distribution and then replace coefficient matrix
coeff_mat_tmp = np.zeros(tuple([3] * self.dim))
coeff_mat_tmp[0] = 1 / (2 * np.pi) ** self.dim
hfd = HypertoroidalFourierDistribution(fftshift(coeff_mat_tmp), 'identity')
hfd.C = fftshift(fftn(np.reshape(self.grid_values, self.n_grid_points)))
hfd_shifted = hfd.shift(shift_by)

shifted_distribution = self.__class__(self.grid, self.grid_values)
shifted_distribution.grid_values = np.reshape(ifftn(ifftshift(hfd_shifted.C), overwrite_x=True), self.grid_values.shape)
return shifted_distribution

def value_of_closest(self, xa):
p = np.empty(xa.shape[1])
for i in range(xa.shape[1]):
dists = np.sum(np.minimum((self.grid - xa[:, i]) ** 2, (2 * np.pi - (self.grid - xa[:, i])) ** 2), axis=0)
min_ind = np.argmin(dists)
p[i] = self.grid_values[min_ind]
return p

def convolve(self, other):
assert self.grid_type == 'CartesianProd' and other.grid_type == 'CartesianProd'
assert np.all(self.n_grid_points == other.no_of_grid_points)
assert np.all(self.grid == other.grid)

this_tensor = np.reshape(self.grid_values, self.n_grid_points)
other_tensor = np.reshape(other.grid_values, other.no_of_grid_points)

res_tensor = (2 * np.pi) ** self.dim / self.grid_values.size * ifftn(fftn(this_tensor) * fftn(other_tensor))
convolved_distribution = self.__class__(self.grid, np.reshape(res_tensor, self.grid_values.shape))
return convolved_distribution

@staticmethod
def from_distribution(distribution, n_grid_points, grid_type='CartesianProd', enforce_pdf_nonnegative = True):
assert isinstance(grid_type, str)
if grid_type == 'CartesianProd' and isinstance(distribution, HypertoroidalFourierDistribution) and \
(distribution.C.shape == n_grid_points or (np.isscalar(n_grid_points) and n_grid_points == distribution.C.shape[0])):
grid_values = distribution.pdf_on_grid()
grid = HypertoroidalGridDistribution.generate_cartesian_product_grid(n_grid_points)
hgd = HypertoroidalGridDistribution(grid, grid_values.flatten())
hgd.grid_type = grid_type
else:
hgd = HypertoroidalGridDistribution.from_function(distribution.pdf, n_grid_points, distribution.dim, grid_type=grid_type, enforce_pdf_nonnegative=enforce_pdf_nonnegative)
return hgd

@staticmethod
def from_function(fun, n_gridpoints, dim, grid_type='CartesianProd', enforce_pdf_nonnegative = True):
assert isinstance(grid_type, str)
if grid_type == 'CartesianProd':
if np.isscalar(n_gridpoints):
n_gridpoints = np.repeat(n_gridpoints, dim)
else:
assert len(n_gridpoints) == dim

grid = HypertoroidalGridDistribution.generate_cartesian_product_grid(n_gridpoints)
else:
raise ValueError("Grid scheme not recognized")

grid_values = np.apply_along_axis(fun, 1, grid)
sgd = HypertoroidalGridDistribution(grid_values, grid=grid, enforce_pdf_nonnegative=enforce_pdf_nonnegative, grid_type=grid_type)
return sgd

def plot(self, *args, **kwargs):
hdd = HypertoroidalDiracDistribution(self.grid, (1 / len(self.grid_values)) * np.ones(len(self.grid_values)))
hdd.w = self.grid_values.T
h = hdd.plot(*args, **kwargs)
return h

def plot_interpolated(self, *args, **kwargs):
if self.dim > 2:
raise ValueError("Can currently only plot for T1 and T2 torus.")
if self.enforce_pdf_nonnegative:
transformation = 'sqrt'
else:
transformation = 'identity'
sizes = np.repeat(len(self.grid_values) ** (1 / self.dim), self.dim)
fd = HypertoroidalFourierDistribution.from_function_values(np.reshape(self.grid_values, [sizes, 1]), sizes, transformation)
h = fd.plot(*args, **kwargs)
return h

def trigonometric_moment(self, n):
hwd = HypertoroidalDiracDistribution(self.grid, self.grid_values.T / sum(self.grid_values))
m = hwd.trigonometric_moment(n)
return m

def slice_at(self, dims, val, use_fftn=True):
assert self.grid_type == 'CartesianProd', "This operation is only supported for grids generated based on a Cartesian product."
assert all([dim <= self.dim for dim in dims]), "Cannot perform this operation for a dimension that is higher than the dimensionality of the distribution."

fvals_on_grid = np.reshape(self.grid_values, self.n_grid_points)
if np.all(val == np.zeros(len(val))):
grid_shifted_full = fvals_on_grid
elif use_fftn:
# Use HypertoroidalFourierDistribution, which uses FFTN
shift_vec = np.zeros(self.dim)
shift_vec[dims] = -val
hfd_sqrt = HypertoroidalFourierDistribution.from_function_values(fvals_on_grid, self.n_grid_points, 'sqrt')
hfd_sqrt_shifted = hfd_sqrt.shift(shift_vec)
grid_shifted_full = hfd_sqrt_shifted.pdf_on_grid()
else:

raise NotImplementedError("This part of the code requires further adaptation to work in Python.")
return grid_shifted_full
114 changes: 114 additions & 0 deletions pyrecest/filters/state_space_subdivision_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import numpy as np
import warnings
from scipy.stats import multivariate_normal
from .abstract_hypercylindrical_filter import AbstractHypercylindricalFilter
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution

class StateSpaceSubdivisionFilter(AbstractHypercylindricalFilter):
def __init__(self):
self.apd = None

def setState(self, apd_):
if not self.apd.is_empty() and (len(self.apd.gaussians) != len(apd_.gaussians)):
warnings.warn("Number of components differ.", "LinPeriodic:NoComponentsDiffer")
self.apd = apd_

def predictLinear(self, transitionDensity=None, covarianceMatrices=None, systemMatrices=None, linearInputVectors=None):
if transitionDensity is not None:
# Various assert checks
pass
n_areas = len(self.apd.linearDistributions)

if transitionDensity is None and covarianceMatrices is None and systemMatrices is None and linearInputVectors is None:
# Case 1
pass
elif transitionDensity is None:
# Case 2
pass
else:
# Case 3
pass

def update(self, likelihoodPeriodicGrid=None, likelihoodsLinear=None):
if isinstance(likelihoodPeriodicGrid, AbstractDistribution):
likelihoodPeriodicGrid = likelihoodPeriodicGrid.pdf(self.apd.gd.getGrid()).T

if likelihoodPeriodicGrid is None and likelihoodsLinear is None:
warnings.warn("Nothing to do for this update step.", "StateSpaceSubdivisionFilter:NoParamsForUpdate")
return

if likelihoodPeriodicGrid is not None:
self.apd.gd.gridValues = self.apd.gd.gridValues * likelihoodPeriodicGrid

if likelihoodsLinear is not None:
# Update current linear distributions
pass

self.apd.gd = self.apd.gd.normalize(warnUnnorm=False)

def get_estimate(self):
return self.apd

def get_point_estimate(self):
return self.apd.hybridMean()

def predict_linear(sysModel, sdGrid, sdSubDivGaussian):
grid = sdGrid.grid
sdHalfCondSdHalf = SdHalfCondSdHalfGridDistribution.fromFunction(
lambda x, y: sysModel.transitionDensity(0, x, y),
17,
funDoesCartesianProduct=False,
gridType='eq_point_set_symm',
dim=2 * sysModel.dim
)

cartesian_product = np.array(np.meshgrid(range(grid.shape[1]), range(grid.shape[1]))).T.reshape(-1, 2)

sdSubDivGaussianNew = []
for i, j in cartesian_product:
temp = sdSubDivGaussian[i].multiply(sdSubDivGaussian[j])
temp = sdSubDivGaussian[i].multiply(sdHalfCondSdHalf.gridValues[j, i])
sdSubDivGaussianNew.append(temp)
sdSubDivGaussianNew = reduce(lambda x, y: x.add(y), sdSubDivGaussianNew)

sdSubDivGaussianNew.gridValues = np.array([sdSubDivGaussianNew.gridValues[j, i] for i, j in cartesian_product])

sdGridNew = sdGrid
sdGridNew.gridValues = sdSubDivGaussianNew.gridValues

return sdGridNew, sdSubDivGaussianNew

def update(self, likelihood_periodic_grid, likelihoods_linear):
if isinstance(likelihood_periodic_grid, AbstractDistribution):
likelihood_periodic_grid = likelihood_periodic_grid.pdf(self.apd.gd.get_grid()).T

assert (likelihood_periodic_grid.size == 0 or
np.array_equal(likelihood_periodic_grid.shape, self.apd.gd.grid_values.shape))
assert (likelihoods_linear.size == 0 or
all([ld.dim == self.apd.linD for ld in likelihoods_linear]))

assert len(likelihoods_linear) <= 1 or len(likelihoods_linear) == self.apd.gd.grid_values.shape[0]

if likelihood_periodic_grid.size == 0 and likelihoods_linear.size == 0:
print("Warning: Nothing to do for this update step.")
return

if likelihood_periodic_grid.size != 0:
self.apd.gd.grid_values *= likelihood_periodic_grid

if likelihoods_linear.size != 0:
mu_preds = np.array([ld.mu for ld in self.apd.linear_distributions]).T
mu_likelihoods = np.array([ld.mu for ld in likelihoods_linear]).T
covs = np.dstack([ld.C for ld in self.apd.linear_distributions]) + np.dstack([ld.C for ld in likelihoods_linear])

self.apd.gd.grid_values *= multivariate_normal.pdf(mu_preds.T, mu_likelihoods.T, covs)

for i, ld in enumerate(self.apd.linear_distributions):
j = i if len(likelihoods_linear) > 1 else 0
C_est_inv_curr = np.linalg.inv(ld.C) + np.linalg.inv(likelihoods_linear[j].C)
mu_est_curr = np.linalg.solve(C_est_inv_curr, np.dot(ld.C, ld.mu) + np.dot(likelihoods_linear[j].C, likelihoods_linear[j].mu))

ld.mu = mu_est_curr
ld.C = np.linalg.inv(C_est_inv_curr)

self.apd.gd.normalize(warn_unnorm=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import numpy as np
from pyrecest.distributions.hypertorus.hypertoroidal_grid_distribution import HypertoroidalGridDistribution
import unittest
from pyrecest.distributions.hypertorus.toroidal_wrapped_normal_distribution import ToroidalWrappedNormalDistribution
from pyrecest.distributions.hypertorus.hypertoroidal_wrapped_normal_distribution import HypertoroidalWrappedNormalDistribution
from pyrecest.distributions.hypertorus.hypertoroidal_mixture import HypertoroidalMixture


class HypertoroidalGridDistributionTest(unittest.TestCase):

def test_get_grid(self):
grid = np.array([[1, 2, 3, 4], [1, 2, 3, 4]]).T
hgd = HypertoroidalGridDistribution(np.array([1, 1, 1, 1]) / ((2 * np.pi) ** 2), grid=grid)
np.testing.assert_allclose(hgd.get_grid(), grid)
np.testing.assert_allclose(np.shape(hgd.get_grid()), (4, 2))

def test_approx_vmmixture_t2(self):
dist = HypertoroidalMixture([
ToroidalWrappedNormalDistribution(np.array([1,1]),np.array([[1,0.5],[0.5,1]])),
ToroidalWrappedNormalDistribution(np.array([3,3]),np.array([[1,-0.5],[-0.5,1]]))],
np.array([0.5,0.5]))

hgd = HypertoroidalGridDistribution.from_distribution(dist, 31)
np.testing.assert_almost_equal(hgd.grid_values, dist.pdf(hgd.get_grid()), decimal=6)
self.assertTrue((np.min(hgd.get_grid(), axis=0) == np.array([0, 0])).all())
self.assertTrue((np.max(hgd.get_grid(), axis=0) > 6).all())
self.assertEqual(hgd.grid_type, 'CartesianProd')

def test_from_function_3D(self):
np.random.seed(0)
test_points = np.random.rand(3, 30)
C = np.array([[0.7, 0.4, 0.2], [0.4, 0.6, 0.1], [0.2, 0.1, 1]])
mu = 2 * np.pi * np.random.rand(3, 1)
hwnd = HypertoroidalWrappedNormalDistribution(mu, C)
n_grid_points = [27, 27, 27]
hfd_id = HypertoroidalGridDistribution.from_function(hwnd.pdf, n_grid_points, 3, 'CartesianProd')
self.assertIsInstance(hfd_id, HypertoroidalGridDistribution)
np.testing.assert_almost_equal(hfd_id.pdf(test_points), hwnd.pdf(test_points), decimal=6)


if __name__ == '__main__':
unittest.main()
Loading