import os
import pandas as pd
import pmdarima as pm
import functools
import time
import multiprocessing
from threadpoolctl import ThreadpoolController
You'll need the grocery sales dataset. Go to the dataset here, and download train.csv.zip before continuing.
# Load the raw sales data, index it by (store_nbr, family, date), drop the
# columns that are not needed, and sort the rows by that index.
grocery_df = (
    pd.read_csv("train.csv.zip")
    .set_index(['store_nbr', 'family', 'date'])
    .drop(columns=['onpromotion', 'id'])
    .sort_index()
)
grocery_df
sales | |||
---|---|---|---|
store_nbr | family | date | |
1 | AUTOMOTIVE | 2013-01-01 | 0.0 |
2013-01-02 | 2.0 | ||
2013-01-03 | 3.0 | ||
2013-01-04 | 3.0 | ||
2013-01-05 | 5.0 | ||
... | ... | ... | ... |
54 | SEAFOOD | 2017-08-11 | 0.0 |
2017-08-12 | 1.0 | ||
2017-08-13 | 2.0 | ||
2017-08-14 | 0.0 | ||
2017-08-15 | 3.0 |
3000888 rows × 1 columns
# Convert data into a numpy array with one axis per index level
# (store_nbr, family, date). The target shape is read from the MultiIndex
# instead of being hard-coded, so it follows whatever dataset was loaded.
# If the row count does not equal the product of the level sizes, reshape
# raises ValueError.
array = grocery_df['sales'].to_numpy()
array = array.reshape(grocery_df.index.levshape)
array
array([[[0.0000000e+00, 2.0000000e+00, 3.0000000e+00, ..., 1.0000000e+00, 1.0000000e+00, 4.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 2.0000000e+00, 0.0000000e+00, ..., 1.0000000e+00, 6.0000000e+00, 4.0000000e+00], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 9.8666900e+02, 2.6117550e+03, 2.2402300e+03], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 3.8029000e+01, 1.7366001e+01, ..., 1.1378000e+01, 1.4129000e+01, 2.2487000e+01]], [[0.0000000e+00, 8.0000000e+00, 5.0000000e+00, ..., 9.0000000e+00, 4.0000000e+00, 2.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 3.0000000e+00, 2.0000000e+00, ..., 1.0000000e+01, 7.0000000e+00, 9.0000000e+00], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 2.4767031e+03, 2.1040650e+03, 2.0068740e+03], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 3.1373001e+01, 3.4343000e+01, ..., 4.4046000e+01, 3.4644000e+01, 3.2718002e+01]], [[0.0000000e+00, 1.0000000e+01, 7.0000000e+00, ..., 1.5000000e+01, 7.0000000e+00, 1.2000000e+01], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 1.5000000e+01, 6.0000000e+00, ..., 2.6000000e+01, 2.7000000e+01, 1.3000000e+01], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 8.0177603e+03, 7.9418730e+03, 7.2652890e+03], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 1.0000000e+00, 0.0000000e+00, 1.0000000e+00], [0.0000000e+00, 1.6906400e+02, 9.8050995e+01, ..., 7.7398000e+01, 8.6451996e+01, 7.1061000e+01]], ..., [[0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 2.1000000e+01, 5.0000000e+00, 2.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 
0.0000000e+00, 0.0000000e+00, ..., 1.2000000e+01, 9.0000000e+00, 4.0000000e+00], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 4.5730670e+03, 4.0908450e+03, 3.8033680e+03], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 1.5000000e+01, 1.0000000e+00, 1.7000000e+01], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 1.2000000e+01, 6.5950003e+00, 7.0000000e+00]], [[0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 2.0000000e+01, 2.0000000e+00, 9.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 7.0000000e+00, 1.0000000e+00, 7.0000000e+00], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 2.2171320e+03, 1.5942080e+03, 1.4324890e+03], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 1.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 9.0000000e+00, 9.0000000e+00, 5.0000000e+00]], [[0.0000000e+00, 5.0000000e+00, 3.0000000e+00, ..., 1.6000000e+01, 4.0000000e+00, 8.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], ..., [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 8.7730400e+02, 5.8561500e+02, 9.1537100e+02], [0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00, 0.0000000e+00, 0.0000000e+00], [0.0000000e+00, 3.0000000e+00, 2.0000000e+00, ..., 2.0000000e+00, 0.0000000e+00, 3.0000000e+00]]])
A quick helper for measuring wall clock time and CPU time. Thanks to @tdelaney for providing the code that I based this on.
class MyTimer:
    """Context manager that records elapsed wall-clock and CPU time.

    Once the ``with`` block has exited, ``wall_time`` and ``cpu_time`` hold
    the elapsed time in seconds, measured with ``time.perf_counter_ns`` and
    ``time.process_time_ns`` respectively.
    """

    # Divisor that converts the nanosecond clock readings into seconds.
    _NS_PER_SECOND = 1_000_000_000

    def __enter__(self):
        """Record the starting clock readings and return the timer itself."""
        self.start = time.perf_counter_ns()
        self.start_cpu = time.process_time_ns()
        return self

    def __exit__(self, typ, value, traceback):
        """Store the elapsed seconds; returns None, so exceptions propagate."""
        elapsed_wall_ns = time.perf_counter_ns() - self.start
        elapsed_cpu_ns = time.process_time_ns() - self.start_cpu
        self.wall_time = elapsed_wall_ns / self._NS_PER_SECOND
        self.cpu_time = elapsed_cpu_ns / self._NS_PER_SECOND
