#|default_exp parallel
#|export
from fastcore.imports import *
from fastcore.basics import *
from fastcore.foundation import *
from fastcore.meta import *
from fastcore.xtras import *
from functools import wraps
import concurrent.futures,time
from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
from threading import Thread
try:
if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except: pass
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
Threading and multiprocessing functions
#|export
def threaded(process=False):
    "Run `f` in a `Thread` (or `Process` if `process=True`), and returns it"
    def _r(f):
        def _target(_self, *args, **kwargs):
            # Stash the return value on the Thread/Process object itself,
            # so callers can read it via `.result` after `join`
            _self.result = f(*args, **kwargs)
        @wraps(f)
        def _f(*args, **kwargs):
            worker = (Thread,Process)[process](target=_target, args=args, kwargs=kwargs)
            # Prepend the worker to its own args so `_target` receives it first
            worker._args = (worker,) + worker._args
            worker.start()
            return worker
        return _f
    # Bare-decorator usage: `@threaded` passes the function itself as `process`
    if callable(process):
        decorated, process = process, False
        return _r(decorated)
    return _r
# Demo: each decorated function runs in its own Thread, so the shorter
# sleep ("first") prints before the longer one ("second")
@threaded
def _1():
    time.sleep(0.05)
    print("second")
    return 5
@threaded
def _2():
    time.sleep(0.01)
    print("first")
a = _1()  # calling starts the thread; `a` is the Thread object itself
_2()
time.sleep(0.1)  # give both threads time to finish before the next cell
first second
After the thread is complete, the return value is stored in the `result` attribute.
#| eval: false
a.result
5
#|export
def startthread(f):
    "Like `threaded`, but start thread immediately"
    # `threaded(f)` wraps `f`; calling the wrapper starts the Thread at once
    thread_fn = threaded(f)
    return thread_fn()
# Demo: `startthread` starts the thread at decoration time, so no explicit
# call is needed; again the shorter sleep prints first
@startthread
def _():
    time.sleep(0.05)
    print("second")
@startthread
def _():
    time.sleep(0.01)
    print("first")
time.sleep(0.1)  # wait for both threads to complete
first second
#|export
def startproc(f):
    "Like `threaded(True)`, but start Process immediately"
    # `threaded(True)` returns the decorator; applying and calling it
    # launches the Process at decoration time
    proc_fn = threaded(True)(f)
    return proc_fn()
# Demo: the Process-based analogue of the `startthread` demo above
@startproc
def _():
    time.sleep(0.05)
    print("second")
@startproc
def _():
    time.sleep(0.01)
    print("first")
time.sleep(0.1)  # wait for both processes to complete
first second
#|export
def _call(lock, pause, n, g, item):
l = False
if pause:
try:
l = lock.acquire(timeout=pause*(n+2))
time.sleep(pause)
finally:
if l: lock.release()
return g(item)
#|export
def parallelable(param_name, num_workers, f=None):
    "Test whether multiprocessing with `num_workers` is usable here; print guidance and return `False` if not"
    # Treat `f` as "defined in __main__" when no function is given, since we
    # can't prove otherwise (use `is None`, not `== None`)
    f_in_main = f is None or sys.modules[f.__module__].__name__ == "__main__"
    # IPython on Windows can't spawn workers for __main__-defined functions,
    # so warn the caller to fall back to serial execution
    if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
        print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
        print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
        return False
    return True
#|export
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers = defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        # The base class requires at least one worker even when running serially
        super().__init__(1 if self.not_parallel else max_workers, **kwargs)
    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        # Shared lock lets `_call` stagger worker start times by `pause` seconds
        if not self.not_parallel: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)
show_doc(ThreadPoolExecutor, title_level=4)
ThreadPoolExecutor (max_workers=8, on_exc=<built-in function print>, pause=0, **kwargs)
Same as Python's ThreadPoolExecutor, except can pass max_workers==0
for serial execution
#|export
@delegates()
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers = defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        # The base class requires at least one worker even when running serially
        super().__init__(1 if self.not_parallel else max_workers, **kwargs)
    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        # Drop to serial execution where multiprocessing isn't usable (e.g. Windows notebooks)
        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
        self.not_parallel = self.max_workers==0
        if self.not_parallel: self.max_workers = 1
        # Shared lock lets `_call` stagger worker start times by `pause` seconds
        else: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)
show_doc(ProcessPoolExecutor, title_level=4)
ProcessPoolExecutor (max_workers=8, on_exc=<built-in function print>, pause=0, mp_context=None, initializer=None, initargs=())
Same as Python's ProcessPoolExecutor, except can pass max_workers==0
for serial execution
#|export
# fastprogress is optional; fall back to no progress bar when unavailable
try: from fastprogress import progress_bar
except: progress_bar = None
#|export
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):
    "Applies `func` in parallel to `items`, using `n_workers`"
    if threadpool:
        pool, kwpool = ThreadPoolExecutor, {}
    else:
        # macOS defaults to 'spawn'; prefer 'fork' so closures/notebook state survive
        if not method and sys.platform == 'darwin': method = 'fork'
        kwpool = {'mp_context': get_context(method)} if method else {}
        pool = ProcessPoolExecutor
    with pool(n_workers, pause=pause, **kwpool) as ex:
        r = ex.map(f, items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
        if progress and progress_bar:
            # `total` is needed because `r` is a lazy iterator with no len()
            r = progress_bar(r, total=len(items) if total is None else total, leave=False)
        return L(r)
#|export
def add_one(x, a=1):
    "Return `x + a` after a short random delay (simulates unevenly-timed work)"
    # this import is necessary for multiprocessing in notebook on windows
    import random
    delay = random.random()/80
    time.sleep(delay)
    return x + a
# `parallel` works with process pools, thread pools, serial execution
# (n_workers=0/1), and extra kwargs (`a=2`) passed through to the function
inp,exp = range(50),range(1,51)
test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))
Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set `threadpool=True` to use `ThreadPoolExecutor` instead of `ProcessPoolExecutor`.
from datetime import datetime
# Demo: with pause=0.25 each worker's start time is staggered by ~0.25s,
# visible in the printed timestamps below
def print_time(i):
    time.sleep(random.random()/1000)
    print(i, datetime.now())
parallel(print_time, range(5), n_workers=2, pause=0.25);
0 2022-08-07 05:10:05.999916 1 2022-08-07 05:10:06.252031 2 2022-08-07 05:10:06.503603 3 2022-08-07 05:10:06.755216 4 2022-08-07 05:10:07.006702
#|hide
def die_sometimes(x):
    "Double `x` (the raise is kept commented out for manually testing worker failures)"
    # if 3<x<6: raise Exception(f"exc: {x}")
    return 2 * x
parallel(die_sometimes, range(8))
(#8) [0,2,4,6,8,10,12,14]
#|export
def run_procs(f, f_done, args):
    "Call `f` for each item in `args` in parallel, yielding `f_done`"
    # Each element of `args` is itself one Process's args-tuple; `arg0` is
    # fastcore's `bind` placeholder, so this builds Process(args=item, target=f)
    processes = L(args).map(Process, args=arg0, target=f)
    for o in processes: o.start()
    # Consume results while the workers are still running, then wait for exit
    yield from f_done()
    processes.map(Self.join())
#|export
def _f_pg(obj, queue, batch, start_idx):
for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))
def _done_pg(queue, items): return (queue.get() for _ in items)
#|export
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
    if not parallelable('n_workers', n_workers): n_workers = 0
    if n_workers==0:
        # Serial fallback: one instance processes everything; indices via enumerate
        yield from enumerate(list(cls(**kwargs)(items)))
        return
    batches = L(chunked(items, n_chunks=n_workers))
    # `0 + L(...)` uses fastcore's `L.__radd__` to prepend 0, so accumulate
    # yields each batch's starting offset within `items`
    idx = L(itertools.accumulate(0 + batches.map(len)))
    queue = Queue()
    if progress_bar: items = progress_bar(items, leave=False)
    # One worker per batch: each gets its own `cls` instance and start index,
    # and pushes (index, result) pairs onto the shared queue
    f=partial(_f_pg, cls(**kwargs), queue)
    done=partial(_done_pg, queue, items)
    yield from run_procs(f, done, L(batches,idx).zip())
# class _C:
# def __call__(self, o): return ((i+1) for i in o)
# items = range(5)
# res = L(parallel_gen(_C, items, n_workers=0))
# idxs,dat1 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat1, range(1,6))
# res = L(parallel_gen(_C, items, n_workers=3))
# idxs,dat2 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat2, dat1)
`cls` is any class with `__call__`. It will be passed `args` and `kwargs` when initialized. Note that `n_workers` instances of `cls` are created, one in each process. `items` are then split into `n_workers` batches and one is sent to each `cls`. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self):
        # Constant added to every item, to verify per-process state works
        self.a = 1
    def __call__(self, batch):
        # Yield lazily with a random delay per item, so workers finish
        # out of order and exercise the index-tracking machinery
        for item in batch:
            time.sleep(random.random()/4)
            yield item + self.a
# Results arrive out of order (workers run at different speeds), so sort by
# index before comparing against the expected values
x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)
# #|hide
# from subprocess import Popen, PIPE
# # test num_workers > 0 in scripts works when python process start method is spawn
# process = Popen(["python", "parallel_test.py"], stdout=PIPE)
# _, err = process.communicate(timeout=10)
# exit_code = process.wait()
# test_eq(exit_code, 0)
#|hide
import nbdev; nbdev.nbdev_export()