import matplotlib.pyplot as plt plt.rcParams["figure.figsize"] = (11, 5) #set default figure size import numpy as np from numba import vectorize, njit, prange from math import gamma import pandas as pd import seaborn as sns colors = sns.color_palette() @njit def set_seed(): np.random.seed(142857) set_seed() # Parameters in the two beta distributions. F_a, F_b = 1, 1 G_a, G_b = 3, 1.2 @vectorize def p(x, a, b): r = gamma(a + b) / (gamma(a) * gamma(b)) return r * x** (a-1) * (1 - x) ** (b-1) # The two density functions. f = njit(lambda x: p(x, F_a, F_b)) g = njit(lambda x: p(x, G_a, G_b)) @njit def simulate(a, b, T=50, N=500): ''' Generate N sets of T observations of the likelihood ratio, return as N x T matrix. ''' l_arr = np.empty((N, T)) for i in range(N): for j in range(T): w = np.random.beta(a, b) l_arr[i, j] = f(w) / g(w) return l_arr l_arr_g = simulate(G_a, G_b, N=50000) l_seq_g = np.cumprod(l_arr_g, axis=1) l_arr_f = simulate(F_a, F_b, N=50000) l_seq_f = np.cumprod(l_arr_f, axis=1) @njit def update(π, l): "Update π using likelihood l" # Update belief π = π * l / (π * l + 1 - π) return π π1, π2 = 0.2, 0.8 T = l_arr_f.shape[1] π_seq_f = np.empty((2, T+1)) π_seq_f[:, 0] = π1, π2 for t in range(T): for i in range(2): π_seq_f[i, t+1] = update(π_seq_f[i, t], l_arr_f[0, t]) fig, ax1 = plt.subplots() for i in range(2): ax1.plot(range(T+1), π_seq_f[i, :], label=f"$\pi_0$={π_seq_f[i, 0]}") ax1.set_ylabel("$\pi_t$") ax1.set_xlabel("t") ax1.legend() ax1.set_title("when f governs data") ax2 = ax1.twinx() ax2.plot(range(1, T+1), np.log(l_seq_f[0, :]), '--', color='b') ax2.set_ylabel("$log(L(w^{t}))$") plt.show() T = l_arr_g.shape[1] π_seq_g = np.empty((2, T+1)) π_seq_g[:, 0] = π1, π2 for t in range(T): for i in range(2): π_seq_g[i, t+1] = update(π_seq_g[i, t], l_arr_g[0, t]) fig, ax1 = plt.subplots() for i in range(2): ax1.plot(range(T+1), π_seq_g[i, :], label=f"$\pi_0$={π_seq_g[i, 0]}") ax1.set_ylabel("$\pi_t$") ax1.set_xlabel("t") ax1.legend() ax1.set_title("when g governs data") ax2 = ax1.twinx() ax2.plot(range(1, T+1), np.log(l_seq_g[0, :]), '--', color='b') ax2.set_ylabel("$log(L(w^{t}))$") plt.show() π_seq = np.empty((2, T+1)) π_seq[:, 0] = π1, π2 for i in range(2): πL = π_seq[i, 0] * l_seq_f[0, :] π_seq[i, 1:] = πL / (πL + 1 - π_seq[i, 0]) np.abs(π_seq - π_seq_f).max() < 1e-10 @njit def martingale_simulate(π0, N=5000, T=200): π_path = np.empty((N,T+1)) w_path = np.empty((N,T)) π_path[:,0] = π0 for n in range(N): π = π0 for t in range(T): # draw w if np.random.rand() <= π: w = np.random.beta(F_a, F_b) else: w = np.random.beta(G_a, G_b) π = π*f(w)/g(w)/(π*f(w)/g(w) + 1 - π) π_path[n,t+1] = π w_path[n,t] = w return π_path, w_path def fraction_0_1(π0, N, T, decimals): π_path, w_path = martingale_simulate(π0, N=N, T=T) values, counts = np.unique(np.round(π_path[:,-1], decimals=decimals), return_counts=True) return values, counts def create_table(π0s, N=10000, T=500, decimals=2): outcomes = [] for π0 in π0s: values, counts = fraction_0_1(π0, N=N, T=T, decimals=decimals) freq = counts/N outcomes.append(dict(zip(values, freq))) table = pd.DataFrame(outcomes).sort_index(axis=1).fillna(0) table.index = π0s return table # simulate T = 200 π0 = .5 π_path, w_path = martingale_simulate(π0=π0, T=T, N=10000) fig, ax = plt.subplots() for i in range(100): ax.plot(range(T+1), π_path[i, :]) ax.set_xlabel('$t$') ax.set_ylabel('$\pi_t$') plt.show() fig, ax = plt.subplots() for t in [1, 10, T-1]: ax.hist(π_path[:,t], bins=20, alpha=0.4, label=f'T={t}') ax.set_ylabel('count') ax.set_xlabel('$\pi_T$') ax.legend(loc='lower right') plt.show() # simulate T = 200 π0 = .3 π_path3, w_path3 = martingale_simulate(π0=π0, T=T, N=10000) fig, ax = plt.subplots() for t in [1, 10, T-1]: ax.hist(π_path3[:,t], bins=20, alpha=0.4, label=f'T={t}') ax.set_ylabel('count') ax.set_xlabel('$\pi_T$') ax.legend(loc='upper right') plt.show() fig, ax = plt.subplots() for i, j in enumerate([10, 100]): ax.plot(range(T+1), π_path[j,:], color=colors[i], label=f'$\pi$_path, {j}-th simulation') ax.plot(range(1,T+1), w_path[j,:], color=colors[i], label=f'$w$_path, {j}-th simulation', alpha=0.3) ax.legend(loc='upper right') ax.set_xlabel('$t$') ax.set_ylabel('$\pi_t$') ax2 = ax.twinx() ax2.set_ylabel("$w_t$") plt.show() # create table table = create_table(list(np.linspace(0,1,11)), N=10000, T=500) table @njit def compute_cond_var(pi, mc_size=int(1e6)): # create monte carlo draws mc_draws = np.zeros(mc_size) for i in prange(mc_size): if np.random.rand() <= pi: mc_draws[i] = np.random.beta(F_a, F_b) else: mc_draws[i] = np.random.beta(G_a, G_b) dev = pi*f(mc_draws)/(pi*f(mc_draws) + (1-pi)*g(mc_draws)) - pi return np.mean(dev**2) pi_array = np.linspace(0, 1, 40) cond_var_array = [] for pi in pi_array: cond_var_array.append(compute_cond_var(pi)) fig, ax = plt.subplots() ax.plot(pi_array, cond_var_array) ax.set_xlabel('$\pi_{t-1}$') ax.set_ylabel('$\sigma^{2}(\pi_{t}\\vert \pi_{t-1})$') plt.show()