Next, fit an ARIMA model to the first time series in the dataset, with and without BLAS parallelism.
# Time a single auto_arima fit on the first series, once with BLAS capped at
# one thread and once with no cap.
series = array[0, 0]
# Build the controller once, before any timing starts, so that constructing
# it is not charged to the limited run only.
controller = ThreadpoolController()
for limit_cores in [True, False]:
    with MyTimer() as timer:
        if limit_cores:
            with controller.limit(limits=1, user_api='blas'):
                fit = pm.auto_arima(series, n_jobs=1)
        else:
            fit = pm.auto_arima(series, n_jobs=1)
    print(f"lim: {limit_cores} wall: {timer.wall_time:.3f} cpu: {timer.cpu_time:.3f} ")
lim: True wall: 8.124 cpu: 8.636
lim: False wall: 9.743 cpu: 40.478
This shows that giving the auto_arima function additional cores actually slows it down: wall time rises from about 8.1 s to 9.7 s, while CPU time grows almost fivefold.
# Module-level controller that attach_limit() uses to cap BLAS threads.
controller = ThreadpoolController()
def attach_limit(func, limit, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, optionally capping BLAS at one thread.

    Args:
        func: Callable to invoke.
        limit: If truthy, run ``func`` inside
            ``controller.limit(limits=1, user_api='blas')``; otherwise call
            it with no thread limit attached.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    if not limit:
        return func(*args, **kwargs)
    # Relies on the module-level ``controller`` being defined.
    with controller.limit(limits=1, user_api='blas'):
        return func(*args, **kwargs)
def predict(x):
    """Run ``pm.auto_arima`` on the series ``x`` and return its result.

    The call passes ``error_action="ignore"`` and
    ``fit_args={'low_memory': True}``.
    """
    arima_options = {
        "error_action": "ignore",
        "fit_args": {'low_memory': True},
    }
    return pm.auto_arima(x, **arima_options)
# Fit every series of one store in a process pool, once per value of the
# limit flag. Only the map call is timed; pool start-up is outside the timer.
store_array = array[1]  # Get one store
for limit in (True, False):
    with multiprocessing.Pool() as pool:
        with MyTimer() as timer:
            predict_restrict = functools.partial(attach_limit, predict, limit)
            model = pool.map(predict_restrict, store_array)
        print(f"lim: {limit} time: {timer.wall_time:.3f}")
lim: True time: 560.534 lim: False time: 148.646
When fitting many ARIMA series in parallel, the gap widens.