#!/usr/bin/env python # coding: utf-8 # # Non-linear Gaussian filtering and smoothing # # Provided are two examples of nonlinear state-space models on which one can perform Bayesian filtering and smoothing in order to obtain # a posterior distribution over a latent state trajectory based on noisy observations. # In order to understand the theory behind these methods in detail we refer to [1] and [2]. # # We provide examples for two different types of state-space model: # 1. [Nonlinear, Discrete State-Space Model](#1.-Non-Linear-Discrete-State-Space-Model:-Pendulum): Pendulum # 2. [Nonlinear, Continuous-Discrete State-Space Model](#2.-Non-Linear-Continuous-Discrete-State-Space-Model:-Benes-Daum-Filter): Benes-Daum Filter # # In order to perform Bayesian Filtering and Smoothing on non-linear models, we show how to use functionalities provided by ProbNum to # either linearise the respective model (Extended Kalman Filter, EKF) or to use an Unscented Transform (Unscented Kalman Filter, UKF) which does not require a Jacobian of the nonlinear model. # # **References**: # > [1] Särkkä, Simo, and Solin, Arno. Applied Stochastic Differential Equations. Cambridge University Press, 2019. # > # > [2] Särkkä, Simo. Bayesian Filtering and Smoothing. Cambridge University Press, 2013. # In[1]: import numpy as np import probnum as pn from probnum import filtsmooth, randvars, statespace from probnum.problems import RegressionProblem # In[2]: np.random.seed(12345) # In[3]: # Make inline plots vector graphics instead of raster graphics get_ipython().run_line_magic('matplotlib', 'inline') from IPython.display import set_matplotlib_formats set_matplotlib_formats('pdf', 'svg') # Plotting import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec plt.style.use('../../probnum.mplstyle') # ## 1. **Non-Linear Discrete** State-Space Model: Pendulum # --- # For nonlinear models, we assume that at least one of the components of our state-space model (i.e. either the dynamics, the measurement model, or both) is a nonlinear function of the latent state. # # Consider nonlinear functions $f: \mathbb{R}^d \rightarrow \mathbb{R}^d$ and $h: \mathbb{R}^d \rightarrow \mathbb{R}^m$ where $d$ is the dimension of the latent state and # $m$ is the dimension of the measurements. # # For $k = 1, \dots, K$ and $x_0 \sim \mathcal{N}(\mu_0, \Sigma_0)$: # # $$ # \begin{align} # \boldsymbol{x}_k &\sim \mathcal{N}(f(\boldsymbol{x}_{k-1}), \boldsymbol{Q}) \\ # \boldsymbol{y}_k &\sim \mathcal{N}(h(\boldsymbol{x}_k), \boldsymbol{R}) # \end{align} # $$ # # This defines a dynamics model that assumes a state $x_k$ in a **discrete** sequence of states arising from a nonlinear map of the previous state $x_{k-1}$ corrupted with additive Gaussian noise under a **process noise** covariance matrix $Q$. # Similarly, the measurements $y_k$ are assumed to be nonlinear transformations of the latent state under additive Gaussian noise according to a **measurement noise** covariance $R$. # In the described case, we cannot use the Kalman Filtering equations since these assume linear relationships. One method is to linearise the nonlinear parts of the model by a first-order Taylor Expansion (Extended Kalman Filter, EKF) or employing the Unscented Kalman Filter (UKF) which is based on the unscented transform and does not require the computation of a Jacobian. # The `probnum` packages provides both methods by wrapping linearizing "Components" around nonlinear dynamcis and/or measurement models, as we will see below. # # Note that this can be generalized to a nonlinear time-varying state-space model, as well. Then we would have $f: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^d$ and $h: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^m$ where $\mathbb{T}$ is the "time dimension". # ### Define State-Space Model # #### I. Discrete Dynamics Model: Nonlinear Gaussian Transitions # In[4]: state_dim = 2 observation_dim = 1 # In[5]: # approx. gravitational constant g = 9.81 # dt delta_t = 0.0075 def pendulum_rhs(state): """Right-hand side of an ODE that defines pendulum dynamics""" x1, x2 = state y1 = x1 + x2 * delta_t y2 = x2 - g * np.sin(x1) * delta_t return np.array([y1, y2]) def pendulum_jacobian(state): """Jacobian of the pendulum ODE""" x1, x2 = state dy1_dx = [1.0, delta_t] dy2_dx = [-g * np.cos(x1) * delta_t, 1.0] return np.array([dy1_dx, dy2_dx]) dynamics_transition_function = lambda t, state: pendulum_rhs(state) dynamics_transition_jacobian_function = lambda t, state: pendulum_jacobian(state) dynamics_diffusion_matrix = 1.0 * ( np.diag(np.array([delta_t ** 3 / 3, delta_t])) + np.diag(np.array([delta_t ** 2 / 2]), 1) + np.diag(np.array([delta_t ** 2 / 2]), -1) ) # We here define the non-linear dynamics of a pendulum. Note (in the cell directly above) that `probnum` expects `dynamicsfun` to be a function of time and state. Even though we here consider a nonlinear but time-**in**variant model, we still have to fulfill the contract that the interface requires. # ```python # dynamics_transition_function = lambda t, state: pendulum_rhs(state) # ``` # In other words, we simply ignore the time variable `t` in case of time-invariant models. # To create a discrete, (non-linear) Gaussian dynamics model, `probnum` provides the `DiscreteGaussian` class that takes # - `input_dim` and `output_dim` # - `state_trans_fun` : the function that returns the next state given the current one # - `proc_noise_cov_mat_fun` : the function that returns the covariance matrix of the Gaussian process noise # - `jacob_state_trans_fun` : the function that returns the Jacobian matrix of the state transition function # In[6]: # Create discrete, non-linear Gaussian dynamics model dynamics_model = statespace.DiscreteGaussian( input_dim=state_dim, output_dim=state_dim, state_trans_fun=dynamics_transition_function, proc_noise_cov_mat_fun=lambda t: dynamics_diffusion_matrix, jacob_state_trans_fun=dynamics_transition_jacobian_function, ) # #### II. Discrete Measurement Model: Nonlinear Gaussian Measurements # In[7]: def pendulum_measurement(state): x1, x2 = state return np.array([np.sin(x1)]) def pendulum_measurement_jacobian(state): x1, x2 = state return np.array([[np.cos(x1), 0.0]]) measurement_function = lambda t, state: pendulum_measurement(state) measurement_jacobian_function = lambda t, state: pendulum_measurement_jacobian(state) measurement_variance = 0.32 ** 2 measurement_covariance = measurement_variance * np.eye(observation_dim) # In[8]: # Create discrete, non-linear Gaussian measurement model measurement_model = statespace.DiscreteGaussian( input_dim=state_dim, output_dim=observation_dim, state_trans_fun=measurement_function, proc_noise_cov_mat_fun=lambda t: measurement_covariance, jacob_state_trans_fun=measurement_jacobian_function, ) # #### III. Initial State Random Variable # In[9]: mu_0 = np.ones(state_dim) sigma_0 = measurement_variance * np.eye(state_dim) initial_state_rv = randvars.Normal(mean=mu_0, cov=sigma_0) # ### Generate Data for the State-Space Model # `statespace.generate_samples()` is used to sample both latent states and noisy observations from the specified state space model. # In[10]: time_grid = np.arange(0., 5., step=delta_t) # In[11]: latent_states, observations = statespace.generate_samples( dynmod=dynamics_model, measmod=measurement_model, initrv=initial_state_rv, times=time_grid ) # In[12]: regression_problem = RegressionProblem( observations=observations, locations=time_grid, ) # ### Kalman Filtering # #### I. Linearize model (Extended Kalman Filter) # Since we could easily derive the Jacobian of our nonlinear model components above, we can use the Extended Kalman Filter (EKF) to obtain a filtering posterior over the states. Therefore, in `probnum` we just wrap the dynamics model and the measurement model in a `DiscreteEKFComponent` object. # In cases in which the Jacobian is not as easy to write down, `probnum` also provides the `DiscreteUKFComponent` class that implements the linearization by means of an # _unscented Kalman filter_ that does not require a Jacobian. # Alternatively, the Jacobian can be calculated via automatic differentiation. # In[13]: linearised_dynamics_model = filtsmooth.DiscreteEKFComponent(dynamics_model) linearised_measurement_model = filtsmooth.DiscreteEKFComponent(measurement_model) # #### II. Kalman Filter # Then, we simply pass these linearized models to the Kalman Filter (exactly the same interface as in the linear case) and proceed. # In[14]: kalman_filter = filtsmooth.Kalman( dynamics_model=linearised_dynamics_model, measurement_model=linearised_measurement_model, initrv=initial_state_rv ) # #### III. Perform Kalman Filtering + Rauch-Tung-Striebel Smoothing # In[15]: state_posterior, _ = kalman_filter.filtsmooth(regression_problem) # The method `filtsmooth` returns a `KalmanPosterior` object which provides convenience functions for e.g. sampling and interpolation. # We can also extract the just computed posterior smoothing state variables by querying the `.state_rvs` property. # This yields a list of Gaussian Random Variables from which we can extract the statistics in order to visualize them. # In[16]: posterior_state_rvs = state_posterior.states # List of Normal Random Variables posterior_state_means = posterior_state_rvs.mean # Shape: (num_time_points, state_dim) posterior_state_covs = posterior_state_rvs.cov # Shape: (num_time_points, state_dim, state_dim) # ### Visualize Results # In[17]: state_fig = plt.figure() state_fig_gs = gridspec.GridSpec(ncols=2, nrows=1, figure=state_fig) ax_00 = state_fig.add_subplot(state_fig_gs[0, 0]) ax_01 = state_fig.add_subplot(state_fig_gs[0, 1]) # Plot means mu_x_1, mu_x_2 = [posterior_state_means[:, i] for i in range(state_dim)] ax_00.plot(time_grid, mu_x_1, label="posterior mean") ax_01.plot(time_grid, mu_x_2) # Plot marginal standard deviations std_x_1, std_x_2 = [np.sqrt(posterior_state_covs[:, i, i]) for i in range(state_dim)] ax_00.fill_between( time_grid, mu_x_1 - 1.96 * std_x_1, mu_x_1 + 1.96 * std_x_1, alpha=0.2, label="1.96 marginal stddev", ) ax_01.fill_between( time_grid, mu_x_2 - 1.96 * std_x_2, mu_x_2 + 1.96 * std_x_2, alpha=0.2 ) # Plot 5 samples on a subsampled grid samples = state_posterior.sample(t=state_posterior.locations[::5], size=5) for smp in samples: ax_00.plot(state_posterior.locations[::5], smp[:, 0], color="gray", alpha=0.75, linewidth=1, linestyle="dashed", label="sample") ax_01.plot(state_posterior.locations[::5], smp[:, 1], color="gray",alpha=0.75, linewidth=1, linestyle="dashed", label="sample") # Plot groundtruth ax_00.scatter(time_grid, observations, marker=".", label="measurements") # Add labels etc. ax_00.set_xlabel("t") ax_01.set_xlabel("t") ax_00.set_title(r"$x_1$") ax_01.set_title(r"$x_2$") # The following two lines are just to remove duplicate labels (caused by samples) from the legend handles, labels = ax_00.get_legend_handles_labels() by_label = dict(zip(labels, handles)) state_fig.legend( by_label.values(), by_label.keys(), loc="center left", bbox_to_anchor=(1, 0.5) ) state_fig.tight_layout() # ## 2. **Non-Linear Continuous-Discrete** State-Space Model: Benes-Daum Filter # --- # Now, consider we assume **continuous** dynamics. We assume that there is a continuous process that defines the nonlinear dynamics of our latent space from which we collect discrete nonlinear-Gaussian measurements (as above). Only the dynamics model changes. In particular, we formulate the dynamics as a stochastic process in terms of a Nonlinear Time-Invariant Stochastic Differential Equation. We refer to [1] for more details. # Again, consider non-linear functions $f: \mathbb{R}^d \rightarrow \mathbb{R}^d$ and $h: \mathbb{R}^d \rightarrow \mathbb{R}^m$ where $d$ is the dimension of the latent state and # $m$ is the dimension of the measurements. # We define the following nonlinear **continuous-discrete** state-space model: # # Let $x(t_0) \sim \mathcal{N}(\mu_0, \Sigma_0)$. # # $$ # \begin{align} # d\boldsymbol{x} &= f(\boldsymbol{x}) \, dt + \boldsymbol{L} \, d \boldsymbol{\omega} \\ # \boldsymbol{y}_k &\sim \mathcal{N}(h(\boldsymbol{x}(t_k)), \boldsymbol{R}), \qquad k = 1, \dots, K # \end{align} # $$ # # where $\boldsymbol{\omega}$ denotes a vector of driving forces (often Brownian Motion). # # # Note that this can be generalized to a nonlinear time-varying state-space model, as well. Then we would have $f: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^d$ and $h: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^m$ where $\mathbb{T}$ is the "time dimension". # ### Define State-Space Model # #### I. Continuous Dynamics Model: Nonlinear Stochastic Differential Equation # In[18]: state_dim = 1 observation_dim = 1 # In[19]: drift_function = lambda t, state: np.tanh(state) drift_function_jacobian = lambda t, state: 1.0 - np.tanh(state) ** 2 dispersion_matrix_function = lambda t: np.ones(state_dim).reshape(1, 1) # For continous, non-linear dynamics, `probnum` provides a general stochastic differential equation (SDE) interface that takes # - `driftfun`: a (possibly non-linear) drift function, # - `dispmatfun`: a (possibly non-linear) dispersion function, # - `jacobfun`: the Jacobian of the drift function. # In[20]: # Create continuous, non-linear SDE dynamics model dynamics_model = statespace.SDE( dimension=state_dim, driftfun=drift_function, dispmatfun=dispersion_matrix_function, jacobfun=drift_function_jacobian, ) # #### II. Discrete Measurement Model: Linear, Time-Invariant, Gaussian Measurements # In[21]: measurement_marginal_variance = 1.0 measurement_matrix = np.eye(observation_dim, state_dim) measurement_noise_matrix = measurement_marginal_variance * np.eye(observation_dim) # In[22]: measurement_model = statespace.DiscreteLTIGaussian( state_trans_mat=measurement_matrix, shift_vec=np.zeros(observation_dim), proc_noise_cov_mat=measurement_noise_matrix, ) # #### III. Initial State Random Variable # In[23]: mu_0 = np.zeros(state_dim) sigma_0 = 3.0 * np.eye(state_dim) initial_state_rv = randvars.Normal(mean=mu_0, cov=sigma_0) # ### Generate Data for the State-Space Model # `statespace.generate_samples()` is used to sample both latent states and noisy observations from the specified state space model. # In[24]: time_grid = np.arange(0.0, 10.0, step=10*delta_t) # In[25]: linearised_dynamics_model = filtsmooth.ContinuousEKFComponent(dynamics_model) # In[26]: latent_states, observations = statespace.generate_samples( dynmod=linearised_dynamics_model, measmod=measurement_model, initrv=initial_state_rv, times=time_grid, ) # In[27]: regression_problem = RegressionProblem( observations=observations, locations=time_grid, ) # ### Kalman Filtering # #### I. Linearize model (Extended Kalman Filter) # #### II. Kalman Filter # In[28]: kalman_filter = filtsmooth.Kalman( dynamics_model=linearised_dynamics_model, measurement_model=measurement_model, initrv=initial_state_rv, ) # #### III. Perform Kalman Filtering + Rauch-Tung-Striebel Smoothing # In[29]: state_posterior, _ = kalman_filter.filter(regression_problem) # In[30]: # smoothing smoothed_state_posterior = kalman_filter.smooth( filter_posterior=state_posterior ) # The method `filtsmooth` returns a `KalmanPosterior` object which provides convenience functions for e.g. sampling and prediction. # We can also extract the just computed posterior smoothing state variables by querying the `.state_rvs` property. # This yields a list of Gaussian Random Variables from which we can extract the statistics in order to visualize them. # In[31]: grid = np.linspace(0, 11, 500) posterior_state_rvs = state_posterior(grid) # List of Normal Random Variables posterior_state_means = posterior_state_rvs.mean.squeeze() # Shape: (num_time_points, state_dim) posterior_state_covs = posterior_state_rvs.cov # Shape: (num_time_points, state_dim, state_dim) # In[32]: smoothed_posterior_state_rvs = smoothed_state_posterior.states # List of Normal Random Variables smoothed_posterior_state_means = smoothed_posterior_state_rvs.mean.squeeze() # Shape: (num_time_points, state_dim) smoothed_posterior_state_covs = smoothed_posterior_state_rvs.cov # Shape: (num_time_points, state_dim, state_dim) # ### Visualize Results # In[33]: state_fig = plt.figure() ax = state_fig.add_subplot() # Plot means ax.plot(grid, posterior_state_means, label="posterior mean") # Plot marginal standard deviations std_x = np.sqrt(np.abs(posterior_state_covs)).squeeze() ax.fill_between( grid, posterior_state_means - 1.96 * std_x, posterior_state_means + 1.96 * std_x, alpha=0.2, label="1.96 marginal stddev for filter", ) # Plot smoothed means ax.plot(time_grid, smoothed_posterior_state_means, label="smoothed posterior mean") # Plot marginal standard deviations std_x = np.sqrt(smoothed_posterior_state_covs).squeeze() ax.fill_between( time_grid, smoothed_posterior_state_means - 1.96 * std_x, smoothed_posterior_state_means + 1.96 * std_x, alpha=0.2, label="1.96 marginal stddev for smoother", ) ax.scatter(time_grid, observations, marker=".", label="measurements") # Add labels etc. ax.set_xlabel("t") ax.set_title(r"$x$") ax.legend() state_fig.tight_layout() # To get a closer look at filter and smoother, we inspect the interval $[0, 1]$ in greater detail. # In[34]: idx = time_grid < 1.0 ax.set_xlim([0, 1]) ax.set_ylim(np.min(observations[idx]), np.max(observations[idx])) state_fig.tight_layout() state_fig # In[ ]: