#!/usr/bin/env python
# coding: utf-8

# # Bounded ThreadPool in Python
#
# I love learning from [Julia Evans](http://jvns.ca/).
# Recently, [she blogged](http://jvns.ca/blog/2016/03/29/thread-pools-part-ii-i-love-blocking/) about thread pools,
# and wanting to add the feature of blocking task submission,
# to avoid an ever-growing queue of pending-but-not-running tasks.
#
# At the end, she made a comment about Python lacking a standard-library solution, which I think is true.
# Curious, I set out to build a similar BoundedExecutor to Evans' Java implementation,
# also using the stdlib Semaphore to implement blocking.

# Let's look at a couple of signatures that will be relevant to us. First, the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor) constructor:

# In[1]:

from concurrent.futures import ThreadPoolExecutor

get_ipython().run_line_magic('pinfo', 'ThreadPoolExecutor')

# This takes a single argument: the number of threads to create.
# The stdlib default is a somewhat peculiar `5 * num_cpus`,
# because
#
# > ThreadPoolExecutor is often used to overlap I/O instead of CPU work.
#
# Hashtag [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), I suppose.
#
# The next thing to look at is [Executor.submit](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.submit),
# which is where we want to add some logic:

# In[2]:

get_ipython().run_line_magic('pinfo', 'ThreadPoolExecutor.submit')

# This is the call we want to block if there are too many tasks pending.
# We can add blocking to thread pools by making a subclass of ThreadPoolExecutor:

# In[3]:

import os
from threading import Semaphore


class BoundedExecutor(ThreadPoolExecutor):
    """Like ThreadPoolExecutor, but blocks task submission
    if too many tasks are submitted.
    """

    def __init__(self, max_workers=None, bound=None):
        """Construct the Executor

        max_workers: int (default: os.cpu_count)
            The number of threads to use
        bound: int (default: max_workers)
            The number of pending tasks before submission blocks.
        """
        if max_workers is None:
            max_workers = os.cpu_count() or 1
        if bound is None:
            bound = max_workers
        # each pending (submitted-but-not-done) task holds one slot
        self.semaphore = Semaphore(bound)
        super(BoundedExecutor, self).__init__(max_workers)

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable to be executed with the given arguments.

        If `bound` calls are already pending, this will block
        until a slot becomes available.
        """
        # acquire semaphore before starting work
        # this is where we block if there are too many tasks
        self.semaphore.acquire()
        # actually submit the task
        try:
            f = super(BoundedExecutor, self).submit(fn, *args, **kwargs)
        except Exception:
            # submission failed (e.g. executor shut down); release the
            # slot we took, otherwise it would leak and eventually deadlock
            self.semaphore.release()
            raise
        # release semaphore when the task is done, unblocking submissions
        f.add_done_callback(lambda _: self.semaphore.release())
        return f

# `bound` can be separate from `max_workers`, in case you want to run 4 at a time, but not start blocking until you have 10 tasks in your queue.
#
# The above example *subclasses* ThreadPoolExecutor,
# adding a semaphore and extending `submit`.
# This isn't totally general,
# and if we look more carefully at Evans' example,
# there's a better approach.
#
# Rather than *extending* an Executor, she chose to *wrap* an Executor. A classic [has-a](https://en.wikipedia.org/wiki/Has-a) vs [is-a](https://en.wikipedia.org/wiki/Is-a) distinction.
# I think her approach is nicer, so let's try again, copying more directly.
# This time, we'll subclass the base Executor,
# and take another Executor (we don't care what kind) as an argument:

# In[4]:

from concurrent.futures import Executor


class BoundedExecutor(Executor):
    """An Executor that blocks when too many tasks are submitted.
    """

    def __init__(self, executor, bound):
        """Construct the Executor

        executor: Executor instance
            The Executor that will submit tasks.
        bound: int
            The number of pending tasks before submission blocks.
        """
        self.executor = executor
        self.semaphore = Semaphore(bound)

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable, blocking while `bound` tasks are pending."""
        # acquire semaphore before passing work to our executor
        # this is where we block if there are too many tasks
        self.semaphore.acquire()
        # actually submit the task
        try:
            f = self.executor.submit(fn, *args, **kwargs)
        except Exception:
            # submission failed; release the slot we took,
            # otherwise it would leak and eventually deadlock
            self.semaphore.release()
            raise
        # release semaphore when the task is done, unblocking submissions
        f.add_done_callback(lambda _: self.semaphore.release())
        return f

