In this notebook I verify 12 million forecasts in a couple of seconds using the RMSE metric on a dask.array.
import xarray as xr
import pandas as pd
import numpy as np
import xskillscore as xs
import dask.array as da
from dask.distributed import Client, LocalCluster
By default, the dask.distributed.Client uses a LocalCluster:
cluster = LocalCluster()
client = Client(cluster)
However, this code can easily be adapted to scale to massive datasets using distributed computing, via various deployment methods or vendor products.
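For example (a minimal sketch; the scheduler address below is a placeholder, not part of this tutorial), pointing the Client at an already-running scheduler is a one-line change:
# Connect to an existing remote scheduler instead of a LocalCluster.
# "tcp://scheduler-address:8786" is a hypothetical address; use your deployment's.
client = Client("tcp://scheduler-address:8786")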
If anyone does run this example on a large cluster, I would be curious how big you can scale nstores and nskus, and how long it takes to run rmse. You are welcome to post your results in the issues section of the repository.
Set up the client (i.e. connect to the scheduler):
client = Client()
client
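While a computation runs you can watch it live on the diagnostic dashboard (a small aside; the dashboard requires the optional bokeh dependency):
# Print the URL of the dask dashboard served by this client.
print(client.dashboard_link)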
Due to the success of your previous forecast (and its verification using xskillscore!), the company you work for has expanded. It has grown to 4,000 stores, each with 3,000 products:
nstores = 4000
nskus = 3000
nforecasts = nstores * nskus
print(f"That's {nforecasts:,d} different forecasts to verify!")
stores = np.arange(nstores)
skus = np.arange(nskus)
That's 12,000,000 different forecasts to verify!
The time period of interest covers the same dates as before, but for 2021:
dates = pd.date_range("1/1/2021", "1/5/2021", freq="D")
Set up the data as a dask.array of dates x stores x skus. dask uses similar functions to numpy; in this case, switch np. to da. to generate random integers between 1 and 10 (as in numpy, the upper bound is exclusive):
data = da.random.randint(1, 10, size=(len(dates), len(stores), len(skus)))
data
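Before computing anything, it can help to sanity-check the chunk layout and total size; these are standard dask.array attributes and reading them triggers no computation (a minimal sketch):
print(data.chunksize)                  # shape of a single chunk
print(data.numblocks)                  # number of chunks along each dimension
print(f"{data.nbytes / 1e9:.2f} GB")   # total size if fully materialized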
Put this into an xarray.DataArray and specify the coordinates and dimensions:
y = xr.DataArray(data, coords=[dates, stores, skus], dims=["DATE", "STORE", "SKU"])
y
<xarray.DataArray (DATE: 5, STORE: 4000, SKU: 3000)>
Coordinates:
  * DATE     (DATE) datetime64[ns] 2021-01-01 2021-01-02 ... 2021-01-05
  * STORE    (STORE) int64 0 1 2 ... 3997 3998 3999
  * SKU      (SKU) int64 0 1 2 ... 2997 2998 2999
Create a prediction array similar to that in 01_Deterministic.ipynb:
noise = da.random.uniform(-1, 1, size=(len(dates), len(stores), len(skus)))
yhat = y + (y * noise)
yhat
<xarray.DataArray (DATE: 5, STORE: 4000, SKU: 3000)>
Coordinates:
  * DATE     (DATE) datetime64[ns] 2021-01-01 2021-01-02 ... 2021-01-05
  * STORE    (STORE) int64 0 1 2 ... 3997 3998 3999
  * SKU      (SKU) int64 0 1 2 ... 2997 2998 2999
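Note that nothing has been computed yet: yhat wraps a lazy dask graph. A quick check (a minimal sketch) confirms the underlying values are still a dask array:
# The DataArray's .data is still a dask array, not a numpy array.
print(type(yhat.data))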
Finally, calculate RMSE at the store and SKU level. Use the .compute() method to return the values:
%time xs.rmse(y, yhat, 'DATE').compute()
CPU times: user 162 ms, sys: 119 ms, total: 281 ms
Wall time: 2.08 s
<xarray.DataArray (STORE: 4000, SKU: 3000)>
array([[3.41682887, 3.23411861, 2.69707474, ..., 2.87026685, 3.68759293, 2.6408434 ],
       [3.16087021, 2.17575623, 4.33849841, ..., 2.24438267, 5.54621049, 2.04912144],
       [1.81492691, 2.04205526, 1.63489789, ..., 4.26397024, 1.33314852, 4.22996336],
       ...,
       [5.12907457, 0.64156566, 3.15916421, ..., 3.2282438 , 5.43960656, 3.9416223 ],
       [3.84933788, 3.28570209, 2.29624814, ..., 1.48937493, 1.32878641, 2.65736995],
       [0.5710644 , 1.32473384, 4.10346585, ..., 4.25863161, 2.54228724, 4.75186667]])
Coordinates:
  * STORE    (STORE) int64 0 1 2 ... 3997 3998 3999
  * SKU      (SKU) int64 0 1 2 ... 2997 2998 2999
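xskillscore metrics also accept a list of dimensions, so as a follow-up sketch you could reduce over dates and SKUs at once to get a single score per store:
# Reduce over both DATE and SKU to get one RMSE value per store.
store_rmse = xs.rmse(y, yhat, dim=["DATE", "SKU"]).compute()
print(store_rmse.shape)   # (4000,)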