#default_exp parallel
#export
from fastcore.imports import *
from fastcore.foundation import *
from fastcore.basics import *
from fastcore.xtras import *
from functools import wraps
# from contextlib import contextmanager,ExitStack
from multiprocessing import Process, Queue
import concurrent.futures,time
from multiprocessing import Manager, set_start_method
from threading import Thread
try:
    if sys.platform == 'darwin': set_start_method("fork")
except: pass
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
Threading and multiprocessing functions
#export
def threaded(f):
    "Run `f` in a thread, and return the thread"
    @wraps(f)
    def _f(*args, **kwargs):
        res = Thread(target=f, args=args, kwargs=kwargs)
        res.start()
        return res
    return _f
@threaded
def _1():
    time.sleep(0.05)
    print("second")

@threaded
def _2():
    time.sleep(0.01)
    print("first")

_1()
_2()
time.sleep(0.1)
first
second
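Since `threaded` returns the `Thread` object, you can also wait for completion explicitly instead of sleeping; a minimal sketch:

@threaded
def _example(): print("done")

t = _example()
t.join()  # block until the thread finishes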
#export
def startthread(f):
    "Like `threaded`, but start thread immediately"
    threaded(f)()
@startthread
def _():
    time.sleep(0.05)
    print("second")

@startthread
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)
first
second
#export
def set_num_threads(nt):
    "Get numpy (and others) to use `nt` threads"
    try: import mkl; mkl.set_num_threads(nt)
    except: pass
    try: import torch; torch.set_num_threads(nt)
    except: pass
    os.environ['IPC_ENABLE']='1'
    for o in ['OPENBLAS_NUM_THREADS','NUMEXPR_NUM_THREADS','OMP_NUM_THREADS','MKL_NUM_THREADS']:
        os.environ[o] = str(nt)
This sets the number of threads consistently for many tools, by:

1. Setting the following environment variables equal to `nt`: `OPENBLAS_NUM_THREADS`, `NUMEXPR_NUM_THREADS`, `OMP_NUM_THREADS`, `MKL_NUM_THREADS`
2. Setting `nt` threads for numpy and pytorch.
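For instance, a quick check that the environment variables were set (the value 4 below is arbitrary):

set_num_threads(4)
test_eq(os.environ['OMP_NUM_THREADS'], '4')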
#export
def _call(lock, pause, n, g, item):
    # Stagger worker start-up: each worker acquires a shared lock, sleeps for
    # `pause` seconds, then releases it, so workers begin roughly `pause` apart.
    l = False
    if pause:
        try:
            l = lock.acquire(timeout=pause*(n+2))
            time.sleep(pause)
        finally:
            if l: lock.release()
    return g(item)
#export
def check_parallel_num(param_name, num_workers):
    if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0:
        print("Due to IPython and Windows limitations, python multiprocessing isn't available now.")
        print(f"So `{param_name}` is changed to 0 to avoid getting stuck")
        num_workers = 0
    return num_workers
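On other platforms (or outside a notebook) the value passes through unchanged; a quick platform-safe check:

expected = 0 if (sys.platform == "win32" and IN_NOTEBOOK) else 2
test_eq(check_parallel_num('n_workers', 2), expected)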
#export
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        if not self.not_parallel: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)
show_doc(ThreadPoolExecutor, title_level=4)
class ThreadPoolExecutor [source]

ThreadPoolExecutor(max_workers=20, on_exc=print, pause=0, **kwargs) :: ThreadPoolExecutor

Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution
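As a quick check, passing `max_workers=0` runs the mapped function serially in the calling thread (the serial path returns a plain `map` without using the pool):

with ThreadPoolExecutor(max_workers=0) as ex:
    test_eq(list(ex.map(lambda x: x*2, range(3))), [0,2,4])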
#export
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        max_workers = check_parallel_num('max_workers', max_workers)
        store_attr()
        self.not_parallel = max_workers==0
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        if not self.not_parallel: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)
show_doc(ProcessPoolExecutor, title_level=4)
class ProcessPoolExecutor [source]

ProcessPoolExecutor(max_workers=20, on_exc=print, pause=0, **kwargs) :: ProcessPoolExecutor

Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution
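`ProcessPoolExecutor` behaves the same way: with `max_workers=0` no processes are spawned, so even a lambda (which wouldn't pickle across processes) works here:

with ProcessPoolExecutor(max_workers=0) as ex:
    test_eq(list(ex.map(lambda x: x+1, range(3))), [1,2,3])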
#export
try: from fastprogress import progress_bar
except: progress_bar = None
#export
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             threadpool=False, timeout=None, chunksize=1, **kwargs):
    "Applies `f` in parallel to `items`, using `n_workers`"
    pool = ThreadPoolExecutor if threadpool else ProcessPoolExecutor
    with pool(n_workers, pause=pause) as ex:
        r = ex.map(f, items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
        if progress and progress_bar:
            if total is None: total = len(items)
            r = progress_bar(r, total=total, leave=False)
        return L(r)
def add_one(x, a=1):
    time.sleep(random.random()/80)
    return x+a
inp,exp = range(50),range(1,51)
test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))
Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set `threadpool=True` to use `ThreadPoolExecutor` instead of `ProcessPoolExecutor`.
from datetime import datetime

def print_time(i):
    time.sleep(random.random()/1000)
    print(i, datetime.now())

parallel(print_time, range(5), n_workers=2, pause=0.25);
0 2021-02-03 09:51:30.561681
1 2021-02-03 09:51:30.812066
2 2021-02-03 09:51:31.063662
3 2021-02-03 09:51:31.313478
4 2021-02-03 09:51:31.564776
Note that `f` should accept a collection of items.
#export
def run_procs(f, f_done, args):
    "Call `f` for each item in `args` in parallel, yielding `f_done`"
    # `arg0` is fastcore's placeholder for the mapped item, and `Self.join()`
    # builds a callable equivalent to `lambda o: o.join()`
    processes = L(args).map(Process, args=arg0, target=f)
    for o in processes: o.start()
    yield from f_done()
    processes.map(Self.join())
#export
def _f_pg(obj, queue, batch, start_idx):
    for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))

def _done_pg(queue, items): return (queue.get() for _ in items)
#export
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
    n_workers = check_parallel_num('n_workers', n_workers)
    if n_workers==0:
        yield from enumerate(list(cls(**kwargs)(items)))
        return
    batches = L(chunked(items, n_chunks=n_workers))
    idx = L(itertools.accumulate(0 + batches.map(len)))
    queue = Queue()
    if progress_bar: items = progress_bar(items, leave=False)
    f=partial(_f_pg, cls(**kwargs), queue)
    done=partial(_done_pg, queue, items)
    yield from run_procs(f, done, L(batches,idx).zip())
class _C:
    def __call__(self, o): return ((i+1) for i in o)
items = range(5)
res = L(parallel_gen(_C, items, n_workers=0))
idxs,dat1 = zip(*res.sorted(itemgetter(0)))
test_eq(dat1, range(1,6))
res = L(parallel_gen(_C, items, n_workers=3))
idxs,dat2 = zip(*res.sorted(itemgetter(0)))
test_eq(dat2, dat1)
`cls` is any class with `__call__`. It will be passed `args` and `kwargs` when initialized. Note that `n_workers` instances of `cls` are created, one in each process. `items` are then split into `n_workers` batches and one is sent to each `cls`. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a=1
    def __call__(self, batch):
        for k in batch:
            time.sleep(random.random()/4)
            yield k+self.a
x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)
#hide
from nbdev.export import notebook2script
notebook2script()
Converted 00_test.ipynb.
Converted 01_basics.ipynb.
Converted 02_foundation.ipynb.
Converted 03_xtras.ipynb.
Converted 03a_parallel.ipynb.
Converted 03b_net.ipynb.
Converted 04_dispatch.ipynb.
Converted 05_transform.ipynb.
Converted 07_meta.ipynb.
Converted 08_script.ipynb.
Converted index.ipynb.
from subprocess import Popen, PIPE
# test num_workers > 0 in scripts works when python process start method is spawn
process = Popen(["python", "parallel_test.py"], stdout=PIPE)
_, err = process.communicate(timeout=5)
exit_code = process.wait()
test_eq(exit_code, 0)
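`parallel_test.py` isn't shown in this notebook; a minimal script consistent with this test might look like the following (a hypothetical sketch, not the actual file):

# parallel_test.py (hypothetical sketch)
from fastcore.parallel import parallel

def add_one(x): return x + 1

if __name__ == '__main__':
    # Under the "spawn" start method, worker code must be importable and
    # guarded by __main__ so child processes can re-import this module safely.
    res = parallel(add_one, range(10), n_workers=2, progress=False)
    assert list(res) == list(range(1, 11))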