# This looks very similar, but it decouples the construction of the actual Executor from the bounding. Bonus: no super!
#
# One effect is making it easier to switch to a ProcessPoolExecutor, for instance if we suspect we are having `#GILProblems`:

# In[5]:

from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(2)
executor = BoundedExecutor(pool, bound=3)

import time

zero = time.time()
for i in range(1, 8):
    f = executor.submit(time.sleep, i)
    print('submitted %i %.2f' % (i, time.time()-zero))

# Yay! It blocked after 3 tasks, and then resumed as the workers were able to take their tasks.

# ### Twitter follow-up
#
# After posting, @tintvrtkovic had an idea for an even simpler version, using the `Executor._work_queue` private API:
#
#
#
#
# I didn't want to use a private API in my first go, but let's look into it:

# In[6]:

get_ipython().run_line_magic('pinfo2', 'ThreadPoolExecutor.__init__')

# We see that `_work_queue`, a private attribute, is a [Queue](https://docs.python.org/3/library/queue.html#queue.Queue) object.
# Queues are a Python stdlib object for thread-safe queues,
# often used for inter-thread communication.
#
# Let's have a look:

# In[7]:

from queue import Queue

get_ipython().run_line_magic('pinfo', 'Queue')

# Queues have a maximum size! What happens if we hit the limit of a queue's size trying to add an item to the queue:

# In[8]:

q = Queue(1)

get_ipython().run_line_magic('pinfo', 'q.put')

# What do you know? It blocks! That's exactly what we are looking for.
# Let's see what happens if we replace the default infinite queue with a finite queue:

# In[9]:


class BoundedExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor whose internal work queue is bounded.

    Submission blocks when `bound` tasks are waiting to be assigned
    to a worker thread (bound=0 keeps the default unbounded queue).
    """

    def __init__(self, max_workers=None, bound=0):
        super().__init__(max_workers)
        # NOTE: relies on the private `_work_queue` attribute of
        # ThreadPoolExecutor; a full Queue makes `submit` block in `put`.
        self._work_queue = Queue(bound)

# This is indeed the simplest implementation!
# Let's see how it behaves:

# In[10]:

executor = BoundedExecutor(max_workers=2, bound=2)
zero = time.time()
for i in range(6):
    executor.submit(time.sleep, 1)
    print('submitted %i %.2f' % (i, time.time()-zero))

# In general, it works. Yay, again!
# But it is a bit different.
# We got 4 tasks submitted before it started blocking, but our `bound` was 2.
# **Why?**
#
# We can think a bit more about what the work queue does,
# and what its size limit really means.
# The work queue is the number of tasks *not yet assigned*,
# but some tasks are assigned immediately to the actual work threads,
# popping them off of the queue.
#
# With this method, blocking will happen when `bound` tasks cannot be assigned,
# whereas before,
# blocking happened when `bound` tasks had been **submitted** but not yet **completed**.
# That is, it will block after `max_workers + bound` tasks have been submitted, rather than just `bound`, like in the earlier implementations.
#
# The result is similar, as long as you take this new definition of `bound` into account.
# One limitation of this implementation, though, is that we cannot have the first unassigned task block immediately (`bound == max_workers` in the first case).
# We have to allow at least one task to be submitted but not assigned because we cannot have a queue size of zero.
#
# @minrk @b0rk Interesting! A simpler approach, maybe, would be to just override the internal executor._work_queue with a bounded one.
— Tin Tvrtković (@tintvrtkovic) April 9, 2016