In this notebook I verify 12 million forecasts in a matter of seconds using the RMSE metric on a dask.array.
import xarray as xr
import pandas as pd
import numpy as np
import xskillscore as xs
import dask.array as da
from dask.distributed import Client, LocalCluster
By default the dask.distributed.Client uses a LocalCluster:
cluster = LocalCluster()
client = Client(cluster)
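If the defaults don't suit your machine, the local cluster can be sized explicitly. A minimal sketch (the worker counts and memory limit below are illustrative choices, not recommendations):

cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")  # illustrative sizes
client = Client(cluster)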
However, this code can easily be adapted to scale to massive datasets using distributed computing, via various methods of deployment or vendor products; see the sketch below.
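Whatever the deployment, once a scheduler is running elsewhere you point the Client at its address instead of a LocalCluster. A sketch (the address here is hypothetical):

client = Client("tcp://scheduler-host:8786")  # hypothetical address of an already-deployed scheduler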
If anyone does run this example on a large cluster, I would be curious how big you can scale nstores and nskus, and how long it takes to run rmse. You are welcome to post your results in the issue section following this link.
Set up the client (i.e. connect to the scheduler):
client = Client()
client
Due to the success of your previous forecast (and verification using xskillscore!) the company you work for has expanded. It has grown to 4,000 stores, each with 3,000 products:
nstores = 4000
nskus = 3000
nforecasts = nstores * nskus
print(f"That's {nforecasts:,d} different forecasts to verify!")
stores = np.arange(nstores)
skus = np.arange(nskus)
That's 12,000,000 different forecasts to verify!
The time period of interest is the same set of dates, but for 2021:
dates = pd.date_range("1/1/2021", "1/5/2021", freq="D")
Set up the data as a dask.array of dates x stores x skus. dask exposes functions similar to those in numpy; in this case, switch np. to da. to generate random integers between 1 and 10:
data = da.random.randint(1, 11, size=(len(dates), len(stores), len(skus)))  # integers in [1, 10]
data
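No numbers have actually been generated yet: dask builds a lazy task graph and splits the array into chunks. A sketch of inspecting the chunking, and of requesting explicit chunk sizes when creating the array (the name and sizes below are illustrative, not a recommendation):

print(data.chunks)  # one tuple of chunk lengths per dimension

data_chunked = da.random.randint(
    1, 11,
    size=(len(dates), len(stores), len(skus)),
    chunks=(len(dates), 1000, 1000),  # illustrative explicit chunk sizes
)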
Put this into an xarray.DataArray and specify the coordinates and dimensions:
y = xr.DataArray(data, coords=[dates, stores, skus], dims=["DATE", "STORE", "SKU"])
y
(output: the xarray.DataArray repr, showing the lazy dask array with DATE, STORE and SKU coordinates)
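Wrapping the dask array in a DataArray keeps it lazy; nothing is computed until you ask for it. A quick sketch to confirm this and to gauge how large the array would be if fully realized:

print(type(y.data))  # still a dask array, not numpy
print(f"{y.nbytes / 1e9:.2f} GB")  # 5 x 4000 x 3000 int64 values, roughly 0.48 GB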
Create a prediction array similar to that in 01_Deterministic.ipynb:
noise = da.random.uniform(-1, 1, size=(len(dates), len(stores), len(skus)))
yhat = y + (y * noise)
yhat
(output: the yhat DataArray repr, with the same DATE, STORE and SKU coordinates)
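Because yhat = y * (1 + noise) with noise drawn from [-1, 1], every prediction lies within ±100% of the truth. A quick sanity check (small reductions like this are cheap to compute):

rel_err = (yhat - y) / y  # relative error, in [-1, 1] by construction
print(float(rel_err.min().compute()), float(rel_err.max().compute()))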
Finally, calculate RMSE at the store and SKU level. Use the .compute() method to return the values:
%time xs.rmse(y, yhat, 'DATE').compute()
CPU times: user 1.52 s, sys: 964 ms, total: 2.49 s
Wall time: 9.63 s
array([[1.53123262, 3.14700356, 3.15132902, ..., 4.94581499, 3.27822252, 4.06088894],
       [1.67277937, 4.08686635, 3.66413793, ..., 2.41712605, 2.57601639, 2.88718054],
       [2.76270445, 3.33921586, 3.06681608, ..., 3.34186527, 1.32741548, 2.08743438],
       ...,
       [2.35981265, 2.1617547 , 4.92081192, ..., 1.98393152, 3.1364395 , 3.59346663],
       [1.26363135, 2.77340328, 2.7967874 , ..., 2.8342274 , 3.01885276, 0.62828305],
       [4.29771572, 3.92254418, 2.15708334, ..., 4.099115  , 3.45980968, 3.73594864]])
(output: a STORE x SKU array of RMSE values, one per forecast)
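If one headline number is more useful than 12 million of them, the same call can reduce over every dimension at once. A sketch (xskillscore accepts a list of dimensions for dim):

overall = xs.rmse(y, yhat, dim=["DATE", "STORE", "SKU"])  # one RMSE across all dates, stores and skus
print(overall.compute())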