#!/usr/bin/env python
# coding: utf-8

# # Unlock the power of your computer with multiprocessing computation

# ## Processes, threads?
# 
# Threads are sequences of programmed instructions that can be managed independently by the OS. They share the same memory space (so we have to be careful and ensure thread safety, to avoid two threads writing to the same location at the same time), and they are the common way to deal with asynchronous computation and to improve performance when the machine has more than one CPU.
# 
# Processes are instances of a computer program being executed. They do not share memory and need special objects to exchange information: queues, shared objects, pipes, semaphores... They are heavier than threads, but safer thanks to the absence of a shared memory space.

# ## Multiprocessing / threading in Python
# 
# Due to the [GIL (Global Interpreter Lock)](https://wiki.python.org/moin/GlobalInterpreterLock), multiple threads cannot execute Python bytecode at once. This reduces the usefulness of threading: only code that releases the GIL can run concurrently. That is the case for I/O operations (web protocol requests such as HTTP/FTP, on-disk reading/writing) and for most numpy operations that rely on C routines.
# 
# That's why, in the Python ecosystem, multiprocessing is preferred over threading. NB: some attempts have been made to remove the GIL from CPython, and they led to drastic performance regressions ([more info](http://python-notes.curiousefficiency.org/en/latest/python3/multicore_python.html)).
# 
# Python has a nice standard library module for multiprocessing computation: [`multiprocessing`]. Good to know: the [`threading`] library handles multi-threaded computation and has a very similar API, even if the current tendency is to use [asyncio](https://docs.python.org/3/library/asyncio.html)-based libraries to deal with I/O operations.
# 
# Some libraries provide abstractions that help with multiprocessing computation, such as joblib (part of the sklearn ecosystem), [`concurrent.futures`] (in the stdlib, offering a future-based API), [`distributed`] (part of the dask ecosystem, allowing local and distributed computation that can live on other computers) and so on... I will focus on the stdlib [`multiprocessing`] first, then say a few words about the other tools.
# 
# NB: a nice tool called [`joblib`] provides a unified way to run embarrassingly parallel computation. I will also have a word on this tool at the end.
# 
# [`multiprocessing`]: https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing
# 
# [`threading`]: https://docs.python.org/3/library/threading.html?highlight=threading#module-threading
# 
# [`concurrent.futures`]: https://docs.python.org/3/library/concurrent.futures.html?highlight=futures#module-concurrent.futures
# 
# [`distributed`]: https://distributed.readthedocs.io/en/latest/
# 
# [`joblib`]: https://joblib.readthedocs.io/en/latest/

# ## Multiprocessing usage

# ### Process object
# 
# A Process object represents a process that can be started and will run a function. It can be initialized in two different ways:
# 
# - directly, by passing a target function that will be run by the process;
# - by writing a child class with a `run` method.
# 
# The latter is useful for complex cases.
# 
# Process objects have some useful methods, among them:
# 
# - `start()` starts the process. This is a non-blocking method.
# - `join()` waits until the process has finished its job.
# - `terminate()` sends a SIGTERM to the process, asking it to stop (see the short sketch below).
# - `is_alive()` returns True if the process is alive, False otherwise.
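# As a quick illustration of `terminate()` and `is_alive()`, here is a minimal sketch (the helper `a_very_long_running_function` is only an illustrative name, not part of the examples that follow):

# In[ ]:


import multiprocessing as mp
from time import sleep


def a_very_long_running_function():
    sleep(3600)


process = mp.Process(target=a_very_long_running_function)
process.start()
print(f"after start, process.is_alive: {process.is_alive()}")

# Ask the process to stop instead of waiting one hour for it to finish
process.terminate()
process.join()
print(f"after terminate, process.is_alive: {process.is_alive()}")
# On Unix, exitcode is the negative of the signal number that killed the process
print(f"exitcode: {process.exitcode}")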
# The two snippets below do exactly the same thing, one for each initialization style:

# In[2]:


import multiprocessing as mp
from time import sleep


# In[3]:


def a_long_running_function(time):
    sleep(time)
    return time


# In[5]:


# These lines are not blocking
process = mp.Process(target=a_long_running_function, args=(10, ))
process.start()
print(f"before join, process.is_alive: {process.is_alive()}")

# This call blocks until `a_long_running_function` is over
process.join()
print(f"after join, process.is_alive: {process.is_alive()}")


# In[6]:


# These lines are not blocking
class ALongRunningProcess(mp.Process):
    def __init__(self, *args):
        super().__init__()
        self._args = args

    def run(self):
        a_long_running_function(*self._args)


process = ALongRunningProcess(10)
process.start()
print(f"before join, process.is_alive: {process.is_alive()}")

# This call blocks until `a_long_running_function` is over
process.join()
print(f"after join, process.is_alive: {process.is_alive()}")


# ### Pool object
# 
# Often, we do not want a complex workflow with a lot of different processes sharing information. We just want N independent computations of the same function with different inputs. In that case, managing the processes ourselves can be painful, all the more so because we should avoid restarting a process for every task, since that adds overhead.
# 
# This is where the Pool object comes in: it is a pool of N worker processes (often as many as the machine has CPUs) that can be fed with tasks (a function and its input), one by one or with a whole range of parameters.
# 
# It works this way:

# In[7]:


with mp.Pool() as p:
    # A future is a placeholder for a result that we expect later
    # (multiprocessing calls it an AsyncResult).
    future = p.apply_async(a_long_running_function, args=(5, ))
    print(f"future object: {future}")
    # We have to call the get method: otherwise the pool would be
    # closed before we obtain the result.
    # We could use the wait method instead: in that case, the result is not returned.
    result = future.get()
    print(f"future.get(): {result}")
    # map runs the function over a range of inputs and returns the results as a list.
    # It can be blocking or not: the async variant returns a MapResult,
    # the equivalent of a future for multiple results.
    results = p.map(a_long_running_function, [5] * mp.cpu_count())
    print(f"results: {results}")
    futures = p.map_async(a_long_running_function, [5] * mp.cpu_count())
    print(f"futures: {futures}")
    print(f"futures.get: {futures.get()}")


# ### Sharing information between processes

# ## Other tools for multiprocessing computation

# ### `concurrent.futures`

# ### `dask.distributed`

# ### `ipyparallel`

# ## A word on serialization in the multiprocessing workflow

# ### Multiprocessing and Pickle

# ### Pickle limitations, and "better" serialization

# In[ ]:
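# A minimal sketch of the pickle limitation: the stdlib `pickle` cannot
# serialize a lambda (nor, more generally, objects it cannot look up by name),
# which is a common pitfall when sending work to a Pool. Third-party
# serializers such as `cloudpickle` handle this case; the import below assumes
# the `cloudpickle` package is installed.

import pickle

square = lambda x: x ** 2

try:
    pickle.dumps(square)
except Exception as error:
    # pickle fails because it serializes functions by reference (module + name),
    # and a lambda has no importable name
    print(f"pickle failed: {error!r}")

try:
    import cloudpickle  # third-party package, assumed available

    payload = cloudpickle.dumps(square)
    print(f"cloudpickle serialized the lambda into {len(payload)} bytes")
except ImportError:
    print("cloudpickle is not installed")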