#!/usr/bin/env python
# coding: utf-8

# # Faster ARIMA
#
# Companion notebook to my [blog post](https://njodell.com/?p=74) on multi-core ARIMA.

# In[1]:

import os
import pandas as pd
import pmdarima as pm
import functools
import time
import multiprocessing
from threadpoolctl import ThreadpoolController

# Grab the grocery sales dataset first: from
# https://www.kaggle.com/c/store-sales-time-series-forecasting/data
# download train.csv.zip before continuing.

# In[2]:

grocery_df = (
    pd.read_csv("train.csv.zip")
    .set_index(['store_nbr', 'family', 'date'])
    .drop(columns=['onpromotion', 'id'])
    .sort_index()
)
grocery_df

# In[3]:

# Flatten the sales column into a (store, family, time) cube.
array = grocery_df['sales'].values.reshape(54, 33, 1684)
array

# Small context manager that records both wall-clock and CPU time.
# Based on code by @tdelaney (https://stackoverflow.com/a/40009631/530160).

# In[4]:

class MyTimer(object):
    """Measure elapsed wall-clock and CPU seconds of a `with` block.

    After the block exits, `wall_time` and `cpu_time` hold the two
    durations in seconds.
    """

    def __enter__(self):
        self.start = time.perf_counter_ns()
        self.start_cpu = time.process_time_ns()
        return self

    def __exit__(self, typ, value, traceback):
        elapsed_wall_ns = time.perf_counter_ns() - self.start
        elapsed_cpu_ns = time.process_time_ns() - self.start_cpu
        # Convert nanoseconds to seconds.
        self.wall_time = elapsed_wall_ns / 1_000_000_000
        self.cpu_time = elapsed_cpu_ns / 1_000_000_000

# Fit an ARIMA model to the first series in the dataset, once with BLAS
# pinned to a single thread and once with BLAS free to use every core.

# In[5]:

for limit_cores in [True, False]:
    series = array[0, 0]
    with MyTimer() as timer:
        if limit_cores:
            controller = ThreadpoolController()
            # Cap BLAS at one thread for the duration of the fit.
            with controller.limit(limits=1, user_api='blas'):
                fit = pm.auto_arima(series, n_jobs=1)
        else:
            fit = pm.auto_arima(series, n_jobs=1)
    print(f"lim: {limit_cores} wall: {timer.wall_time:.3f} cpu: {timer.cpu_time:.3f} ")

# Giving the auto_arima function additional cores actually slows it down.
# In[7]:

controller = ThreadpoolController()


def attach_limit(func, limit, *args, **kwargs):
    """Call func() using a limited number of cores if limit is True, or else
    don't attach a limit.

    NOTE(fix): the original branches were inverted — `limit=True` ran
    unrestricted while `limit=False` applied the single-thread cap — which
    contradicted this docstring and made the printed `lim:` labels backwards.
    """
    if limit:
        # Pin BLAS to a single thread for the duration of the call, matching
        # the convention of the single-series benchmark above.
        with controller.limit(limits=1, user_api='blas'):
            return func(*args, **kwargs)
    else:
        return func(*args, **kwargs)


def predict(x):
    """Fit an auto-ARIMA model to one series, ignoring series that fail to fit."""
    return pm.auto_arima(x, error_action="ignore", fit_args={'low_memory': True})


for limit in [True, False]:
    with multiprocessing.Pool() as p:
        # Get one store (33 product-family series).
        store_array = array[1]
        with MyTimer() as timer:
            # Bind func=predict and the limit flag before fanning the series
            # out across the worker processes.
            predict_restrict = functools.partial(attach_limit, predict, limit)
            model = p.map(predict_restrict, store_array)
        print(f"lim: {limit} time: {timer.wall_time:.3f}")

# If fitting many ARIMA series, the gap widens.