#!/usr/bin/env python # coding: utf-8 # # Hidden Markov Model with Poisson emissions and Time-Varying Transitions # # ## Summary # # The following exposition uses [`pymc3-hmm`](https://github.com/AmpersandTV/pymc3-hmm) to simulate and estimate a hidden Markov model (HMM) with a time-varying transition matrix and emissions consisting of Poisson random variables. # In[1]: import numpy as np import pandas as pd import patsy import theano import theano.tensor as tt from theano import shared import pymc3 as pm from pymc3_hmm.utils import multilogit_inv from pymc3_hmm.distributions import SwitchingProcess, DiscreteMarkovChain from pymc3_hmm.step_methods import FFBSStep # ## Introduction # # Our observation model can be described as follows: # # \begin{align} # \label{eq:pois-zero-model} # \left( Y_t \mid S_t = s \right) &\sim \operatorname{Pois}\left( \mu_s \right),\quad # \\ # S_t &\sim \operatorname{Bern}\left( \pi_t \right) # \;, # \end{align} # where $y_t \sim Y_t$ are the observed values sampled from the observation distribution, $Y_t$, spanning $t \in \left\{0, \dots, T \right\}$. # # The "hidden" state sequence, $\{S_t\}$, is driven by the following Markov relationship: # # \begin{equation*} # \operatorname{P}\left(S_t \mid S_{t-1}\right) \triangleq \Gamma_{t,t-1} \in \mathbb{R}^{2 \times 2}_{[0, 1]} # \end{equation*} # # The marginal state probability, $\pi_t$, is then given by # # \begin{equation*} # \begin{aligned} # \operatorname{P}\left( S_t \right) # &= \int_{S_{t-1}} \operatorname{P}\left(S_t \mid S_{t-1}\right) # \operatorname{dP}\left(S_{t-1}\right) # \\ # &= # \begin{pmatrix} # \Gamma^{(0, 0)}_{t,t-1} & \Gamma^{(0, 1)}_{t,t-1} # \\ # \Gamma^{(1, 0)}_{t,t-1} & \Gamma^{(1, 1)}_{t,t-1} # \end{pmatrix}^\top # \begin{pmatrix} # \pi_{t-1} # \\ # 1 - \pi_{t-1} # \end{pmatrix} # \\ # &= # \begin{pmatrix} # \pi_{t} # \\ # 1 - \pi_{t} # \end{pmatrix} # \;. # \end{aligned} # \end{equation*} # # In this example, the rows of our transition matrix, $\Gamma^{(r)}_{t, t-1}$ for $r \in \{0, 1\}$, are driven by a logistic regression: # # \begin{gather*} # \Gamma^{(r)}_{t, t-1} = \operatorname{logit^{-1}}\left( X_t \xi_r \right) # \end{gather*} # # where $X_t \in \mathbb{R}^{T \times N}$ is a covariate matrix and $\xi_r \in \mathbb{R}^N$ is the regression parameter vector for row $r$. # # In the remainder of this exposition, we will assume normal priors for each $\xi_r$, the conjugate Gamma prior for $\mu$, and a Dirichlet prior for $\pi_0$. # # ## Simulation # # For these simulations, we will generate a time series and make the $\xi_r$ regression consist of fixed-effects for a seasonal component based on weekdays. In other words, the transition probabilities will vary based on the day of week. # In[2]: def create_poisson_zero_hmm_tv(X_tt, mu_1, mu_2, xis, pi_0, observed=None): z_tt = tt.tensordot(X_tt, xis, axes=((1,), (0,))) P_tt = multilogit_inv(z_tt) P_rv = pm.Deterministic("P_t", P_tt) S_rv = DiscreteMarkovChain("S_t", P_rv, pi_0, shape=np.shape(observed)[-1]) Y_rv = SwitchingProcess( "Y_t", [pm.Poisson.dist(mu_1), pm.Poisson.dist(mu_1 + mu_2)], S_rv, observed=observed, ) return Y_rv # In[3]: np.random.seed(2032) start_date = pd.Timestamp("2020-01-01 00:00:00") time_index = pd.date_range( start=start_date, end=start_date + pd.Timedelta("30W"), closed="left", freq="1h" ) X_ind_df = pd.DataFrame( { "weekday": time_index.weekday, }, index=time_index, ) formula_str = "~ 1 + C(weekday)" X_df = patsy.dmatrix(formula_str, X_ind_df, return_type="dataframe") xi_0_true = pd.Series( # The coefficients used to compute the state zero-to-zero transition probabilities np.array([2.0, -5.0, -3.0, 0.0, 0.0, -5.0, 5.0]), index=X_df.columns, ) xi_1_true = pd.Series( # The coefficients for the state one-to-zero transition probabilities np.array([2.0, -1.0, 3.0, 0.0, 0.0, -5.0, 5.0]), index=X_df.columns, ) xis_true = tt.as_tensor(np.stack([xi_0_true, xi_1_true], axis=1)[..., None], name="xis") mu_1_true = 50 mu_2_true = 20 p_0_true = tt.as_tensor(np.r_[0.0, 1.0]) X_tt = theano.shared(X_df.values, borrow=True) with pm.Model(theano_config={"compute_test_value": "ignore"}) as sim_model: _ = create_poisson_zero_hmm_tv( X_tt, mu_1_true, mu_2_true, xis_true, p_0_true, np.zeros(X_df.shape[0]) ) sim_point = pm.sample_prior_predictive(samples=1, model=sim_model) sim_point["Y_t"] = sim_point["Y_t"].squeeze() y_t = sim_point["Y_t"] # ## Estimation # # We will use the "true" data-generating observation model to estimate the parameters $\mu$ and $\Gamma_{t, t-1}$ (the latter as rows denoted by `p_0` and `p_1`). For demonstration purposes, we choose hyper-parameters for the $\mu_s$ priors that are somewhat far from the true $\mu_s$ values. # # The sampling steps for $S_t$ are performed using forward-filtering backward-sampling (FFBS), while sampling for $\mu_s$, $\pi_0$, and the $\xi_r$ use NUTS. # # In[4]: theano.config.allow_gc = False with pm.Model(theano_config={"compute_test_value": "ignore"}) as test_model: E_mu, Var_mu = 10.0, 10000.0 mu_1_rv = pm.Gamma("mu_1", E_mu ** 2 / Var_mu, E_mu / Var_mu) mu_2_rv = pm.Gamma("mu_2", 2 * E_mu ** 2 / Var_mu, E_mu / Var_mu) p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1]) xis_rv = pm.Normal("xis", 0, 10, shape=(X_df.shape[1], 2, 1)) _ = create_poisson_zero_hmm_tv(X_tt, mu_1_rv, mu_2_rv, xis_rv, p_0_rv, y_t) with test_model: states_step = FFBSStep([test_model.S_t]) mu_step = pm.NUTS( [test_model.mu_1, test_model.mu_2, test_model.xis, test_model.p_0] ) posterior_trace = pm.sample( step=[states_step, mu_step], return_inferencedata=True, chains=1, progressbar=True, ) # ## Posterior Samples # In[5]: import arviz as az import matplotlib.pyplot as plt ax = az.plot_trace( posterior_trace.posterior, var_names=[ "mu_1", "mu_2", "xis", ], lines=[ ("mu_1", {}, [mu_1_true]), ("mu_2", {}, [mu_2_true]), ("xis", {}, [xis_true.data]), ], compact=True, figsize=(15, 15), ) # ## Posterior Predictive Samples # In[6]: with test_model: posterior_pred_trace = pm.sample_posterior_predictive( posterior_trace.posterior, var_names=["S_t", "Y_t"] ) # In[7]: from matplotlib.ticker import MaxNLocator from pymc3_hmm.utils import plot_split_timeseries_histograms columns = pd.MultiIndex.from_tuples( [("Y_t", i) for i in range(posterior_pred_trace["Y_t"].shape[0])], names=("var", "sample"), ) plot_data = pd.DataFrame( posterior_pred_trace["Y_t"].T, index=time_index, columns=columns, ) plot_data["E[S_t]"] = posterior_pred_trace["S_t"].mean(0) plot_data["E[Y_t]"] = posterior_pred_trace["Y_t"].mean(0) plot_data["y_t"] = y_t # In[8]: def plot_fn(ax, data, **kwargs): ax.plot( data["y_t"], label="y_t", alpha=0.7, color="red", linewidth=0.8, drawstyle="steps", ) ax.plot( data["E[Y_t]"], label="E[Y_t]", alpha=0.3, color="blue", linewidth=0.8, drawstyle="steps", ) axes_split_data = plot_split_timeseries_histograms( plot_data, sample_col="Y_t", plot_fn=plot_fn, split_max=10, figsize=(15, 30), twin_column_name="E[S_t]", twin_plot_kwargs={ "color": "green", "drawstyle": "steps", "linestyle": "--", "alpha": 0.4, }, ) for (_, twin_ax), _ in axes_split_data: _ = twin_ax.set_ylabel( "E[S_t]", color=twin_ax.get_lines()[0].get_color() ) _ = twin_ax.yaxis.set_major_locator(MaxNLocator(integer=True)) plt.tight_layout() # In[9]: columns = pd.MultiIndex.from_tuples( [("R_t", i) for i in range(posterior_pred_trace["Y_t"].shape[0])], names=("var", "sample"), ) residuals = posterior_pred_trace["Y_t"].T - y_t.reshape(-1, 1) plot_data = pd.DataFrame( residuals, index=time_index, columns=columns, ) plot_data["E[S_t]"] = posterior_pred_trace["S_t"].mean(0) plot_data["E[R_t]"] = residuals.mean(1) def plot_fn(ax, data, **kwargs): ax.plot( data["E[R_t]"], label="E[R_t]", alpha=0.7, color="orange", linewidth=0.8, drawstyle="steps", ) axes_split_data = plot_split_timeseries_histograms( plot_data, sample_col="R_t", plot_fn=plot_fn, split_max=10, figsize=(15, 30), twin_column_name="E[S_t]", twin_plot_kwargs={ "color": "green", "drawstyle": "steps", "linestyle": "--", "alpha": 0.4, }, ) for (_, twin_ax), _ in axes_split_data: _ = twin_ax.set_ylabel( "E[S_t]", color=twin_ax.get_lines()[0].get_color() ) _ = twin_ax.yaxis.set_major_locator(MaxNLocator(integer=True)) plt.tight_layout() # ## Out-of-sample Posterior Predictives # In[10]: start_date_oos = time_index[-1] time_index_oos = pd.date_range( start=start_date_oos, end=start_date_oos + pd.Timedelta("5W"), closed="left", freq="1h", ) X_oos_ind_df = pd.DataFrame( { "weekday": time_index_oos.weekday, }, index=time_index_oos, ) X_oos_df = patsy.dmatrix(formula_str, X_oos_ind_df, return_type="dataframe") X_tt.set_value(X_oos_df.values) with test_model, theano.config.change_flags(compute_test_value="off"): test_model.S_t.distribution.shape = (X_oos_df.shape[0],) out_of_sample_pp = pm.sample_posterior_predictive( posterior_trace.posterior.drop_vars(["S_t", "P_t"]), var_names=["S_t", "P_t", "Y_t"], model=test_model, ) # In[11]: columns = pd.MultiIndex.from_tuples( [("Y_t", i) for i in range(out_of_sample_pp["Y_t"].shape[0])], names=("var", "sample"), ) oos_plot_data = pd.DataFrame( out_of_sample_pp["Y_t"].T, index=pd.date_range( start="1/1/2021", periods=out_of_sample_pp["Y_t"].shape[1], freq="H" ), columns=columns, ) oos_plot_data["E[S_t]"] = out_of_sample_pp["S_t"].mean(0) oos_plot_data["E[Y_t]"] = out_of_sample_pp["Y_t"].mean(0) def plot_fn(ax, data, **kwargs): ax.plot( data["E[Y_t]"], label="E[Y_t]", alpha=0.7, color="orange", linewidth=0.8, drawstyle="steps", ) axes_split_data = plot_split_timeseries_histograms( oos_plot_data, sample_col="Y_t", plot_fn=plot_fn, split_max=10, figsize=(15, 30), twin_column_name="E[S_t]", twin_plot_kwargs={ "color": "red", "drawstyle": "steps", "linestyle": "--", "alpha": 0.9, }, ) for (_, twin_ax), _ in axes_split_data: _ = twin_ax.set_ylabel( "E[S_t]", color=twin_ax.get_lines()[0].get_color() ) _ = twin_ax.yaxis.set_major_locator(MaxNLocator(integer=True)) plt.tight_layout()