In this notebook I verify 12 million forecasts in a couple of seconds using the RMSE metric on a dask.array.

In [1]:
import xarray as xr
import pandas as pd
import numpy as np
import xskillscore as xs
import dask.array as da
from dask.distributed import Client

By default the dask.distributed.Client uses a LocalCluster

cluster = LocalCluster()
client = Client(cluster)

However, this code can easily be adapted to scale on massive datasets using distributed computing via various methods of deployment:

or vendor products:

If anyone does run this example on a large cluster I would be curious how big you can scale nstores and nskus and how long it takes to run rmse. You are welcome to post it in the issue section following this link.

Setup the client (i.e. connect to the scheduler):

In [2]:
client = Client()
client
Out[2]:

Client

Cluster

  • Workers: 3
  • Cores: 6
  • Memory: 16.73 GB

Due to the success of your previous forecast (and verification using xskillscore!) the company you work for has expanded. They have grown to 4,000 stores each with 3,000 products:

In [3]:
nstores = 4000
nskus = 3000

nforecasts = nstores * nskus
print(f"That's {nforecasts:,d} different forecasts to verify!")

stores = np.arange(nstores)
skus = np.arange(nskus)
That's 12,000,000 different forecasts to verify!

The time period of interest is the same dates but for 2021:

In [4]:
dates = pd.date_range("1/1/2021", "1/5/2021", freq="D")

Setup the data as a dask.array of dates x stores x skus.

dask uses similar functions as numpy. In this case switch the np. to da. to generate random numbers between 1 and 10:

In [5]:
data = da.random.randint(9, size=(len(dates), len(stores), len(skus))) + 1
data
Out[5]:
Array Chunk
Bytes 480.00 MB 60.00 MB
Shape (5, 4000, 3000) (5, 1000, 1500)
Count 16 Tasks 8 Chunks
Type int64 numpy.ndarray
3000 4000 5

Put this into an xarray.DataArray and specify the Coordinates and dimensions:

In [6]:
y = xr.DataArray(data, coords=[dates, stores, skus], dims=["DATE", "STORE", "SKU"])
y
Out[6]:
Show/Hide data repr Show/Hide attributes
xarray.DataArray
'add-7f8918891a510f68ee4b513f895573f3'
  • DATE: 5
  • STORE: 4000
  • SKU: 3000
  • dask.array<chunksize=(5, 1000, 1500), meta=np.ndarray>
    Array Chunk
    Bytes 480.00 MB 60.00 MB
    Shape (5, 4000, 3000) (5, 1000, 1500)
    Count 16 Tasks 8 Chunks
    Type int64 numpy.ndarray
    3000 4000 5
    • DATE
      (DATE)
      datetime64[ns]
      2021-01-01 ... 2021-01-05
      array(['2021-01-01T00:00:00.000000000', '2021-01-02T00:00:00.000000000',
             '2021-01-03T00:00:00.000000000', '2021-01-04T00:00:00.000000000',
             '2021-01-05T00:00:00.000000000'], dtype='datetime64[ns]')
    • STORE
      (STORE)
      int64
      0 1 2 3 4 ... 3996 3997 3998 3999
      array([   0,    1,    2, ..., 3997, 3998, 3999])
    • SKU
      (SKU)
      int64
      0 1 2 3 4 ... 2996 2997 2998 2999
      array([   0,    1,    2, ..., 2997, 2998, 2999])

Create a prediction array similar to that in 01_Deterministic.ipynb:

In [7]:
noise = da.random.uniform(-1, 1, size=(len(dates), len(stores), len(skus)))
yhat = y + (y * noise)
yhat
Out[7]:
Show/Hide data repr Show/Hide attributes
xarray.DataArray
  • DATE: 5
  • STORE: 4000
  • SKU: 3000
  • dask.array<chunksize=(5, 1000, 1500), meta=np.ndarray>
    Array Chunk
    Bytes 480.00 MB 60.00 MB
    Shape (5, 4000, 3000) (5, 1000, 1500)
    Count 40 Tasks 8 Chunks
    Type float64 numpy.ndarray
    3000 4000 5
    • DATE
      (DATE)
      datetime64[ns]
      2021-01-01 ... 2021-01-05
      array(['2021-01-01T00:00:00.000000000', '2021-01-02T00:00:00.000000000',
             '2021-01-03T00:00:00.000000000', '2021-01-04T00:00:00.000000000',
             '2021-01-05T00:00:00.000000000'], dtype='datetime64[ns]')
    • STORE
      (STORE)
      int64
      0 1 2 3 4 ... 3996 3997 3998 3999
      array([   0,    1,    2, ..., 3997, 3998, 3999])
    • SKU
      (SKU)
      int64
      0 1 2 3 4 ... 2996 2997 2998 2999
      array([   0,    1,    2, ..., 2997, 2998, 2999])

Finally calculate RMSE at the store and sku level.

Use the .compute() method to return the values:

In [8]:
%time xs.rmse(y, yhat, 'DATE').compute()
CPU times: user 162 ms, sys: 119 ms, total: 281 ms
Wall time: 2.08 s
Out[8]:
Show/Hide data repr Show/Hide attributes
xarray.DataArray
  • STORE: 4000
  • SKU: 3000
  • 3.417 3.234 2.697 3.372 5.377 3.665 ... 1.19 3.425 4.259 2.542 4.752
    array([[3.41682887, 3.23411861, 2.69707474, ..., 2.87026685, 3.68759293,
            2.6408434 ],
           [3.16087021, 2.17575623, 4.33849841, ..., 2.24438267, 5.54621049,
            2.04912144],
           [1.81492691, 2.04205526, 1.63489789, ..., 4.26397024, 1.33314852,
            4.22996336],
           ...,
           [5.12907457, 0.64156566, 3.15916421, ..., 3.2282438 , 5.43960656,
            3.9416223 ],
           [3.84933788, 3.28570209, 2.29624814, ..., 1.48937493, 1.32878641,
            2.65736995],
           [0.5710644 , 1.32473384, 4.10346585, ..., 4.25863161, 2.54228724,
            4.75186667]])
    • STORE
      (STORE)
      int64
      0 1 2 3 4 ... 3996 3997 3998 3999
      array([   0,    1,    2, ..., 3997, 3998, 3999])
    • SKU
      (SKU)
      int64
      0 1 2 3 4 ... 2996 2997 2998 2999
      array([   0,    1,    2, ..., 2997, 2998, 2999])
In [ ]: