There are 5 possible forecasting strategies for building multi-step forecasts. The first of them is the recursive strategy, in which the model forecasts `step` points ahead and then uses the forecasted points to forecast the next ones; the second is the direct strategy, in which each future point is forecasted without relying on previously forecasted values. The first two of these strategies are available in ETNA, and we will take a closer look at them in this notebook.
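To make the distinction concrete, here is a minimal, library-agnostic sketch of the two strategies with toy data and a toy one-step "model"; everything in it is purely illustrative and is not part of the ETNA API:

import numpy as np

history = np.array([10.0, 12.0, 11.0, 13.0, 12.5, 14.0, 13.5])
horizon = 3


def toy_model(window):
    # toy one-step "model": the mean of the last three observations
    return float(np.mean(window[-3:]))


# Recursive strategy: forecast one point, append it to the series,
# and use it as part of the history when forecasting the next point.
recursive_series = list(history)
recursive_forecast = []
for _ in range(horizon):
    point = toy_model(np.array(recursive_series))
    recursive_forecast.append(point)
    recursive_series.append(point)

# Direct strategy: every future point is forecasted from the real history only,
# here with a separate toy "model" (a longer mean window) per forecast step.
direct_forecast = [float(np.mean(history[-(3 + step):])) for step in range(horizon)]

print(recursive_forecast, direct_forecast)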
Table of contents
- Recursive strategy: AutoRegressivePipeline
- Direct strategy: Pipeline and DirectEnsemble
- assemble_pipelines
- Summary
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
from etna.analysis import plot_backtest
from etna.datasets import TSDataset
from etna.metrics import MAE
from etna.metrics import MAPE
from etna.metrics import SMAPE
from etna.models import CatBoostPerSegmentModel
from etna.transforms import LagTransform
from etna.transforms import LinearTrendTransform
HORIZON = 14  # number of points to forecast
HISTORY_LEN = 5 * HORIZON  # amount of history to show on the backtest plots
NUMBER_OF_LAGS = 21  # number of lag features to generate
Let's load and plot the dataset:
df = pd.read_csv("data/example_dataset.csv")
df = TSDataset.to_dataset(df)
ts = TSDataset(df, freq="D")
ts.plot()
Recursive strategy in ETNA is implemented via `AutoRegressivePipeline`.

`AutoRegressivePipeline` is a pipeline which iteratively forecasts `step` values ahead and after that uses the forecasted values to build the features for the next steps.

The smaller the `step`, the longer the forecast takes, since the method needs to recalculate the features $\lceil \frac{horizon}{step} \rceil$ times; for example, with `step=1` and `horizon=14` the features are recalculated 14 times.

Note: we will remove the linear trend with a separate transform (because tree-based models cannot extrapolate a trend) and use the target's lags as features.
from etna.pipeline import AutoRegressivePipeline
model = CatBoostPerSegmentModel()
transforms = [
LinearTrendTransform(in_column="target"),
LagTransform(in_column="target", lags=[i for i in range(1, 1 + NUMBER_OF_LAGS)], out_column="target_lag"),
]
autoregressivepipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=HORIZON, step=1)
metrics_recursive_df, forecast_recursive_df, _ = autoregressivepipeline.backtest(
ts=ts, metrics=[SMAPE(), MAE(), MAPE()]
)
autoregressive_pipeline_metrics = metrics_recursive_df.mean()
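As an illustration of the `step` trade-off, here is a sketch (not executed in this notebook) of the same pipeline with `step=7`: with HORIZON = 14 the features would be recalculated only $\lceil 14 / 7 \rceil = 2$ times. The lags start at 7 so that every lag inside a 7-point block refers to an already known or already forecasted value.

# Sketch only: a coarser-grained recursive pipeline with step=7.
# With HORIZON = 14, features are recalculated ceil(14 / 7) = 2 times instead of 14.
autoregressive_pipeline_step_7 = AutoRegressivePipeline(
    model=CatBoostPerSegmentModel(),
    transforms=[
        LinearTrendTransform(in_column="target"),
        # lags >= step, so every lag feature is available within each 7-point block
        LagTransform(in_column="target", lags=list(range(7, 7 + NUMBER_OF_LAGS)), out_column="target_lag"),
    ],
    horizon=HORIZON,
    step=7,
)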
plot_backtest(forecast_recursive_df, ts, history_len=HISTORY_LEN)
Direct strategy in ETNA is implemented via `Pipeline` and `DirectEnsemble`. This strategy assumes conditional independence of forecasts, i.e. the forecast for each future point does not use the forecasts for the other points.
Pipeline

`Pipeline` implements the version of the direct strategy where only one model is fitted to forecast all the points in the future. This implies several things; in particular, `Pipeline` doesn't accept lags less than `horizon`, since such lag features would not be known for all the forecasted points.
Note: as mentioned above, we cannot use lags less than `horizon`, so now we will use lags from `HORIZON` to `HORIZON + NUMBER_OF_LAGS` (a lag smaller than the horizon would point at a timestamp that is itself being forecasted, so its value would be unknown).
from etna.pipeline import Pipeline
model = CatBoostPerSegmentModel()
transforms = [
LinearTrendTransform(in_column="target"),
LagTransform(in_column="target", lags=list(range(HORIZON, HORIZON + NUMBER_OF_LAGS)), out_column="target_lag"),
]
pipeline = Pipeline(model=model, transforms=transforms, horizon=HORIZON)
metrics_pipeline_df, forecast_pipeline_df, _ = pipeline.backtest(ts=ts, metrics=[SMAPE(), MAE(), MAPE()])
pipeline_metrics = metrics_pipeline_df.mean()
plot_backtest(forecast_pipeline_df, ts, history_len=HISTORY_LEN)
DirectEnsemble

`DirectEnsemble` fits a separate pipeline for each time subsegment. When forecasting the future, it selects the base pipeline with the shortest horizon that covers the timestamp of the currently forecasted point. Let's see an example of choosing a base pipeline for forecasting:
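Here is a minimal sketch of this selection rule, assuming two base pipelines with horizons 7 and 14 (the function below is purely illustrative and is not part of the ETNA API):

def select_base_horizon(steps_ahead, horizons):
    # The point `steps_ahead` steps into the future is forecasted by the base
    # pipeline with the shortest horizon that still covers it.
    return min(h for h in horizons if h >= steps_ahead)


horizons = [7, 14]
# points 1..7 are taken from the pipeline with horizon 7,
# points 8..14 from the pipeline with horizon 14
assert select_base_horizon(3, horizons) == 7
assert select_base_horizon(10, horizons) == 14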
Let's build a separate pipeline for each week of interest. The first week will be forecasted using lags from 7 to 7 + NUMBER_OF_LAGS, and the second one with lags from HORIZON to HORIZON + NUMBER_OF_LAGS. We expect that using the nearer lags for the first week might improve the forecast quality.
First, let's build our pipelines:
horizons = [7, 14]
model_1 = CatBoostPerSegmentModel()
transforms_1 = [
LinearTrendTransform(in_column="target"),
LagTransform(
in_column="target", lags=[i for i in range(horizons[0], horizons[0] + NUMBER_OF_LAGS)], out_column="target_lag"
),
]
pipeline_1 = Pipeline(model=model_1, transforms=transforms_1, horizon=horizons[0])
model_2 = CatBoostPerSegmentModel()
transforms_2 = [
LinearTrendTransform(in_column="target"),
LagTransform(
in_column="target", lags=[i for i in range(horizons[1], horizons[1] + NUMBER_OF_LAGS)], out_column="target_lag"
),
]
pipeline_2 = Pipeline(model=model_2, transforms=transforms_2, horizon=horizons[1])
Second, we will create the ensemble and get the forecasts:
from etna.ensembles import DirectEnsemble
ensemble = DirectEnsemble(pipelines=[pipeline_1, pipeline_2])
metrics_ensemble_df, forecast_ensemble_df, _ = ensemble.backtest(ts=ts, metrics=[SMAPE(), MAE(), MAPE()])
ensemble_metrics = metrics_ensemble_df.mean()
plot_backtest(forecast_ensemble_df, ts, history_len=HISTORY_LEN)
`DirectEnsemble` described above requires building a separate pipeline for each time subsegment. These pipelines often have many common parts and differ only in a few places. To make the definition of the pipelines a bit shorter, you can use `assemble_pipelines`. It generates the pipelines using the following rules:

1. The i-th generated pipeline gets the i-th model and the i-th horizon.
2. A transform that appears in the sequence as a single transform goes into every generated pipeline; a transform that appears as a list provides its i-th element to the i-th pipeline (None means that the pipeline gets no transform from this element).

Let's consider that A, B, C, D, E are different transforms.
Example 1. If the input transform sequence is [A, B, C], the function will put [A, B, C] into each pipeline.

Example 2. If the input transform sequence is [A, [B, C], D, E], the function will put [A, B, D, E] into the first generated pipeline and [A, C, D, E] into the second.

Example 3. If the input transform sequence is [A, [B, C], [D, E]], the function will put [A, B, D] into the first generated pipeline and [A, C, E] into the second.

Example 4. If the input transform sequence is [A, [B, None]], the function will put [A, B] into the first generated pipeline and [A] into the second.
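For instance, Example 4 can be written down as the following sketch; the models, horizons and lags here are arbitrary and chosen only to show how None works (we do not fit these pipelines):

from etna.models import CatBoostPerSegmentModel
from etna.pipeline import assemble_pipelines
from etna.transforms import LagTransform, LinearTrendTransform

# Example 4 in code: the shared LinearTrendTransform (A) goes into both pipelines,
# the LagTransform (B) only into the first one; None drops it from the second.
pipelines_example_4 = assemble_pipelines(
    models=[CatBoostPerSegmentModel(), CatBoostPerSegmentModel()],
    transforms=[
        LinearTrendTransform(in_column="target"),
        [LagTransform(in_column="target", lags=list(range(7, 14)), out_column="target_lag"), None],
    ],
    horizons=[7, 14],
)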
Let's build the ensemble from the previous section using `assemble_pipelines`:
from etna.pipeline import assemble_pipelines
models = [CatBoostPerSegmentModel(), CatBoostPerSegmentModel()]
transforms = [
LinearTrendTransform(in_column="target"),
[
LagTransform(
in_column="target",
lags=[i for i in range(horizons[0], horizons[0] + NUMBER_OF_LAGS)],
out_column="target_lag",
),
LagTransform(
in_column="target",
lags=[i for i in range(horizons[1], horizons[1] + NUMBER_OF_LAGS)],
out_column="target_lag",
),
],
]
pipelines = assemble_pipelines(models=models, transforms=transforms, horizons=horizons)
pipelines
[Pipeline(model = CatBoostPerSegmentModel(iterations = None, depth = None, learning_rate = None, logging_level = 'Silent', l2_leaf_reg = None, thread_count = None, ), transforms = [LinearTrendTransform(in_column = 'target', poly_degree = 1, ), LagTransform(in_column = 'target', lags = [7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27], out_column = 'target_lag', )], horizon = 7, ), Pipeline(model = CatBoostPerSegmentModel(iterations = None, depth = None, learning_rate = None, logging_level = 'Silent', l2_leaf_reg = None, thread_count = None, ), transforms = [LinearTrendTransform(in_column = 'target', poly_degree = 1, ), LagTransform(in_column = 'target', lags = [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34], out_column = 'target_lag', )], horizon = 14, )]
The pipeline generation process now looks a bit simpler, doesn't it? Now it's time to create a `DirectEnsemble` out of these pipelines:
ensemble = DirectEnsemble(pipelines=pipelines)
metrics_ensemble_df_2, forecast_ensemble_df_2, _ = ensemble.backtest(ts=ts, metrics=[SMAPE(), MAE(), MAPE()])
Let's check that the backtest results have not changed:
pd.testing.assert_frame_equal(metrics_ensemble_df_2, metrics_ensemble_df)
In this notebook, we discussed the forecasting strategies available in ETNA and looked at examples of their usage. In conclusion, let's compare their quality on the considered dataset:
df_res = pd.DataFrame(
data=[ensemble_metrics, pipeline_metrics, autoregressive_pipeline_metrics],
index=["direct_ensemble", "pipeline", "autoregressive_pipeline"],
).drop("fold_number", axis=1)
df_res = df_res.sort_values(by="SMAPE")
df_res
| | SMAPE | MAE | MAPE |
| --- | --- | --- | --- |
| direct_ensemble | 7.152913 | 28.657613 | 7.004382 |
| autoregressive_pipeline | 7.247425 | 29.945816 | 7.117746 |
| pipeline | 7.319264 | 28.476013 | 7.102676 |