| Template Specification | Description |
| --- | --- |
| Summary | This template demonstrates how to parallelize the training of hundreds of time-series forecasting models with Ray Tune. The template uses the statsforecast library to fit models to partitions of the M4 forecasting competition dataset. |
| Time to Run | Around 5 minutes to train all models. |
| Minimum Compute Requirements | No hard requirements. The default is 8 nodes with 8 CPUs each. |
| Cluster Environment | This template uses the latest Anyscale-provided Ray ML image with Python 3.9 (`anyscale/ray-ml:latest-py39-gpu`), with some extra requirements from `requirements.txt` installed on top. If you want to change to a different cluster environment, make sure that it is based off of this image and includes all packages listed in the `requirements.txt` file. |
The end result of the template is a set of models fit to each dataset partition, with the best model per partition selected based on cross-validation metrics. Using the best model, we can then generate forecasts like the one displayed at the end of this notebook.
In many-model training, the focus is on training models on multiple subsets of a dataset, rather than training a single model on the entire dataset. Each model is trained on an independent dataset partition, allowing Ray to parallelize the workload by running many training jobs concurrently instead of fitting each model sequentially.
Slot in your code below wherever you see the ✂️ icon to build off of this template!
The framework and data format used in this template can be easily replaced to suit your own application!
When running in a distributed Ray cluster, all nodes need to have access to the same dependencies. For this, we'll use `pip install --user` to install the necessary requirements. On an Anyscale Workspace, this is configured to install packages to a shared filesystem that is available to all nodes in the cluster.
```bash
pip install --user -r requirements.txt
```
After installing all the requirements, we'll start with some imports.
```python
import matplotlib.pyplot as plt
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, MSTL

from ray import train, tune
from ray.train import RunConfig
```
Next, we define the custom training function that fits the forecasting models and computes evaluation metrics. Ray Tune will distribute this code across the cluster and schedule as many training jobs as possible to run in parallel, given the available cluster resources.
✂️ Replace this with your own training logic to run per dataset partition.
The only Ray Tune-specific addition is the `train.report` call at the end of the training function. This reports metrics for Ray Tune to log, which can be analyzed after the run finishes.
```python
n_cv_windows = 1

# Try two different types of forecasting models per dataset partition.
# The dataset contains hourly records, so the `season_length` is 24 hours.
models = [
    AutoETS(season_length=24),
    MSTL(season_length=24, trend_forecaster=AutoARIMA()),
]

# See the appendix for info on setting resource requirements for each trial.
cpus_per_trial = len(models) * n_cv_windows


def train_fn(config: dict):
    # First, define some helper functions for fetching data and computing eval metrics.
    def get_m4_partition(unique_id: str) -> pd.DataFrame:
        df = pd.read_parquet(
            "https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet"
        )
        df = df[df["unique_id"] == unique_id]
        return df.dropna()

    def evaluate_cross_validation(df: pd.DataFrame) -> pd.DataFrame:
        from sklearn.metrics import mean_squared_error

        models = df.drop(columns=["ds", "cutoff", "y"]).columns.tolist()
        evals = []
        for model in models:
            eval_ = (
                df.groupby(["unique_id", "cutoff"])
                # Calculate the Root Mean Squared Error (RMSE).
                .apply(
                    lambda x: mean_squared_error(
                        x["y"].values, x[model].values, squared=False
                    )
                )
                .to_frame()
            )
            eval_.columns = [model]
            evals.append(eval_)
        evals = pd.concat(evals, axis=1)
        evals = evals.groupby(["unique_id"]).mean(numeric_only=True)
        evals["best_model"] = evals.idxmin(axis=1)
        return evals

    # Later, we will set up Ray Tune to populate `config['data_partition_id']`.
    # Use this value to determine which partition of the dataset to use.
    data_partition_id = config["data_partition_id"]
    train_df = get_m4_partition(data_partition_id)

    forecast_horizon = 24  # Forecast the next 24 hours.

    sf = StatsForecast(
        df=train_df,
        models=models,
        freq="H",
        # Set the number of cores used by statsforecast to the
        # number of CPUs assigned to the trial!
        n_jobs=cpus_per_trial,
    )
    cv_df = sf.cross_validation(
        h=forecast_horizon,
        step_size=forecast_horizon,
        n_windows=n_cv_windows,
    )
    eval_df = evaluate_cross_validation(df=cv_df)

    best_model = eval_df["best_model"][data_partition_id]
    forecast_rmse = eval_df[best_model][data_partition_id]

    if data_partition_id == "H1":
        # For the first data partition, plot forecasts of the best model.
        forecast_df = sf.forecast(h=forecast_horizon)

        fig, ax = plt.subplots(1, 1, figsize=(10, 5))
        plot_df = pd.concat([train_df, forecast_df]).set_index("ds")
        plot_df[["y", best_model]].plot(ax=ax)
        ax.set_title(f"Forecast for data partition: {data_partition_id}")
        ax.set_xlabel("Timestamp [ds]")
        ax.set_ylabel("Target [y]")
        ax.get_figure().savefig("prediction.png")

    # Report the best-performing model and its corresponding eval metric.
    train.report({"forecast_rmse": forecast_rmse, "best_model": best_model})


trainable = tune.with_resources(train_fn, resources={"CPU": cpus_per_trial})
```
In this template, we treat the dataset partition ID as a hyperparameter, and we leverage Ray Tune to parallelize the execution of our training function across all dataset partitions.
✂️ Modify the hyperparameter search space `param_space` to enable your training function to configure the dataset! This is how `config['data_partition_id']` from earlier gets populated.
```python
# First, pull the list of unique IDs used to partition the dataset.
data_partition_ids = list(
    pd.read_parquet(
        "https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet",
        columns=["unique_id"],
    )["unique_id"].unique()
)
print(f"Training on a total of {len(data_partition_ids)} dataset partitions.")

param_space = {
    "data_partition_id": tune.grid_search(data_partition_ids),
}
```
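If you want a quick smoke test before sweeping over all partitions, one option is to grid-search over just a handful of IDs first. A minimal sketch (the 4-partition subset and the `smoke_test_param_space` name are arbitrary assumptions, not part of the template):

```python
# Hypothetical smoke test: sweep over only the first 4 dataset partitions.
# Swap this in for `param_space` below while debugging your training function.
smoke_test_param_space = {
    "data_partition_id": tune.grid_search(data_partition_ids[:4]),
}
```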
Run many-model training using Ray Tune!
```python
tuner = tune.Tuner(
    trainable,
    param_space=param_space,
    # Experiment results are saved to a shared filesystem available to all nodes.
    run_config=RunConfig(storage_path="/mnt/cluster_storage"),
)
result_grid = tuner.fit()
```
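If the run is interrupted partway through, Ray Tune can pick up where it left off, since results live on shared storage. A minimal sketch, assuming Ray 2.x's `tune.Tuner.restore` API and a hypothetical `experiment_path` pointing at the experiment directory created under the storage path:

```python
# Hypothetical resume flow: `experiment_path` is the experiment directory
# created under /mnt/cluster_storage by the original run.
experiment_path = "/mnt/cluster_storage/train_fn_2024-01-01_00-00-00"
restored_tuner = tune.Tuner.restore(experiment_path, trainable=trainable)
result_grid = restored_tuner.fit()
```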
View the reported results of all trials as a dataframe.
```python
results_df = result_grid.get_dataframe()
results_df
```
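As a quick sanity check, here is one way to summarize the sweep with plain pandas (a sketch that assumes the `forecast_rmse` and `best_model` metrics reported above, and Tune's `config/`-prefixed config columns in the results dataframe):

```python
# Count how often each model type won its partition.
print(results_df["best_model"].value_counts())

# Inspect the 5 partitions with the highest cross-validation RMSE.
print(
    results_df.sort_values("forecast_rmse", ascending=False)[
        ["config/data_partition_id", "best_model", "forecast_rmse"]
    ].head()
)
```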
We saved an image of the forecast generated by the best model trained on the first dataset partition `"H1"`. Let's find that file and display it!
```python
import os

from IPython.display import Image, display

for result in result_grid:
    # Find the result associated with the run that saved a forecast plot.
    if result.config["data_partition_id"] == "H1":
        display(Image(os.path.join(result.path, "prediction.png")))
        break
```
This template is a quickstart to using Ray Tune for many-model training. See this blog post for more information on the benefits of performing many-model training with Ray!
At a high level, this template showed how to do the following:

1. Define a training function that fits forecasting models to a single dataset partition and reports evaluation metrics with `train.report`.
2. Define a search space that assigns one dataset partition to each trial via `tune.grid_search`, and wrap the training function with `tune.with_resources`.
3. Run all of the training jobs in parallel with `tune.Tuner` and analyze the reported results as a dataframe.
`tune.with_resources` was used to specify the resources needed to launch each of our training jobs. Feel free to change this to the resources required by your application! You can also comment out the `tune.with_resources` call to assign 1 CPU (the default) to each trial.
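For example, a heavier training function could request more CPUs, or a GPU, per trial (the numbers below are placeholder assumptions, not requirements of this template):

```python
# Hypothetical resource requests -- tune these to your own workload.
trainable = tune.with_resources(train_fn, resources={"CPU": 4})

# Or, for a training function that can make use of a GPU:
# trainable = tune.with_resources(train_fn, resources={"CPU": 4, "GPU": 1})
```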
Note that the number of CPUs to assign to a trial depends on the workload. In this template, statsforecast has an `n_jobs` configuration that determines the number of CPU cores used for model fitting and cross-validation within a trial, so we set `n_jobs = cpus_per_trial`. We chose to set this parallelism equal to the total number of models fitted during cross-validation: `M model types * N temporal cross-validation windows = 2 * 1 = 2`.
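As a sketch, if you extended the template with a third model type and a second cross-validation window (hypothetical values, not the template defaults), the per-trial CPU request would scale accordingly:

```python
# Hypothetical: 3 model types * 2 CV windows = 6 models fit per trial.
n_cv_windows = 2
models = [
    AutoETS(season_length=24),
    MSTL(season_length=24, trend_forecaster=AutoARIMA()),
    AutoARIMA(season_length=24),
]
cpus_per_trial = len(models) * n_cv_windows  # 3 * 2 = 6
```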
See Ray Tune's guide on assigning resources for more information.