From 1392e485980444822eb71a935812155ff8c3ecaa Mon Sep 17 00:00:00 2001 From: pavelkomarov Date: Fri, 20 Jun 2025 18:38:17 -0700 Subject: [PATCH 1/4] addressing #68 for the kalman module, discovering bugs in the process --- .../finite_difference/_finite_difference.py | 2 +- pynumdiff/kalman_smooth/_kalman_smooth.py | 825 +++++------------- 2 files changed, 222 insertions(+), 605 deletions(-) diff --git a/pynumdiff/finite_difference/_finite_difference.py b/pynumdiff/finite_difference/_finite_difference.py index 90eba03..8daa3ec 100644 --- a/pynumdiff/finite_difference/_finite_difference.py +++ b/pynumdiff/finite_difference/_finite_difference.py @@ -68,7 +68,7 @@ def _iterate_first_order(x, dt, num_iterations): - **x_hat** -- estimated (smoothed) x - **dxdt_hat** -- estimated derivative of x """ - w = np.arange(len(x)) / (len(x) - 1) # set up weights, [0., ... 1.0] + w = np.linspace(0, 1, len(x)) # set up weights, [0., ... 1.0] # forward backward passes for _ in range(num_iterations): diff --git a/pynumdiff/kalman_smooth/_kalman_smooth.py b/pynumdiff/kalman_smooth/_kalman_smooth.py index 57c1aae..2f72242 100644 --- a/pynumdiff/kalman_smooth/_kalman_smooth.py +++ b/pynumdiff/kalman_smooth/_kalman_smooth.py @@ -6,638 +6,255 @@ #################### # Helper functions # #################### - - -def __kalman_forward_update__(xhat_fm, P_fm, y, u, A, B, C, R, Q): - """ - :param xhat_fm: - :param P_fm: - :param y: - :param u: - :param A: - :param B: - :param C: - :param R: - :param Q: - :return: - """ - I = np.array(np.eye(A.shape[0])) - gammaW = np.array(np.eye(A.shape[0])) - - K_f = P_fm@C.T@np.linalg.pinv(C@P_fm@C.T + R) - - if y is not None and not np.isnan(y): - xhat_fp = xhat_fm + K_f@(y - C@xhat_fm) - P_fp = (I - K_f@C)@P_fm - xhat_fm = A@xhat_fp + B@u - P_fm = A@P_fp@A.T + gammaW@Q@gammaW.T - else: - xhat_fp = xhat_fm - P_fp = (I - K_f@C)@P_fm - xhat_fm = A@xhat_fp + B@u - P_fm = A@P_fp@A.T + gammaW@Q@gammaW.T - - return xhat_fp, xhat_fm, P_fp, P_fm - - -def __kalman_forward_filter__(xhat_fm, P_fm, y, u, A, B, C, R, Q): - """ - :param xhat_fm: - :param P_fm: - :param y: - :param u: - :param A: - :param B: - :param C: - :param R: - :param Q: - :return: - """ - if u is None: - u = np.array(np.zeros([B.shape[1], y.shape[1]])) - - xhat_fp = None - P_fp = [] - P_fm = [P_fm] - - for i in range(y.shape[1]): - _xhat_fp, _xhat_fm, _P_fp, _P_fm = __kalman_forward_update__(xhat_fm[:, [-1]], P_fm[-1], y[:, [i]], u[:, [i]], - A, B, C, R, Q) - if xhat_fp is None: - xhat_fp = _xhat_fp - else: - xhat_fp = np.hstack((xhat_fp, _xhat_fp)) - xhat_fm = np.hstack((xhat_fm, _xhat_fm)) - - P_fp.append(_P_fp) - P_fm.append(_P_fm) - - return xhat_fp, xhat_fm, P_fp, P_fm - - -def __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A): +def _kalman_forward_filter(xhat0, P0, y, A, C, Q, R, u=None, B=None): + """Run the forward pass of a Kalman filter + :param np.array xhat0: initial estimate of state (often 0) one step before the first measurement + :param np.array P0: initial guess of state covariance (often identity matrix) before the first measurement + :param np.array y: series of measurements, stacked along axis 0. In this case just the raw noisy data (fully observable) + :param np.array A: discrete-time state transition matrix + :param np.array C: measurement matrix + :param np.array Q: discrete-time process noise covariance + :param np.array R: measurement noise covariance + :param np.array u: optional control input + :param np.array B: optional control matrix + :return: tuple[np.array, np.array, np.array, np.array] of\n + - **xhat_pre** -- a priori estimates of xhat, with axis=0 the batch dimension, so xhat[n] gets the nth step + - **xhat_post** -- a posteriori estimates of xhat + - **P_pre** -- a priori estimates of P + - **P_post** -- a posteriori estimates of P """ - :param xhat_fp: - :param xhat_fm: - :param P_fp: - :param P_fm: - :param A: - :return: - """ - N = xhat_fp.shape[1] - - xhat_smooth = copy.copy(xhat_fp) - P_smooth = copy.copy(P_fp) - for t in range(N-2, -1, -1): - L = P_fp[t]@A.T@np.linalg.pinv(P_fm[t]) - xhat_smooth[:, [t]] = xhat_fp[:, [t]] + L@(xhat_smooth[:, [t+1]] - xhat_fm[:, [t+1]]) - P_smooth[t] = P_fp[t] - L@(P_smooth[t+1] - P_fm[t+1]) - - return xhat_smooth, P_smooth - - -##################### -# Constant Velocity # -##################### - - -def __constant_velocity__(x, dt, params, options=None): - """ - Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative. - - :param x: (np.array of floats, 1xN) time series to differentiate - :param dt: (float) time step size - :param params: (list) [r, : (float) covariance of the x noise - q] : (float) covariance of the constant velocity model - :param options: (dict) {'backward'} : (bool) run smoother backwards in time - :return: - """ - if options is None: - options = {'backward': False} - - r, q = params - - if len(x.shape) == 2: - y = x - else: - y = np.reshape(x, [1, len(x)]) - - A = np.array([[1, dt], [0, 1]]) - B = np.array([[0], [0]]) - C = np.array([[1, 0]]) - R = np.array([[r]]) - Q = np.array([[1e-16, 0], [0, q]]) - x0 = np.array([[x[0,0]], [0]]) - P0 = np.array(100*np.eye(2)) - u = None + if u is None: u = np.zeros((B.shape[1], y.shape[0])) + xhat = xhat0 + P = P0 + # _pre variables are a priori predictions based on only past information + # _post variables are a posteriori combinations of all information available at the current step + xhat_pre = []; xhat_post = []; P_pre = []; P_post = [] - - if options['backward']: - A = np.linalg.pinv(A) - y = y[:, ::-1] - - xhat_fp, xhat_fm, P_fp, P_fm = __kalman_forward_filter__(x0, P0, y, u, A, B, C, R, Q) - xhat_smooth, _ = __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A) - - x_hat = np.ravel(xhat_smooth[0, :]) - dxdt_hat = np.ravel(xhat_smooth[1, :]) - - if not options['backward']: - return x_hat, dxdt_hat - - return x_hat[::-1], dxdt_hat[::-1] - - -def constant_velocity(x, dt, params, options=None): + for n in range(y.shape[0]): + xhat_ = A @ xhat + B @ u # ending underscores denote an a priori prediction + P_ = A @ P @ A.T + Q + xhat_pre.append(xhat_) # the [n]th index holds _{n|n-1} info + P_pre.append(P_) + + xhat = xhat_ # handle missing data + P = P_ + if not np.isnan(y[n]): + K = P_ @ C.T @ np.linalg.inv(C @ P_ @ C.T + R) + xhat += K @ (y[n] - C @ xhat_) + P -= K @ C @ P_ + xhat_post.append(xhat) # the [n]th index holds _{n|n} info + P_post.append(P) + + return np.stack(xhat_pre, axis=0), np.stack(xhat_post, axis=0), np.stack(P_pre, axis=0), np.stack(P_post, axis=0) + + +def _kalman_backward_smooth(A, xhat_pre, xhat_post, P_pre, P_post): + """Do the additional Rauch-Tung-Striebel smoothing step + :param A: discrete-time state transition matrix + :param xhat_pre: a priori estimates of xhat + :param xhat_post: a posteriori estimates of xhat + :param P_pre: a priori estimates of P + :param P_post: a posteriori estimates of P + :return: tuple[np.array, np.array] of\n + - **xhat_smooth** -- RTS smoothed xhat + - **P_smooth** -- RTS smoothed P """ - Run a forward-backward constant velocity RTS Kalman smoother to estimate the derivative. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - - r: covariance of the x noise - - q: covariance of the constant velocity model + xhat_smooth = [xhat_post[-1]] + P_smooth = [P_post[-1]] + for n in range(xhat_pre.shape[0]-2, -1, -1): + C_RTS = P_post[n] @ A.T @ np.linalg.inv(P_pre[n+1]) # the [n+1]th index holds _{n+1|n} info + xhat_smooth.append(xhat_post[n] + C_RTS @ (xhat_smooth[-1] - xhat_pre[n+1])) # The original authors use C, not to + P_smooth.append(P_post[n] + C_RTS @ (P_smooth[-1] - P_pre[n+1]) @ C_RTS.T) # be confused with the measurement matrix - :type params: list (float) + return np.stack(xhat_smooth[::-1], axis=0), np.stack(P_smooth[::-1], axis=0) # reverse lists - :param options: a dictionary indicating whether to run smoother forwards and backwards - (usually achieves better estimate at end points) - :type params: dict {'forwardbackward': boolean}, optional - - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - - :rtype: tuple -> (np.array, np.array) +def _RTS_smooth(xhat0, P0, y, A, C, Q, R, u=None, B=None) + """forward-backward Kalman/Rauch-Tung-Striebel smoother. For params see the helper functions. """ - if len(x.shape) == 2: - pass - else: - x = np.reshape(x, [1, len(x)]) - - if options is None: - options = {'forwardbackward': True} - - if options['forwardbackward']: - x_hat_f, smooth_dxdt_hat_f = __constant_velocity__(x, dt, params, options={'backward': False}) - x_hat_b, smooth_dxdt_hat_b = __constant_velocity__(x, dt, params, options={'backward': True}) - - w = np.arange(0, len(x_hat_f), 1) - w = w/np.max(w) - - x_hat = x_hat_f*w + x_hat_b*(1-w) - smooth_dxdt_hat = smooth_dxdt_hat_f*w + smooth_dxdt_hat_b*(1-w) - - smooth_dxdt_hat_corrected = np.mean((smooth_dxdt_hat, smooth_dxdt_hat_f), axis=0) - - return x_hat, smooth_dxdt_hat_corrected - - return __constant_velocity__(x, dt, params, options={'backward': False}) - - -######################### -# Constant Acceleration # -######################### + xhat_pre, xhat_post, P_pre, P_post = _kalman_forward_filter(xhat0, P0, x, A, C, Q, R) # noisy x are the "y" in Kalman-land + xhat_smooth, _ = _kalman_backward_smooth(A, xhat_pre, xhat_post, P_pre, P_post) # not doing anything with P_smooth + return xhat_smooth -def __constant_acceleration__(x, dt, params, options=None): +######################################### +# Constant 1st, 2nd, and 3rd derivative # +######################################### +def _constant_derivative(x, P0, A, C, R, Q, forwardbackward) + """Helper for `constant_{velocity,acceleration,jerk}` functions, because there was a lot of + repeated code. """ - Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - - r: covariance of the x noise - - q: covariance of the constant velocity model - - :type params: list (float) + xhat0 = np.zeros(A.shape[0]); xhat0[0] = x[0] + xhat_smooth = _RTS_smooth(xhat0, P0, x, A, C, Q, R) # noisy x are the "y" in Kalman-land + x_hat_forward = xhat_smooth[:, 0] # first dimension is time, so slice first element at all times + dxdt_hat_forward = xhat_smooth[:, 1] + + if not forwardbackward: # bound out here if not doing the same in reverse and then combining + return x_hat_forward, dxdt_hat_forward + + xhat0[0] = x[-1] # starting from the other end of the signal + Ainv = np.linalg.inv(A) # dynamics are inverted + xhat_smooth = _RTS_smooth(xhat0, P0, x[::-1], Ainv, C, Q, R) # noisy x are the "y" in Kalman-land + x_hat_backward = xhat_smooth[:, 0][::-1] # the result is backwards still, so reverse it + dxdt_hat_backward = xhat_smooth[:, 1][::-1] + + w = np.linspace(0, 1, len(x)) + x_hat = x_hat_forward*w + x_hat_backward*(1-w) + dxdt_hat = dxdt_hat_forward*w + dxdt_hat_backward*(1-w) + + dxdt_hat_corrected = np.mean((dxdt_hat, dxdt_hat_forward), axis=0) # What is this line for? + return x_hat, dxdt_hat_corrected + + +def constant_velocity(x, dt, params=None, options=None, r=None, q=None, forwardbackward=True): + """Run a forward-backward constant velocity RTS Kalman smoother to estimate the derivative. + + :param np.array[float] x: data series to differentiate + :param float dt: time step size + :param list[float] params: (**deprecated**, prefer :code:`r` and :code:`q`) + :param options: (**deprecated**, prefer :code:`forwardbackward`) + a dictionary consisting of {'forwardbackward': (bool)} + param float r: variance of the signal noise + param float q: variance of the constant velocity model + :param bool forwardbackward: indicates whether to run smoother forwards and backwards + (usually achieves better estimate at end points) + + :return: tuple[np.array, np.array] of\n + - **x_hat** -- estimated (smoothed) x + - **dxdt_hat** -- estimated derivative of x + """ + if params != None: # boilerplate backwards compatibility code + warn("""`params` and `options` parameters will be removed in a future version. Use `r`, + `q`, and `forwardbackward` instead.""", DeprecationWarning) + r, q = params + if options != None: + if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] + elif r == None or q == None: + raise ValueError("`q` and `r` must be given.") + + A = np.array([[1, dt], [0, 1]]) # states are x, x'. This basically does an Euler update. + C = np.array([[1, 0]]) # we measure only y = noisy x + R = np.array([[r]]) + Q = np.array([[1e-16, 0], [0, q]]) # uncertainty is around the velocity + P0 = np.array(100*np.eye(2)) # Why is this one magnitude 100 vs the other ones being 10? + return _constant_derivative(x, P0, A, C, R, Q, forwardbackward) - :param options: a dictionary indicating whether to run smoother backwards in time - :type params: dict {'backward': boolean}, optional - :return: a tuple consisting of: +def constant_acceleration(x, dt, params=None, options=None, r=None, q=None, forwardbackward=True): + """Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative. - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x + :param np.array[float] x: data series to differentiate + :param float dt: time step size + :param list[float] params: (**deprecated**, prefer :code:`r` and :code:`q`) + :param options: (**deprecated**, prefer :code:`forwardbackward`) + a dictionary consisting of {'forwardbackward': (bool)} + param float r: variance of the signal noise + param float q: variance of the constant velocity model + :param bool forwardbackward: indicates whether to run smoother forwards and backwards + (usually achieves better estimate at end points) - :rtype: tuple -> (np.array, np.array) + :return: tuple[np.array, np.array] of\n + - **x_hat** -- estimated (smoothed) x + - **dxdt_hat** -- estimated derivative of x """ - - if options is None: - options = {'backward': False} - - r, q = params - - if len(x.shape) == 2: - y = x - else: - y = np.reshape(x, [1, len(x)]) - - A = np.array([[1, dt, 0], - [0, 1, dt], - [0, 0, 1]]) - B = np.array([[0], [0], [0]]) - C = np.array([[1, 0, 0]]) + if params != None: # boilerplate backwards compatibility code + warn("""`params` and `options` parameters will be removed in a future version. Use `r`, + `q`, and `forwardbackward` instead.""", DeprecationWarning) + r, q = params + if options != None: + if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] + elif r == None or q == None: + raise ValueError("`q` and `r` must be given.") + + A = np.array([[1, dt, (dt**2)/2], # states are x, x', x" + [0, 1, dt], + [0, 0, 1]]) + C = np.array([[1, 0, 0]]) # we measure only y = noisy x R = np.array([[r]]) Q = np.array([[1e-16, 0, 0], - [0, 1e-16, 0], - [0, 0, q]]) - x0 = np.array([[x[0,0]], [0], [0]]) + [0, 1e-16, 0], + [0, 0, q]]) # uncertainty is around the acceleration P0 = np.array(10*np.eye(3)) - u = None - - if options['backward']: - A = np.linalg.pinv(A) - y = y[:, ::-1] - - xhat_fp, xhat_fm, P_fp, P_fm = __kalman_forward_filter__(x0, P0, y, u, A, B, C, R, Q) - xhat_smooth, _ = __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A) - - x_hat = np.ravel(xhat_smooth[0, :]) - dxdt_hat = np.ravel(xhat_smooth[1, :]) - - if not options['backward']: - return x_hat, dxdt_hat - - return x_hat[::-1], dxdt_hat[::-1] - - -def constant_acceleration(x, dt, params, options=None): - """ - Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - r: covariance of the x noise - - q: covariance of the constant velocity model + return _constant_derivative(x, P0, A, C, R, Q, forwardbackward) - :type params: list (float) +def constant_jerk(x, dt, params=None, options=None, r=None, q=None, forwardbackward=True): + """Run a forward-backward constant jerk RTS Kalman smoother to estimate the derivative. - :param options: a dictionary indicating whether to run smoother forwards and backwards - (usually achieves better estimate at end points) - :type params: dict {'forwardbackward': boolean}, optional + :param np.array[float] x: data series to differentiate + :param float dt: time step size + :param list[float] params: (**deprecated**, prefer :code:`r` and :code:`q`) + :param options: (**deprecated**, prefer :code:`forwardbackward`) + a dictionary consisting of {'forwardbackward': (bool)} + param float r: variance of the signal noise + param float q: variance of the constant velocity model + :param bool forwardbackward: indicates whether to run smoother forwards and backwards + (usually achieves better estimate at end points) - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - - :rtype: tuple -> (np.array, np.array) - """ - if len(x.shape) == 2: - pass - else: - x = np.reshape(x, [1, len(x)]) - - if options is None: - options = {'forwardbackward': True} - - if options['forwardbackward']: - x_hat_f, smooth_dxdt_hat_f = __constant_acceleration__(x, dt, params, options={'backward': False}) - x_hat_b, smooth_dxdt_hat_b = __constant_acceleration__(x, dt, params, options={'backward': True}) - - w = np.arange(0, len(x_hat_f), 1) - w = w/np.max(w) - - x_hat = x_hat_f*w + x_hat_b*(1-w) - smooth_dxdt_hat = smooth_dxdt_hat_f*w + smooth_dxdt_hat_b*(1-w) - - smooth_dxdt_hat_corrected = np.mean((smooth_dxdt_hat, smooth_dxdt_hat_f), axis=0) - - return x_hat, smooth_dxdt_hat_corrected - - return __constant_acceleration__(x, dt, params, options={'backward': False}) - - -################# -# Constant Jerk # -################# - - -def __constant_jerk__(x, dt, params, options=None): + :return: tuple[np.array, np.array] of\n + - **x_hat** -- estimated (smoothed) x + - **dxdt_hat** -- estimated derivative of x """ - Run a forward-backward constant jerk RTS Kalman smoother to estimate the derivative. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - - r: covariance of the x noise - - q: covariance of the constant velocity model - - :type params: list (float) - - - :param options: a dictionary indicating whether to run smoother backwards in time - :type params: dict {'backward': boolean}, optional - - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - :rtype: tuple -> (np.array, np.array) - """ - - if options is None: - options = {'backward': False} - - r, q = params - - if len(x.shape) == 2: - y = x - else: - y = np.reshape(x, [1, len(x)]) - - A = np.array([[1, dt, 0, 0], - [0, 1, dt, 0], - [0, 0, 1, dt], - [0, 0, 0, 1]]) - B = np.array([[0], [0], [0], [0]]) - C = np.array([[1, 0, 0, 0]]) + if params != None: # boilerplate backwards compatibility code + warn("""`params` and `options` parameters will be removed in a future version. Use `r`, + `q`, and `forwardbackward` instead.""", DeprecationWarning) + r, q = params + if options != None: + if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] + elif r == None or q == None: + raise ValueError("`q` and `r` must be given.") + + A = np.array([[1, dt, (dt**2)/2, (dt**3)/6], # states are x, x', x", x'" + [0, 1, dt, (dt**2)/2], + [0, 0, 1, dt], + [0, 0, 0, 1]]) + C = np.array([[1, 0, 0, 0]]) # we measure only y = noisy x R = np.array([[r]]) - Q = np.array([[1e-16, 0, 0, 0], + Q = np.array([[1e-16, 0, 0, 0], [0, 1e-16, 0, 0], [0, 0, 1e-16, 0], - [0, 0, 0, q]]) - x0 = np.array([[x[0,0]], [0], [0], [0]]) + [0, 0, 0, q]]) # uncertainty is around the jerk P0 = np.array(10*np.eye(4)) - y = np.array(x) - u = None - - if options['backward']: - A = np.linalg.pinv(A) - y = y[:, ::-1] - - xhat_fp, xhat_fm, P_fp, P_fm = __kalman_forward_filter__(x0, P0, y, u, A, B, C, R, Q) - xhat_smooth, _ = __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A) - - x_hat = np.ravel(xhat_smooth[0,:]) - dxdt_hat = np.ravel(xhat_smooth[1,:]) - - if not options['backward']: - return x_hat, dxdt_hat - - return x_hat[::-1], dxdt_hat[::-1] - - -def constant_jerk(x, dt, params, options=None): - """ - Run a forward-backward constant jerk RTS Kalman smoother to estimate the derivative. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - - r: covariance of the x noise - - q: covariance of the constant velocity model - - :type params: list (float) - - - :param options: a dictionary indicating whether to run smoother forwards and backwards - (usually achieves better estimate at end points) - :type params: dict {'forwardbackward': boolean}, optional - - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - - :rtype: tuple -> (np.array, np.array) - """ - if len(x.shape) == 2: - pass - else: - x = np.reshape(x, [1, len(x)]) - - if options is None: - options = {'forwardbackward': True} - - if options['forwardbackward']: - x_hat_f, smooth_dxdt_hat_f = __constant_jerk__(x, dt, params, options={'backward': False}) - x_hat_b, smooth_dxdt_hat_b = __constant_jerk__(x, dt, params, options={'backward': True}) - - w = np.arange(0, len(x_hat_f), 1) - w = w/np.max(w) - - x_hat = x_hat_f*w + x_hat_b*(1-w) - smooth_dxdt_hat = smooth_dxdt_hat_f*w + smooth_dxdt_hat_b*(1-w) - - smooth_dxdt_hat_corrected = np.mean((smooth_dxdt_hat, smooth_dxdt_hat_f), axis=0) - - return x_hat, smooth_dxdt_hat_corrected - - return __constant_jerk__(x, dt, params, options={'backward': False}) - - -def known_dynamics(x, params, u=None, options=None): - """ - Run a forward RTS Kalman smoother given known dynamics to estimate the derivative. - - :param x: matrix of time series of (noisy) measurements - :type x: np.array (float) - - :param params: a list of: - - x0: inital condition, matrix of Nx1, N = number of states - - P0: initial covariance matrix of NxN - - A: dynamics matrix, NxN - - B: control input matrix, NxM, M = number of measurements - - C: measurement dynamics, MxN - - R: covariance matrix for the measurements, MxM - - Q: covariance matrix for the model, NxN - :type params: list (matrix) - - :param u: matrix of time series of control inputs - :type u: np.array (float) - - :param options: a dictionary indicating whether to run smoother - :type params: dict {'smooth': boolean}, optional - - :return: matrix: - - xhat_smooth: smoothed estimates of the full state x - - :rtype: tuple -> (np.array, np.array) - """ - if len(x.shape) == 2: - y = x - else: - y = np.reshape(x, [1, len(x)]) - - if options is None: - options = {'smooth': True} - - x0, P0, A, B, C, R, Q = params - - xhat_fp, xhat_fm, P_fp, P_fm = __kalman_forward_filter__(x0, P0, y, u, A, B, C, R, Q) - xhat_smooth, _ = __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A) - - if not options['smooth']: - return xhat_fp - - return xhat_smooth - - - -################################################################################################### -# Constant Acceleration with Savitzky-Golay pre-estimate (not worth the parameter tuning trouble) # -################################################################################################### - - -def __savgol_const_accel__(x, sg_dxdt_hat, dt, params, options=None): - """ - Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative, where initial estimates of the velocity are first estimated using the savitzky-golay filter. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param sg_dxdt_hat: initial velocity estimate - :type sg_dxdt_hat: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of two elements: - - r1: covariance of the x noise - - r2: covariance of the vel noise - - q: covariance of the constant velocity model - - :type params: list (float) - - - :param options: a dictionary indicating whether to run smoother backwards in time - :type params: dict {'backward': boolean}, optional - - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - :rtype: tuple -> (np.array, np.array) - """ - - if options is None: - options = {'backward': False} - - r1, r2, q = params - A = np.array([[1, dt, 0], - [0, 1, dt], - [0, 0, 1]]) - B = np.array([[0], [0], [0]]) - C = np.array([[1, 0, 0], - [0, 1, 0]]) - R = np.array([[r1, 0], - [0, r2]]) - Q = np.array([[1e-16, 0, 0], - [0, 1e-16, 0], - [0, 0, q]]) - x0 = np.array([[x[0]], [sg_dxdt_hat[0]], [0]]) - P0 = np.array(10*np.eye(3)) - y = np.array(np.vstack((x, sg_dxdt_hat))) - u = None - - if options['backward']: - A = np.linalg.pinv(A) - y = y[:, ::-1] - - xhat_fp, xhat_fm, P_fp, P_fm = __kalman_forward_filter__(x0, P0, y, u, A, B, C, R, Q) - xhat_smooth, _ = __kalman_backward_smooth__(xhat_fp, xhat_fm, P_fp, P_fm, A) - - x_hat = np.ravel(xhat_smooth[0, :]) - dxdt_hat = np.ravel(xhat_smooth[1, :]) - - if not options['backward']: - return x_hat, dxdt_hat - - return x_hat[::-1], dxdt_hat[::-1] - - -def savgol_const_accel(x, dt, params, options=None): - """ - Run a forward-backward constant acceleration RTS Kalman smoother to estimate the derivative, where initial estimates of the velocity are first estimated using the savitzky-golay filter. - - :param x: array of time series to differentiate - :type x: np.array (float) - - :param dt: time step size - :type dt: float - - :param params: a list of six elements: - - N: for savgoldiff, order of the polynomial - - window_size: for savgoldiff, size of the sliding window, must be odd (if not, 1 is added) - - smoothing_win: for savgoldiff, size of the window used for gaussian smoothing, a good default is window_size, but smaller for high frequnecy data - - r1: covariance of the x noise - - r2: covariance of the vel noise - - q: covariance of the constant velocity model - - :type params: list (float) - - - :param options: a dictionary indicating whether to run smoother forwards and backwards - (usually achieves better estimate at end points) - :type params: dict {'forwardbackward': boolean}, optional - - :return: a tuple consisting of: - - - x_hat: estimated (smoothed) x - - dxdt_hat: estimated derivative of x - - - :rtype: tuple -> (np.array, np.array) - """ - if options is None: - options = {'forwardbackward': True} - - N, window_size, smoothing_win, r1, r2, q = params - - _, sg_dxdt_hat = savgoldiff(x, dt, [N, window_size, smoothing_win]) - - if options['forwardbackward']: - x_hat_f, smooth_dxdt_hat_f = __savgol_const_accel__(x, sg_dxdt_hat, dt, [r1, r2, q], - options={'backward': False}) - x_hat_b, smooth_dxdt_hat_b = __savgol_const_accel__(x, sg_dxdt_hat, dt, [r1, r2, q], - options={'backward': True}) - - w = np.arange(0, len(x_hat_f), 1) - w = w/np.max(w) - - x_hat = x_hat_f*w + x_hat_b*(1-w) - smooth_dxdt_hat = smooth_dxdt_hat_f*w + smooth_dxdt_hat_b*(1-w) - - smooth_dxdt_hat_corrected = np.mean((smooth_dxdt_hat, smooth_dxdt_hat_f), axis=0) - - return x_hat, smooth_dxdt_hat_corrected - return __constant_acceleration__(x, dt, params, options={'backward': False}) + return _constant_derivative(x, P0, A, C, R, Q, forwardbackward) + + +def known_dynamics(x, params, u=None, options=None, xhat0=None, P0=None, A=None, + B=0, C=None, Q=None, R=None, smooth=True): + """Run a forward RTS Kalman smoother given known dynamics. + + :param np.array[float] x: data series of noisy measurements + :param list params: (**deprecated**, prefer :code:`xhat0`, :code:`P0`, :code:`A`, + :code:`B`, :code:`C`, :code:`R`, and :code:`Q`), a list in the order here (note flip of Q and R) + :param np.array[float] u: series of control inputs + :param options: (**deprecated**, prefer :code:`smooth`) + a dictionary consisting of {'smooth': (bool)} + :param np.array xhat0: inital condition, length N = number of states + :param np.array P0: initial covariance matrix of NxN + :param np.array A: dynamics matrix, NxN + :param np.array B: control input matrix, NxM, M = number of measurements + :param np.array C: measurement dynamics, MxN + :param np.array Q: covariance matrix for the model, NxN + :param np.array R: covariance matrix for the measurements, MxM + :parma bool smooth: whether to run the RTS smoother step + + :return: np.array **x_hat** -- estimated (smoothed) x + """ # Why not also returning derivative here? + if params != None: + warn("""`params` and `options` parameters will be removed in a future version. Use `xhat0`, + `P0`, `A`, `B`, `C`, `Q`, `R`, and `smooth` instead.""", DeprecationWarning) + xhat0, P0, A, B, C, R, Q = params + if options != None: + if 'smooth' in options: smooth = options['smooth'] + elif None in [xhat0, P0, A, C, R, Q]: + raise ValueError("`xhat0`, `P0`, `A`, `C`, `Q`, and `R` must be given.") + + xhat_pre, xhat_post, P_pre, P_post = _kalman_forward_filter(xhat0, P0, x, A, C, Q, R, u, B) # noisy x are the "y" in Kalman-land + if not smooth: + return xhat_post + + xhat_smooth, _ = _kalman_backward_smooth(A, xhat_pre, xhat_post, P_pre, P_post) + return xhat_smooth # We're not calculating a derivative here. Why not? From a4d9697a97dcf8e9812897a1197071d32452cf7a Mon Sep 17 00:00:00 2001 From: pavelkomarov Date: Fri, 20 Jun 2025 19:22:01 -0700 Subject: [PATCH 2/4] now the tests'll run --- pynumdiff/kalman_smooth/__init__.py | 4 ++-- pynumdiff/kalman_smooth/_kalman_smooth.py | 13 ++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pynumdiff/kalman_smooth/__init__.py b/pynumdiff/kalman_smooth/__init__.py index 6cf9932..2d1aa8d 100644 --- a/pynumdiff/kalman_smooth/__init__.py +++ b/pynumdiff/kalman_smooth/__init__.py @@ -1,5 +1,5 @@ """This module implements Kalman filters """ -from ._kalman_smooth import constant_velocity, constant_acceleration, constant_jerk, known_dynamics, savgol_const_accel +from ._kalman_smooth import constant_velocity, constant_acceleration, constant_jerk, known_dynamics -__all__ = ['constant_velocity', 'constant_acceleration', 'constant_jerk', 'known_dynamics', 'savgol_const_accel'] # So these get treated as direct members of the module by sphinx +__all__ = ['constant_velocity', 'constant_acceleration', 'constant_jerk', 'known_dynamics'] # So these get treated as direct members of the module by sphinx diff --git a/pynumdiff/kalman_smooth/_kalman_smooth.py b/pynumdiff/kalman_smooth/_kalman_smooth.py index 2f72242..b6555af 100644 --- a/pynumdiff/kalman_smooth/_kalman_smooth.py +++ b/pynumdiff/kalman_smooth/_kalman_smooth.py @@ -1,7 +1,5 @@ -import copy import numpy as np - -from pynumdiff.linear_model import savgoldiff +from warnings import warn #################### # Helper functions # @@ -23,7 +21,8 @@ def _kalman_forward_filter(xhat0, P0, y, A, C, Q, R, u=None, B=None): - **P_pre** -- a priori estimates of P - **P_post** -- a posteriori estimates of P """ - if u is None: u = np.zeros((B.shape[1], y.shape[0])) + if B is None: B = np.zeros((A.shape[0], 1)) + if u is None: u = np.zeros(B.shape[1]) xhat = xhat0 P = P0 @@ -70,10 +69,10 @@ def _kalman_backward_smooth(A, xhat_pre, xhat_post, P_pre, P_post): return np.stack(xhat_smooth[::-1], axis=0), np.stack(P_smooth[::-1], axis=0) # reverse lists -def _RTS_smooth(xhat0, P0, y, A, C, Q, R, u=None, B=None) +def _RTS_smooth(xhat0, P0, y, A, C, Q, R, u=None, B=None): """forward-backward Kalman/Rauch-Tung-Striebel smoother. For params see the helper functions. """ - xhat_pre, xhat_post, P_pre, P_post = _kalman_forward_filter(xhat0, P0, x, A, C, Q, R) # noisy x are the "y" in Kalman-land + xhat_pre, xhat_post, P_pre, P_post = _kalman_forward_filter(xhat0, P0, y, A, C, Q, R) # noisy x are the "y" in Kalman-land xhat_smooth, _ = _kalman_backward_smooth(A, xhat_pre, xhat_post, P_pre, P_post) # not doing anything with P_smooth return xhat_smooth @@ -81,7 +80,7 @@ def _RTS_smooth(xhat0, P0, y, A, C, Q, R, u=None, B=None) ######################################### # Constant 1st, 2nd, and 3rd derivative # ######################################### -def _constant_derivative(x, P0, A, C, R, Q, forwardbackward) +def _constant_derivative(x, P0, A, C, R, Q, forwardbackward): """Helper for `constant_{velocity,acceleration,jerk}` functions, because there was a lot of repeated code. """ From 697d8d45faf96b02f708346dedf35f631999a591 Mon Sep 17 00:00:00 2001 From: pavelkomarov Date: Mon, 23 Jun 2025 13:18:43 -0700 Subject: [PATCH 3/4] velocity to accel and jerk --- pynumdiff/kalman_smooth/_kalman_smooth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pynumdiff/kalman_smooth/_kalman_smooth.py b/pynumdiff/kalman_smooth/_kalman_smooth.py index b6555af..16c2d57 100644 --- a/pynumdiff/kalman_smooth/_kalman_smooth.py +++ b/pynumdiff/kalman_smooth/_kalman_smooth.py @@ -150,7 +150,7 @@ def constant_acceleration(x, dt, params=None, options=None, r=None, q=None, forw :param options: (**deprecated**, prefer :code:`forwardbackward`) a dictionary consisting of {'forwardbackward': (bool)} param float r: variance of the signal noise - param float q: variance of the constant velocity model + param float q: variance of the constant acceleration model :param bool forwardbackward: indicates whether to run smoother forwards and backwards (usually achieves better estimate at end points) @@ -189,7 +189,7 @@ def constant_jerk(x, dt, params=None, options=None, r=None, q=None, forwardbackw :param options: (**deprecated**, prefer :code:`forwardbackward`) a dictionary consisting of {'forwardbackward': (bool)} param float r: variance of the signal noise - param float q: variance of the constant velocity model + param float q: variance of the constant jerk model :param bool forwardbackward: indicates whether to run smoother forwards and backwards (usually achieves better estimate at end points) From 26dd13b6727ca66173957536654a74cb72873166 Mon Sep 17 00:00:00 2001 From: pavelkomarov Date: Wed, 25 Jun 2025 16:01:38 -0700 Subject: [PATCH 4/4] getting kalman module up to speed on new, passing tests --- pynumdiff/kalman_smooth/_kalman_smooth.py | 3 +- pynumdiff/tests/test_diff_methods.py | 38 ++++++++++++---- pynumdiff/tests/test_kalman_smooth.py | 54 ----------------------- 3 files changed, 31 insertions(+), 64 deletions(-) delete mode 100644 pynumdiff/tests/test_kalman_smooth.py diff --git a/pynumdiff/kalman_smooth/_kalman_smooth.py b/pynumdiff/kalman_smooth/_kalman_smooth.py index 16c2d57..febd68b 100644 --- a/pynumdiff/kalman_smooth/_kalman_smooth.py +++ b/pynumdiff/kalman_smooth/_kalman_smooth.py @@ -102,8 +102,7 @@ def _constant_derivative(x, P0, A, C, R, Q, forwardbackward): x_hat = x_hat_forward*w + x_hat_backward*(1-w) dxdt_hat = dxdt_hat_forward*w + dxdt_hat_backward*(1-w) - dxdt_hat_corrected = np.mean((dxdt_hat, dxdt_hat_forward), axis=0) # What is this line for? - return x_hat, dxdt_hat_corrected + return x_hat, dxdt_hat def constant_velocity(x, dt, params=None, options=None, r=None, q=None, forwardbackward=True): diff --git a/pynumdiff/tests/test_diff_methods.py b/pynumdiff/tests/test_diff_methods.py index def67bc..d56fff5 100644 --- a/pynumdiff/tests/test_diff_methods.py +++ b/pynumdiff/tests/test_diff_methods.py @@ -5,7 +5,7 @@ from ..linear_model import lineardiff, polydiff, savgoldiff, spectraldiff from ..total_variation_regularization import velocity, acceleration, jerk, iterative_velocity -from ..kalman_smooth import * # constant_velocity, constant_acceleration, constant_jerk, known_dynamics +from ..kalman_smooth import constant_velocity, constant_acceleration, constant_jerk, known_dynamics from ..smooth_finite_difference import mediandiff, meandiff, gaussiandiff, friedrichsdiff, butterdiff, splinediff from ..finite_difference import first_order, second_order # Function aliases for testing cases where parameters change the behavior in a big way @@ -28,7 +28,7 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs) (5, r"$x(t)=\frac{\sin(8t)}{(t+0.1)^{3/2}}$", lambda t: np.sin(8*t)/((t + 0.1)**(3/2)), # steep challenger lambda t: ((0.8 + 8*t)*np.cos(8*t) - 1.5*np.sin(8*t))/(0.1 + t)**(5/2))] -# Call both ways, with kwargs (new) and with params list with default options dict (legacy), to ensure both work +# Call both ways, with kwargs (new) and with params list and optional options dict (legacy), to ensure both work diff_methods_and_params = [ (first_order, {}), # empty dictionary for the case of no parameters. no params -> no diff in new vs old (iterated_first_order, {'num_iterations':5}), (iterated_first_order, [5], {'iterate':True}), @@ -42,7 +42,11 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs) (gaussiandiff, {'window_size':5}), (gaussiandiff, [5]), (friedrichsdiff, {'window_size':5}), (friedrichsdiff, [5]), (butterdiff, {'filter_order':3, 'cutoff_freq':0.074}), (butterdiff, [3, 0.074]), - (splinediff, {'order':5, 's':2}), (splinediff, [5, 2]) + (splinediff, {'order':5, 's':2}), (splinediff, [5, 2]), + (constant_velocity, {'r':1e-4, 'q':1e-2}), (constant_velocity, [1e-4, 1e-2]), + (constant_acceleration, {'r':1e-4, 'q':1e-1}), (constant_acceleration, [1e-4, 1e-1]), + (constant_jerk, {'r':1e-4, 'q':10}), (constant_jerk, [1e-4, 10]), + # TODO (known_dynamics), but presently it doesn't calculate a derivative ] # All the testing methodology follows the exact same pattern; the only thing that changes is the @@ -121,7 +125,25 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs) [(-14, -14), (0, 0), (-1, -1), (0, 0)], [(0, 0), (1, 1), (0, 0), (1, 1)], [(1, 0), (2, 2), (1, 0), (2, 2)], - [(1, 0), (3, 3), (1, 0), (3, 3)]] + [(1, 0), (3, 3), (1, 0), (3, 3)]], + constant_velocity: [[(-25, -25), (-25, -25), (0, -1), (0, 0)], + [(-5, -6), (-4, -4), (0, -1), (0, 0)], + [(-1, -2), (0, 0), (0, 0), (1, 0)], + [(0, -1), (1, 0), (0, -1), (1, 1)], + [(1, 1), (2, 2), (1, 1), (2, 2)], + [(1, 1), (3, 3), (1, 1), (3, 3)]], + constant_acceleration: [[(-25, -25), (-25, -25), (0, 0), (1, 0)], + [(-4, -4), (-2, -3), (0, 0), (1, 0)], + [(-3, -3), (-1, -2), (0, 0), (1, 0)], + [(0, -1), (1, 0), (0, 0), (1, 0)], + [(1, 1), (3, 2), (1, 1), (3, 2)], + [(1, 1), (3, 3), (1, 1), (3, 3)]], + constant_jerk: [[(-25, -25), (-25, -25), (0, 0), (1, 0)], + [(-4, -4), (-2, -3), (0, 0), (1, 0)], + [(-3, -3), (-1, -2), (0, 0), (1, 0)], + [(-1, -2), (1, 0), (0, 0), (1, 0)], + [(1, 0), (2, 2), (1, 0), (2, 2)], + [(1, 1), (3, 3), (1, 1), (3, 3)]] } # Essentially run the cartesian product of [diff methods] x [test functions] through this one test @@ -154,10 +176,10 @@ def test_diff_method(diff_method_and_params, test_func_and_deriv, request): # re # check x_hat and x_hat_noisy are close to x and that dxdt_hat and dxdt_hat_noisy are close to dxdt #print("]\n[", end="") for j,(a,b) in enumerate([(x,x_hat), (dxdt,dxdt_hat), (x,x_hat_noisy), (dxdt,dxdt_hat_noisy)]): - #l2_error = np.linalg.norm(a - b) - #linf_error = np.max(np.abs(a - b)) - #print(f"({l2_error},{linf_error})", end=", ") - #print(f"({int(np.ceil(np.log10(l2_error))) if l2_error > 0 else -25}, {int(np.ceil(np.log10(linf_error))) if linf_error > 0 else -25})", end=", ") + # l2_error = np.linalg.norm(a - b) + # linf_error = np.max(np.abs(a - b)) + # print(f"({l2_error},{linf_error})", end=", ") + # print(f"({int(np.ceil(np.log10(l2_error))) if l2_error > 0 else -25}, {int(np.ceil(np.log10(linf_error))) if linf_error > 0 else -25})", end=", ") log_l2_bound, log_linf_bound = error_bounds[diff_method][i][j] assert np.linalg.norm(a - b) < 10**log_l2_bound diff --git a/pynumdiff/tests/test_kalman_smooth.py b/pynumdiff/tests/test_kalman_smooth.py deleted file mode 100644 index df8756c..0000000 --- a/pynumdiff/tests/test_kalman_smooth.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Unit tests for kalman smoothing methods -""" -# pylint: skip-file - -import numpy as np -from pynumdiff.kalman_smooth import constant_velocity, constant_acceleration, \ - constant_jerk, known_dynamics - - -x = np.array([1., 4., 9., 3., 20., - 8., 16., 2., 15., 10., - 15., 3., 5., 7., 4.]) -dt = 0.01 - - -def test_constant_velocity(): - params = [1e-4, 1e-5] - x_hat, dxdt_hat = constant_velocity(x, dt, params) - x_smooth = np.array([7.952849, 7.714494, 7.769948, 7.81768, 8.330625, 8.332996, - 8.46594, 8.243244, 8.458473, 8.367324, 8.284892, 7.947729, - 7.998362, 8.123646, 8.303191]) - dxdt = np.array([88.750804, 93.567378, 102.828004, 62.994815, 92.01605, - 60.395089, 47.494064, 27.626483, 21.537133, 14.105156, - 8.138253, 7.996629, 4.016616, -0.1122, 2.319358]) - np.testing.assert_almost_equal(x_smooth, x_hat, decimal=3) - np.testing.assert_almost_equal(dxdt, dxdt_hat, decimal=3) - -def test_constant_acceleration(): - params = [1e-4, 1e-2] - x_hat, dxdt_hat = constant_acceleration(x, dt, params) - x_smooth = np.array([5.069524, 6.137091, 7.191819, 8.062104, 9.112695, 9.616349, - 10.029422, 9.945811, 10.05048, 9.703503, 9.180588, 8.309991, - 7.546839, 6.581819, 5.421122]) - dxdt = np.array([170.225553, 164.483647, 156.524187, 103.452558, 113.776639, - 64.258467, 33.813842, -1.889904, -25.372839, -48.272303, - -69.60202, -81.885049, -101.379641, -122.551681, -140.214842]) - np.testing.assert_almost_equal(x_smooth, x_hat, decimal=3) - np.testing.assert_almost_equal(dxdt, dxdt_hat, decimal=3) - -def test_constant_jerk(): - params = [1e-4, 1e-4] - x_hat, dxdt_hat = constant_jerk(x, dt, params) - x_smooth = np.array([5.066536, 6.135826, 7.191131, 8.061294, 9.110784, 9.613802, - 10.026445, 9.943029, 10.047933, 9.701807, 9.179971, 8.310492, - 7.547672, 6.582594, 5.421728]) - dxdt = np.array([170.262874, 164.484367, 156.478206, 103.371112, 113.682324, - 64.169044, 33.742701, -1.935552, -25.398252, -48.273806, - -69.59001, -81.873115, -101.384521, -122.579907, -140.269899]) - np.testing.assert_almost_equal(x_smooth, x_hat, decimal=3) - np.testing.assert_almost_equal(dxdt, dxdt_hat, decimal=3) - -def test_known_dynamics(): - return#TODO. We shouldn't have empty tests hanging around