This is the Python notebook accompanying my blog post.
In the post we decomposed the total prediction error of a model $\hat{f}$ for a given supervised problem $P(Y|X)$ as follows:
$$ E[d(Y, \hat{f}_D(X))] = \underbrace{E \left[ (f(X) - E[\hat{f}_D(X)|X])^2 \right]}_\text{bias} + \underbrace{E \left[ Var[\hat{f}_D(X)|X] \right]}_\text{variance} + \underbrace{Var[\epsilon]}_\text{irreducible error} $$
We want to minimize this expected prediction error. To illustrate the decomposition for actual models and to show the concept of model complexity in practice, I have written the ModelComparator class. It takes a 1-D supervised learning problem and a dictionary of named models to compare on that problem, and computes each model's bias, variance, irreducible error, total prediction error and training error.
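Before diving into the implementation, here is a minimal standalone sketch of the idea behind the class (not taken from ModelComparator; all names are illustrative): to estimate the bias and variance terms at a single query point $x_0$ by Monte Carlo, retrain the model on many freshly sampled datasets $D$ and look at the mean and spread of its predictions. ModelComparator does essentially the same thing, for many points $x$ at once and for several models in parallel.
# minimal Monte Carlo sketch of the bias/variance estimate at one point x0 (illustrative names)
import numpy as np
from sklearn.base import clone
from sklearn.neighbors import KNeighborsRegressor
rng = np.random.default_rng(0)
f = lambda x: x + 1.5 * np.sin(x)              # regression function
x0 = np.array([[15.0]])                        # fixed query point
model = KNeighborsRegressor(n_neighbors=5)
preds = []
for _ in range(500):                           # 500 random training sets D
    xs = rng.uniform(10, 20, size=(128, 1))
    ys = f(xs).ravel() + rng.normal(0, 1, size=128)
    preds.append(clone(model).fit(xs, ys).predict(x0)[0])
preds = np.array(preds)
bias_sq = (f(x0).item() - preds.mean()) ** 2   # squared bias at x0
variance = preds.var()                         # variance over datasets at x0
irreducible = 1.0                              # Var[eps] for eps ~ N(0, 1)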
The ModelComparator class
Feel free to skip this part if you are not interested in the implementation and just want to play with the ModelComparator functions.
# data
import numpy as np
# random generation
from scipy.stats import uniform, norm, multinomial
from numpy.random import default_rng
# models
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.pipeline import Pipeline
from sklearn.base import clone
# plotting
import plotly
import plotly.express as px
import plotly.graph_objects as go
# progress bar
from tqdm.notebook import tqdm
class ModelComparator(object):
def __init__(self, regr_func, models, error_distr=None,
x_train_distr=None, x_train_distr_size=128,
plotting_resolution=2048, plotting_start=0, plotting_end=1,
plotting_width=800, plotting_height=600):
# supervised learning problem (regression function, error, x and x_dataset)
self.regr_func = regr_func
self.error_distr = error_distr or norm(loc=0, scale=1)
self.x_train_distr = x_train_distr or uniform(loc=0, scale=1)
self.x_train_distr_size = x_train_distr_size
# models to compare
self.models = models
# plotting related
self.plotting_resolution = plotting_resolution
self.plotting_start = plotting_start
self.plotting_end = plotting_end
self.plotting_width = plotting_width
self.plotting_height = plotting_height
self.legend = dict(yanchor='top', y=0.99, xanchor='left', x=0.01)
####
# model training
self.datasets = [] # size = (#datasets)
self.trained_models = [] # size = (#models, #datasets)
self.trainset_distance = None
# evaluation (averaged over X)
self.eval_m_avg_bias = None
self.eval_m_avg_var = None
# evaluation (for fixed X)
self.eval_m_dp_exp = None
self.eval_m_dp_var = None
self.eval_m_dp_xs = None
def generate_rnd_train_dataset(self):
'''
helper to generate random train datasets D
'''
xs = self.x_train_distr.rvs(size=(self.x_train_distr_size, 1))
fs = np.apply_along_axis(self.regr_func, 1, xs).reshape(-1, 1)
errors = self.error_distr.rvs(size=(self.x_train_distr_size, 1))
ys = fs + errors
return (xs, ys)
def fit_models(self, number_datasets):
'''
number_datasets: the number of datasets D to generate
'''
tqdm.write('ModelComparator.fit_models()')
self.datasets.clear()
self.trained_models.clear()
self.trainset_distance = np.full((len(self.models), number_datasets), np.nan)
# generate random datasets
tqdm.write('-- generating random datasets')
self.datasets = [self.generate_rnd_train_dataset() for _ in range(number_datasets)]
# fit models on datasets
tqdm.write('-- fitting models')
for model_name, model in tqdm(self.models.items()):
# create trained copies of the model
model_trained_copies = [clone(model).fit(ds_x, ds_y) for ds_x, ds_y in self.datasets]
self.trained_models.append(model_trained_copies)
# calculate trainset distance
tqdm.write('-- calculating trainset distance')
for model_index in tqdm(range(len(self.models))):
for ds_index in range(len(self.datasets)):
trained_model = self.trained_models[model_index][ds_index]
xs, ys = self.datasets[ds_index]
ys_pred = trained_model.predict(xs)
avg_dist = ((ys - ys_pred)**2).mean()
self.trainset_distance[model_index, ds_index] = avg_dist
assert not np.isnan(self.trainset_distance).any()
def get_model_names(self):
return [name for name, _ in self.models.items()]
def plot_models_on_dataset(self, ds_index,
show_models=None, show_regr_func=True,
show_error_bands=False, show_dataset=True, show_title=True):
assert ds_index < len(self.datasets)
assert show_models is None or isinstance(show_models, list)
xs = np.linspace(self.plotting_start, self.plotting_end, num=self.plotting_resolution).reshape(-1, 1)
# plot
fig = go.Figure()
if show_regr_func:
# plot regression function
fs = np.apply_along_axis(self.regr_func, 1, xs)
error_upper = fs + self.error_distr.std()
error_lower = fs - self.error_distr.std()
if show_error_bands:
# plot error bands
error_band_col = 'rgb(170, 170, 170)'
fig.add_trace(go.Scatter(
name='regression function + std.', x=xs.flatten(), y=error_upper.flatten(),
mode='lines', line=dict(color=error_band_col, width=0), marker=dict(),
showlegend=False, legendgroup='r_f'
))
fig.add_trace(go.Scatter(
name='regression function - std.', x=xs.flatten(), y=error_lower.flatten(),
mode='lines', line=dict(color=error_band_col, width=0), marker=dict(), fill='tonexty',
showlegend=False, legendgroup='r_f'
))
fig.add_trace(go.Scatter(
x=xs.flatten(), y=fs.flatten(),
name='regression function',
mode='lines', line=dict(color='black'),
showlegend=True, legendgroup='r_f'
))
if show_dataset:
# plot training set
ds_xs, ds_ys = self.datasets[ds_index]
fig.add_trace(go.Scatter(x=ds_xs.flatten(), y=ds_ys.flatten(),
name='training dataset',
mode='markers', marker=dict(color='gray')))
# plot trained models
for m_idx, model_name in enumerate(self.get_model_names()):
# skip some models
if show_models is not None and model_name not in show_models:
continue
ys_pred = self.trained_models[m_idx][ds_index].predict(xs)
fig.add_trace(go.Scatter(
x=xs.flatten(), y=ys_pred.flatten(), name=f'model: {model_name}', mode='lines'))
fig.update_layout(title=f'models trained on dataset #{ds_index}' if show_title else '',
xaxis_title='x', yaxis_title='y',
width=self.plotting_width, height=self.plotting_height,
legend=(None if show_models is None or len(show_models) > 3 else self.legend)
)
return fig
def evaluate_models_per_x(self, xs):
# reshape to be used by scikit-learn
xs = xs.reshape(-1, 1)
# calculate predictions
tqdm.write('-- calculating predictions')
ys_pred = np.full((len(self.models), len(self.datasets), len(xs)), np.nan)
for m_index in tqdm(range(len(self.models))):
for ds_index in range(len(self.datasets)):
ys_pred[m_index, ds_index] = self.trained_models[m_index][ds_index].predict(xs).reshape(-1)
assert not np.isnan(ys_pred).any()
# calculate regression function values
fs = np.apply_along_axis(self.regr_func, 1, xs)
# calculate mean, variance and bias
tqdm.write('-- calculating mean, variance and bias')
# per (model, data point) i.e. m_dp (randomness comes from dataset D)
m_dp_exp = ys_pred.mean(axis=1)
m_dp_var = ys_pred.var(axis=1)
m_dp_bias = (fs.reshape(1, -1) - m_dp_exp)**2
assert not np.isnan(m_dp_exp).any()
assert not np.isnan(m_dp_var).any()
assert not np.isnan(m_dp_bias).any()
return (m_dp_exp, m_dp_var, m_dp_bias)
def evaluate_models_decomposition(self, eval_x_distr=None, eval_x_size=512):
'''
Calculates the bias-variance decomposition of all the models (averaged over X).
        eval_x_distr: P(X)_{prediction}, the distribution of X used for the evaluation
        eval_x_size: how many points X to evaluate on
'''
assert len(self.datasets) > 0
assert len(self.trained_models) > 0
# evaluation by default on the training distribution
eval_x_distr = eval_x_distr or self.x_train_distr
        tqdm.write('ModelComparator.evaluate_models_decomposition()')
xs = eval_x_distr.rvs(size=eval_x_size)
_, m_dp_var, m_dp_bias = self.evaluate_models_per_x(xs)
# per model i.e. (m) (randomness comes from X)
self.eval_m_avg_bias = m_dp_bias.mean(axis=1)
self.eval_m_avg_var = m_dp_var.mean(axis=1)
assert not np.isnan(self.eval_m_avg_bias).any()
assert not np.isnan(self.eval_m_avg_var).any()
def plot_models_decomposition(self, show_title=True):
assert self.eval_m_avg_bias is not None
assert self.eval_m_avg_var is not None
model_names = self.get_model_names()
# for models
avg_dist = self.eval_m_avg_bias + self.eval_m_avg_var + self.error_distr.var()
avg_trainset_dist = self.trainset_distance.mean(axis=1)
fig = go.Figure()
fig.add_trace(go.Scatter(x=model_names, y=avg_trainset_dist, name='dataset error',
mode='lines', line=dict(color='green')))
fig.add_trace(go.Scatter(x=model_names, y=[self.error_distr.var()] * len(model_names), name='irr. error',
mode='lines', line=dict(color='gray')))
fig.add_trace(go.Scatter(x=model_names, y=self.eval_m_avg_bias, name='bias',
mode='lines', line=dict(color='red')))
fig.add_trace(go.Scatter(x=model_names, y=self.eval_m_avg_var, name='variance',
mode='lines', line=dict(color='blue')))
fig.add_trace(go.Scatter(x=model_names, y=avg_dist, name='total error',
mode='lines', line=dict(color='black')))
fig.update_layout(title=('bias-variance-(irreducible error) decomposition & trainset error' if show_title else ''),
xaxis_title='models', yaxis_title='error',
width=self.plotting_width, height=self.plotting_height
)
return fig
def evaluate_models_x_interval(self):
'''
Calculates the expectation and variance for all models in an interval on X.
'''
tqdm.write('ModelComparator.evaluate_models_x_interval()')
self.eval_m_dp_xs = np.linspace(self.plotting_start, self.plotting_end, num=self.plotting_resolution)
self.eval_m_dp_exp, self.eval_m_dp_var, _ = self.evaluate_models_per_x(self.eval_m_dp_xs)
def plot_models_x_interval(self, show_models=None, show_title=True):
assert show_models is None or isinstance(show_models, list)
model_names = self.get_model_names()
xs = self.eval_m_dp_xs
default_cols = plotly.colors.DEFAULT_PLOTLY_COLORS
fig = go.Figure()
# plot regression function
fs = np.apply_along_axis(self.regr_func, 0, xs)
fig.add_trace(go.Scatter(x=xs, y=fs,
name='regression function',
mode='lines', line=dict(color='black')))
# plot models' expectation and variance
for model_idx in range(len(self.models)):
model_name = model_names[model_idx]
# plot on specified models
if show_models is not None and model_name not in show_models:
continue
model_exp = self.eval_m_dp_exp[model_idx]
model_std = self.eval_m_dp_var[model_idx]**0.5
model_upper_err = model_exp + model_std
model_lower_err = model_exp - model_std
color = default_cols[model_idx % len(default_cols)]
trace_exp = go.Scatter(
name=model_name, x=xs, y=model_exp,
mode='lines', line=dict(color=color),
showlegend=True, legendgroup=model_name
)
fig.add_trace(trace_exp)
fig.add_trace(go.Scatter(
name=model_name + r' + $\sigma$', x=xs, y=model_upper_err,
mode='lines', line=dict(width=0), marker=dict(color=color),
showlegend=False, legendgroup=model_name
))
fig.add_trace(go.Scatter(
name=model_name + r'$ - \sigma$', x=xs, y=model_lower_err,
mode='lines', line=dict(width=0), marker=dict(color=color), fill='tonexty',
showlegend=False, legendgroup=model_name
))
fig.update_layout(title=('models expectation and variance' if show_title else ''),
xaxis_title='x', yaxis_title='y',
width=self.plotting_width, height=self.plotting_height,
legend=self.legend
)
return fig
The regression function is $f(x) = x + 1.5 \sin(x)$. The error is assumed to be standard Gaussian: $\epsilon \sim N(0,1)$. The distribution of $X$ is the same for training and prediction: $X_{prediction} \sim X_{dataset} \sim \mathrm{Uniform}(10,20)$.
regr_func = lambda x: x + 1.5 * np.sin(x)
x_distr_uniform = uniform(loc=10, scale=10)
mc = ModelComparator(
regr_func = regr_func,
models={f'knn{i}' : KNeighborsRegressor(n_neighbors=i) for i in list(range(45, 0, -1))},
x_train_distr = x_distr_uniform,
plotting_start=8, plotting_end=22
)
mc.fit_models(number_datasets=1024)
ModelComparator.fit_models() -- generating random datasets -- fitting models -- calculating trainset distance
Let's have a look at the regression function, the $\pm\sigma$ error bands around it, and a sample dataset $d$ generated from $P(Y|X)$. As one can see, the data points span the range $[10,20]$, since $X_{dataset} \sim \mathrm{Uniform}(10,20)$.
mc.plot_models_on_dataset(0, show_models=[], show_error_bands=True, show_title=True)
Let's now have a look at three KNN models of different complexities, all trained on the exact same dataset. Here, knn45, knn11 and knn1 have been selected to show underfitting, a good fit and overfitting, respectively.
models_subset_knn = ['knn45', 'knn11', 'knn1']
mc.plot_models_on_dataset(0, show_models=models_subset_knn, show_title=True).show()
In the following plot we can see the whole range of KNN models with different values of the hyperparameter $K$, all trained on the same dataset $d$. Feel free to show/hide individual models by clicking the legend entries on the right (double-click to display only a particular model). As one can see, the model complexity increases as $K$ decreases: with $K = 1$ the model simply interpolates the training points, while with very large $K$ it approaches a constant prediction (the mean of the training targets).
mc.plot_models_on_dataset(0).show()
Here we calculate the average bias-variance decomposition of all the KNN models we have trained.
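Concretely, with evaluation points $x_1, \dots, x_n$ sampled from $P(X)$ and one trained copy of the model per generated dataset $D_1, \dots, D_m$, the bias and variance reported by evaluate_models_decomposition are the Monte Carlo averages (writing $\bar{f}(x_j) = \frac{1}{m}\sum_{i=1}^{m}\hat{f}_{D_i}(x_j)$ for the average prediction at $x_j$):
$$ \widehat{\text{bias}} = \frac{1}{n}\sum_{j=1}^{n}\left(f(x_j) - \bar{f}(x_j)\right)^2, \qquad \widehat{\text{var}} = \frac{1}{n}\sum_{j=1}^{n}\frac{1}{m}\sum_{i=1}^{m}\left(\hat{f}_{D_i}(x_j) - \bar{f}(x_j)\right)^2 $$
The irreducible error is $Var[\epsilon] = 1$ here, and the plotted total error is simply the sum of the three terms.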
mc.evaluate_models_decomposition()
mc.plot_models_decomposition(show_title=True).show()
ModelComparator.evaluate_models_decomposition() -- calculating predictions -- calculating mean, variance and bias
mc.evaluate_models_x_interval()
mc.plot_models_x_interval(show_models=['knn45'], show_title=True).show()
ModelComparator.evaluate_models_x_interval() -- calculating predictions -- calculating mean, variance and bias
class mixed_distr(object):
    '''
    A finite mixture distribution: every sample is drawn from one of the given
    scipy.stats distributions, picked at random according to probabilities.
    Only rvs() is implemented, which is all ModelComparator needs from an X distribution.
    '''
    def __init__(self, distributions, probabilities):
        self.distributions = distributions
        self.probabilities = probabilities
    def rvs(self, size=(1,)):
        # draw a full sample of the requested size from every component ...
        all_samples = np.array([distr.rvs(size=size) for distr in self.distributions])
        # ... then pick, for every entry, which component's draw to keep
        idxs = default_rng().choice(range(len(self.distributions)),
                                    size=size, p=self.probabilities, replace=True)
        samples = np.take_along_axis(all_samples, idxs[None, ...], axis=0)
        samples = np.squeeze(samples, 0)
        return samples
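As a quick sanity check of mixed_distr (hypothetical, not part of the original notebook; it assumes the imports and class definition above): with two uniform components on $[10,12]$ and $[18,20]$, every sample should land in one of the two intervals.
# hypothetical sanity check: all samples lie in [10, 12] or [18, 20]
check_distr = mixed_distr([uniform(10, 2), uniform(18, 2)], [0.5, 0.5])
check_samples = check_distr.rvs(size=(1000, 1))
assert (((check_samples >= 10) & (check_samples <= 12)) |
        ((check_samples >= 18) & (check_samples <= 20))).all()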
x_distr_split = mixed_distr([uniform(10, 2), uniform(18, 2)], [0.5, 0.5])
mc = ModelComparator(
regr_func = regr_func,
models={f'knn{i}' : KNeighborsRegressor(n_neighbors=i) for i in list(range(45, 0, -1))},
x_train_distr = x_distr_split,
plotting_start=8, plotting_end=22
)
mc.fit_models(number_datasets=64)
ModelComparator.fit_models() -- generating random datasets -- fitting models -- calculating trainset distance
mc.plot_models_on_dataset(0, show_models=[], show_error_bands=True, show_title=True).show()
mc.plot_models_on_dataset(0, show_models=models_subset_knn, show_title=True).show()
Now we would like to look at how the models behave for individual values of $X$, i.e. at their expectation and variance as functions of $x$.
mc.evaluate_models_x_interval()
ModelComparator.evaluate_models_x_interval() -- calculating predictions -- calculating mean, variance and bias
mc.plot_models_x_interval(show_models=['knn1'], show_title=True).show()
poly_regr_models = {
    f'lr{i}':
        Pipeline([
            # expand x into the polynomial features x, x^2, ..., x^i
            ('polynomial', PolynomialFeatures(i, include_bias=False)),
            # LinearRegression's normalize argument was deprecated in scikit-learn 1.0 and
            # removed in 1.2; standardizing the features explicitly keeps the pipeline working
            # and yields the same OLS predictions (up to numerics)
            ('scale', StandardScaler()),
            ('regr', LinearRegression())
        ]) for i in range(1, 22)
}
mc = ModelComparator(
regr_func = regr_func,
models=poly_regr_models,
x_train_distr = x_distr_uniform,
plotting_start=9, plotting_end=21
)
mc.fit_models(number_datasets=2048)
ModelComparator.fit_models() -- generating random datasets -- fitting models -- calculating trainset distance
mc.plot_models_on_dataset(0, show_models=['lr1', 'lr6'], show_title=True).show()
mc.evaluate_models_decomposition()
mc.plot_models_decomposition(show_title=True).show()
ModelComparator.evaluate_models_decomposition() -- calculating predictions -- calculating mean, variance and bias
mc = ModelComparator(
regr_func = regr_func,
models=poly_regr_models,
x_train_distr = x_distr_split,
plotting_start=8.5, plotting_end=21.5
)
mc.fit_models(number_datasets=64)
ModelComparator.fit_models() -- generating random datasets -- fitting models -- calculating trainset distance
mc.plot_models_on_dataset(0, show_models=['lr1', 'lr6'], show_title=True).show()
mc.evaluate_models_x_interval()
ModelComparator.evaluate_models_x_interval() -- calculating predictions -- calculating mean, variance and bias
mc.plot_models_x_interval(show_models=['lr6'], show_title=True).show()