#!/usr/bin/env python # coding: utf-8 # # Demo: Reactive plot with multiple regressions # In[1]: import numpy as np import bqplot.pyplot as plt from ipywidgets import VBox, FloatSlider # ## 1. Non-parametric regression # Let us define the random variables $X$ and $Y$ by # $$ # f(x) = x \frac{1 + x}{1 + x^2}, \qquad X \sim \mathcal{N}(0, 1), \quad Y = f(X) + \varepsilon, # $$ # where $\varepsilon \sim \mathcal{N}(0, 1/4)$ is independent of X # In[2]: def f(x): return (x + x ** 2) / (1 + x ** 2) n = 1000 sigma = 0.5 X = np.random.randn(n) Y = f(X) + sigma * np.random.randn(n) # In[3]: plt.figure(title='Joint sample, and Conditional expecation for (X, Y)') plt.scatter(X, Y, alpha=0.3) mesh = np.linspace(-4, 4, 201) plt.plot(mesh, f(mesh), linewidth=3) plt.show() # ## Non-parametric regression # In[4]: def reg_non_param(x, bdwidth, x_sample, y_sample): """Values of the non-parametric regression of Y wrt X using a Gaussian kernel.""" def kern(u, x): """Gaussian kernel function""" return np.exp(-(u[:, np.newaxis] - x) ** 2 / (2 * bdwidth ** 2)) return np.sum(kern(x_sample, x) * y_sample[:, np.newaxis], axis=0) \ / np.sum(kern(x_sample, x), axis=0) # Plotting non-parametric regressions of $Y$ with respect to $X$ with different values of the bandwidth: # In[5]: plt.figure(legend_location='bottom-right') plt.scatter(X, Y, alpha=0.3) plt.plot(mesh, reg_non_param(mesh, 0.1, X, Y), 'b', linewidth=3, labels=['bandwidth 0.1'], display_legend=True) plt.plot(mesh, reg_non_param(mesh, 0.2, X, Y), 'r', linewidth=3, labels=['bandwidth 0.2'], display_legend=True) plt.plot(mesh, reg_non_param(mesh, 0.5, X, Y), 'g', linewidth=3, labels=['bandwidth 0.5'], display_legend=True) plt.show() # In[6]: figure = plt.figure(title='Non-parametric regression') scatter = plt.scatter(X, Y, alpha=0.3, enable_move=True) regression = plt.plot(mesh, np.zeros(mesh.shape), 'b', linewidth=3) slider = FloatSlider(min=0.05, max=2.0, value=1.0, step=0.05, description='bandwidth') def update(change=None): regression.y = reg_non_param(regression.x, slider.value, scatter.x, scatter.y) slider.observe(update, names=['value']) scatter.observe(update, names=['x', 'y']) update() VBox([figure, slider]) # ## 2. Multiple regression # In[7]: def basis(knots, x): """Values of order-1 B-spline basis functions.""" nb_knots = len(knots) diag = np.identity(nb_knots) res = np.empty((len(x), nb_knots)) for i in range(nb_knots): res[:, i] = np.interp(x, knots, diag[i]) return res # In[8]: basis_len = 10 knots = np.linspace(-3.5, 3.5, basis_len) plt.figure(title='Order-0 B-splines') plt.plot(mesh, basis(knots, mesh).T, linewidth=2) plt.ylim(0.0, 2.0) plt.show() # In[9]: def reg_param_coeffs(knots, x_sample, y_sample): """Computes the coefficients of the P-L regression of y_sample wrt. x_sample.""" bis = basis(knots, x_sample) var = bis.T.dot(bis) covar = y_sample.dot(bis) return np.linalg.lstsq(var, covar.T)[0] def eval_piecewise_linear(x, knots, coeffs): """Eveluates the piecewise linear function at the specified x for the knots and coeffs. """ return np.interp(x, knots, coeffs) # In[10]: plt.figure() plt.scatter(X, Y, alpha=0.3) knots1 = np.linspace(-3.0, 3.0, 10) plt.plot(mesh, eval_piecewise_linear(mesh, knots1, reg_param_coeffs(knots1, X, Y)), 'b', linewidth=3) knots2 = np.linspace(-3.0, 3.0, 20) plt.plot(mesh, np.interp(mesh, knots2, reg_param_coeffs(knots2, X, Y)), 'r', linewidth=3) plt.title('Different collections of knots') plt.show() # ## 3. Penalized multiple regression # In[11]: def second_derivative_on_dirac_basis(knots): """ Computes the coefficients of the second derivative of the basis functions on the Dirac comb. """ nb_knots = len(knots) res = np.zeros((nb_knots, nb_knots)) if nb_knots > 1: res[0, 0] = -1.0 / (knots[1] - knots[0]) res[0, 1] = 1.0 / (knots[1] - knots[0]) for i in range(1, nb_knots - 1): res[i, i - 1] = (1.0 / (knots[i] - knots[i - 1])) res[i, i] = -(1.0 / (knots[i] - knots[i - 1]) + 1.0 / (knots[i + 1] - knots[i])) res[i, i + 1] = 1.0 / (knots[i + 1] - knots[i]) if nb_knots > 1: res[nb_knots - 1, nb_knots - 2] = 1.0 / (knots[nb_knots - 1] - knots[nb_knots - 2]) res[nb_knots - 1, nb_knots - 1] = -1.0 / (knots[nb_knots - 1] - knots[nb_knots - 2]) return res def dirac_inner_product(knots, coeffs1, coeffs2): """ Equivalent to the finite-difference approximation for the second derivative. """ nb_knots = len(knots) res = 0.0 for i in range(nb_knots): res += 0.5 * (coeffs1[i] * coeffs2[i] + coeffs1[i - 1] * coeffs2[i - 1]) / (knots[i] - knots[i - 1]) return res def tikhonov_matrix(knots): """Computes the second-order Tikhonov matrix of the B-splines corresponding to specified knots. Note ---- The specified array of knots must be non-empty and increasingly sorted. """ basis_len = len(knots) res = np.zeros((basis_len, basis_len)) coeffs_on_dirac_basis = second_derivative_on_dirac_basis(knots) influence_order = 2 for i in range(basis_len): min_j = max(0, i - influence_order) max_j = min(basis_len, i + influence_order + 1) for j in range(min_j, max_j): res[i, j] = dirac_inner_product(knots, coeffs_on_dirac_basis[i], coeffs_on_dirac_basis[j]) return res # In[12]: def penalized_pl_regression(knots, x_sample, y_sample, tikhonov_factor): """Compute the second-order penalized P-L regression of y_sample wrt. x_sample. """ bis = basis(knots, x_sample) var = (bis.T).dot(bis) / len(x_sample) covar = y_sample.dot(bis) / len(x_sample) tikho = tikhonov_matrix(knots) return np.linalg.lstsq(var + tikhonov_factor * tikho, covar.T)[0] # In[13]: plt.figure(title='Testing multiple values for Tikhonov factor') plt.scatter(X, Y, alpha=0.3) knots = np.linspace(-3.0, 3.0, 25) plt.plot(mesh, eval_piecewise_linear(mesh, knots, penalized_pl_regression(knots, X, Y, 0.01)), 'r', linewidth=3) plt.plot(mesh, eval_piecewise_linear(mesh, knots, penalized_pl_regression(knots, X, Y, 0.1)), 'g', linewidth=3) plt.show() # In[14]: figure = plt.figure(title='Non-parametric regression') scatter_s = plt.scatter(X, Y, alpha=0.3, enable_move=True) spline = plt.plot(mesh, np.zeros(mesh.shape), 'b', linewidth=3) tikhonov = FloatSlider(min=0.02, max=1.0, value=0.5, step=0.01, description='Tikhonov') def update_spline(change=None): spline.y = eval_piecewise_linear(spline.x, knots, penalized_pl_regression(knots, scatter_s.x, scatter_s.y, tikhonov.value)) tikhonov.observe(update_spline, names=['value']) scatter_s.observe(update_spline, names=['x', 'y']) update_spline() VBox([figure, tikhonov]) # In[ ]: # In[ ]: # In[ ]: