#|default_exp parallel

#|export
from fastcore.imports import *
from fastcore.basics import *
from fastcore.foundation import *
from fastcore.meta import *
from fastcore.xtras import *
from functools import wraps

import concurrent.futures,time
from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
from threading import Thread

try:
    if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except: pass

from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

#|export
def threaded(process=False):
    "Run `f` in a `Thread` (or `Process` if `process=True`), and return it"
    def _r(f):
        def g(_obj_td, *args, **kwargs):
            res = f(*args, **kwargs)
            _obj_td.result = res
        @wraps(f)
        def _f(*args, **kwargs):
            res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)
            res._args = (res,)+res._args
            res.start()
            return res
        return _f
    if callable(process):
        o = process
        process = False
        return _r(o)
    return _r

@threaded
def _1():
    time.sleep(0.05)
    print("second")
    return 5

@threaded
def _2():
    time.sleep(0.01)
    print("first")

a = _1()
_2()
time.sleep(0.1)

#| eval: false
a.result

#|export
def startthread(f):
    "Like `threaded`, but start thread immediately"
    return threaded(f)()

@startthread
def _():
    time.sleep(0.05)
    print("second")

@startthread
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)

#|export
def startproc(f):
    "Like `threaded(True)`, but start `Process` immediately"
    return threaded(True)(f)()

@startproc
def _():
    time.sleep(0.05)
    print("second")

@startproc
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)

#|export
def _call(lock, pause, n, g, item):
    # Briefly hold `lock` to stagger worker start-up by `pause` seconds before calling `g`
    l = False
    if pause:
        try:
            l = lock.acquire(timeout=pause*(n+2))
            time.sleep(pause)
        finally:
            if l: lock.release()
    return g(item)

#|export
def parallelable(param_name, num_workers, f=None):
    "Check whether `f` can be run with `num_workers` parallel workers in the current environment"
    f_in_main = f is None or sys.modules[f.__module__].__name__ == "__main__"
    if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
        print("Due to IPython and Windows limitations, python multiprocessing isn't available now.")
        print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
        return False
    return True

#|export
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    "Same as Python's `ThreadPoolExecutor`, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        if not self.not_parallel: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)

show_doc(ThreadPoolExecutor, title_level=4)
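# A minimal sketch (an illustrative addition): with `max_workers=0` the executor
# skips the thread pool entirely and `map` runs in the calling thread, so serial
# and pooled runs should agree. `_double` is a hypothetical helper, not fastcore API.
def _double(x): return x*2
with ThreadPoolExecutor(max_workers=0) as ex: serial = list(ex.map(_double, range(5)))
with ThreadPoolExecutor(max_workers=2) as ex: pooled = list(ex.map(_double, range(5)))
test_eq(serial, pooled)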
#|export
@delegates()
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    "Same as Python's `ProcessPoolExecutor`, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
        self.not_parallel = self.max_workers==0
        if self.not_parallel: self.max_workers=1

        if not self.not_parallel: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)

show_doc(ProcessPoolExecutor, title_level=4)

#|export
try: from fastprogress import progress_bar
except: progress_bar = None

#|export
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):
    "Apply `f` in parallel to `items`, using `n_workers`"
    kwpool = {}
    if threadpool: pool = ThreadPoolExecutor
    else:
        if not method and sys.platform == 'darwin': method='fork'
        if method: kwpool['mp_context'] = get_context(method)
        pool = ProcessPoolExecutor
    with pool(n_workers, pause=pause, **kwpool) as ex:
        r = ex.map(f, items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
        if progress and progress_bar:
            if total is None: total = len(items)
            r = progress_bar(r, total=total, leave=False)
        return L(r)

#|export
def add_one(x, a=1):
    # this import is necessary for multiprocessing in notebook on windows
    import random
    time.sleep(random.random()/80)
    return x+a

inp,exp = range(50),range(1,51)
test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))

from datetime import datetime
import random

def print_time(i):
    time.sleep(random.random()/1000)
    print(i, datetime.now())

parallel(print_time, range(5), n_workers=2, pause=0.25);

#|hide
def die_sometimes(x):
#     if 3<x<6: raise Exception(f"exc: {x}")
    return x*2

parallel(die_sometimes, range(8))

#|hide
# test check that n_workers > 0 in scripts works when python process start method is spawn
# process = Popen(["python", "parallel_test.py"], stdout=PIPE)
# _, err = process.communicate(timeout=10)
# exit_code = process.wait()
# test_eq(exit_code, 0)

#|hide
import nbdev; nbdev.nbdev_export()
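#|hide
# A minimal sketch (an illustrative addition, not part of the exported module):
# `parallelable` can serve as an explicit guard before picking a worker count,
# mirroring the check `ProcessPoolExecutor.map` performs internally. Falling back
# to `n_workers=0` (serial execution) on restricted platforms is an assumption here.
n_workers = 2 if parallelable('n_workers', 2) else 0
test_eq(parallel(add_one, range(4), n_workers=n_workers, progress=False), range(1,5))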