I love learning from Julia Evans. Recently, she blogged about thread pools, and about wanting task submission to block, so that the queue of pending-but-not-running tasks can't grow forever.
At the end, she noted that Python lacks a standard-library solution for this, which I think is true. Curious, I set out to build a BoundedExecutor similar to Evans' Java implementation, also using the stdlib Semaphore to implement blocking.
Let's look at a couple of signatures that will be relevant to us. First, the ThreadPoolExecutor constructor:
from concurrent.futures import ThreadPoolExecutor
ThreadPoolExecutor?
Init signature: ThreadPoolExecutor(max_workers=None)
Docstring:
This is an abstract base class for concrete asynchronous executors.
Init docstring:
Initializes a new ThreadPoolExecutor instance.

Args:
    max_workers: The maximum number of threads that can be used to
        execute the given calls.
File:           ~/conda/lib/python3.5/concurrent/futures/thread.py
Type:           type
This takes a single argument: the number of threads to create.
The stdlib default is a somewhat peculiar 5 * num_cpus, because ThreadPoolExecutor is often used to overlap I/O instead of CPU work. Hashtag GIL, I suppose.
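For reference, that default works out like this (the same computation appears in the constructor source we'll read later):

import os

# the stdlib default for max_workers in Python 3.5:
# five threads per CPU, since the pool usually overlaps I/O, not CPU work
print((os.cpu_count() or 1) * 5)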
The next thing to look at is Executor.submit, which is where we want to add some logic:
ThreadPoolExecutor.submit?
Signature: ThreadPoolExecutor.submit(self, fn, *args, **kwargs)
Docstring:
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
    A Future representing the given call.
File:      ~/conda/lib/python3.5/concurrent/futures/thread.py
Type:      function
This is the call we want to block if there are too many tasks pending. We can add blocking to thread pools by making a subclass of ThreadPoolExecutor:
import os
from threading import Semaphore

class BoundedExecutor(ThreadPoolExecutor):
    """Like ThreadPoolExecutor, but blocks task submission
    if too many tasks are submitted.
    """
    def __init__(self, max_workers=None, bound=None):
        """Construct the Executor

        max_workers: int (default: os.cpu_count)
            The number of threads to use
        bound: int (default: max_workers)
            The number of pending tasks before submission blocks.
        """
        if max_workers is None:
            max_workers = os.cpu_count() or 1
        if bound is None:
            bound = max_workers
        self.semaphore = Semaphore(bound)
        super(BoundedExecutor, self).__init__(max_workers)

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable to be executed with the given arguments.

        If `bound` calls are already pending,
        this will block until a slot becomes available.
        """
        # acquire semaphore before starting work
        # this is where we block if there are too many tasks
        self.semaphore.acquire()
        # actually submit the task
        f = super(BoundedExecutor, self).submit(fn, *args, **kwargs)
        # release semaphore when the task is done, unblocking submissions
        f.add_done_callback(lambda _: self.semaphore.release())
        return f
bound can be separate from max_workers, in case you want to run 4 at a time, but not start blocking until you have 10 tasks in your queue.
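For instance, a quick sketch with those numbers (the names here are just placeholders):

import time

# run up to 4 tasks at once, but don't block submission
# until 10 tasks are pending (submitted but not yet done)
executor = BoundedExecutor(max_workers=4, bound=10)
futures = [executor.submit(time.sleep, 1) for _ in range(10)]
# an 11th submit would now block until one of these finishes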
The above example subclasses ThreadPoolExecutor, adding a semaphore and extending submit. This isn't totally general, and if we look more carefully at Evans' example, there's a better approach.
Rather than extending an Executor, she chose to wrap an Executor. A classic has-a vs is-a distinction. I think her approach is nicer, so let's try again, copying more directly. This time, we'll subclass the base Executor, and take another Executor (we don't care what kind) as an argument:
from concurrent.futures import Executor

class BoundedExecutor(Executor):
    """An Executor that blocks when too many tasks are submitted.
    """
    def __init__(self, executor, bound):
        """Construct the Executor

        executor: Executor instance
            The Executor that will submit tasks.
        bound: int
            The number of pending tasks before submission blocks.
        """
        self.executor = executor
        self.semaphore = Semaphore(bound)

    def submit(self, fn, *args, **kwargs):
        # acquire semaphore before passing work to our executor
        # this is where we block if there are too many tasks
        self.semaphore.acquire()
        # actually submit the task
        f = self.executor.submit(fn, *args, **kwargs)
        # release semaphore when the task is done, unblocking submissions
        f.add_done_callback(lambda _: self.semaphore.release())
        return f
This looks very similar, but it decouples the construction of the actual Executor from the bounding. Bonus: no super!
One effect is making it easier to switch to a ProcessPoolExecutor, for instance if we suspect we are having #GILProblems:
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(2)
executor = BoundedExecutor(pool, bound=3)
import time
zero = time.time()
for i in range(1, 8):
    f = executor.submit(time.sleep, i)
    print('submitted %i %.2f' % (i, time.time()-zero))
submitted 1 0.02
submitted 2 0.03
submitted 3 0.04
submitted 4 1.03
submitted 5 2.04
submitted 6 4.03
submitted 7 6.04
Yay! It blocked after 3 tasks, and then resumed as the workers were able to take their tasks.
After posting, @tintvrtkovic had an idea for an even simpler version, using the Executor._work_queue private API:
@minrk @b0rk Interesting! A simpler approach, maybe, would be to just override the internal executor._work_queue with a bounded one.
— Tin Tvrtković (@tintvrtkovic) April 9, 2016
I didn't want to use a private API in my first go, but let's look into it:
ThreadPoolExecutor.__init__??
Signature: ThreadPoolExecutor.__init__(self, max_workers=None)
Source:
    def __init__(self, max_workers=None):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
File:      ~/conda/lib/python3.5/concurrent/futures/thread.py
Type:      function
We see that _work_queue, a private attribute, is a Queue. Queue is the Python stdlib's thread-safe queue class, often used for inter-thread communication.
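As a quick aside (a minimal sketch, separate from our executor code), this is the kind of inter-thread hand-off a Queue is built for:

from queue import Queue
from threading import Thread

q = Queue()

def worker():
    # q.get() blocks until an item is available
    print('got', q.get())

t = Thread(target=worker)
t.start()
q.put('hello')  # hand an item to the worker thread
t.join()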
Let's have a look:
from queue import Queue
Queue?
Init signature: Queue(maxsize=0)
Docstring:
Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.
File:           ~/conda/lib/python3.5/queue.py
Type:           type
Queues have a maximum size! What happens when we hit that limit and try to add another item to the queue?
q = Queue(1)
q.put?
Signature: q.put(item, block=True, timeout=None)
Docstring:
Put an item into the queue.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
File:      ~/conda/lib/python3.5/queue.py
Type:      method
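We can check that behavior directly (a quick sketch, reusing the one-slot q from above):

from queue import Full

q.put('a')  # fills the single slot immediately
try:
    # a second put would block forever; with a timeout it raises Full
    q.put('b', timeout=0.1)
except Full:
    print('queue is full')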
What do you know? It blocks! That's exactly what we are looking for. Let's see what happens if we replace the default infinite queue with a finite queue:
class BoundedExecutor(ThreadPoolExecutor):
    def __init__(self, max_workers=None, bound=0):
        super().__init__(max_workers)
        self._work_queue = Queue(bound)
This is indeed the simplest implementation! Let's see how it behaves:
executor = BoundedExecutor(max_workers=2, bound=2)
zero = time.time()
for i in range(6):
    executor.submit(time.sleep, 1)
    print('submitted %i %.2f' % (i, time.time()-zero))
submitted 0 0.00
submitted 1 0.01
submitted 2 0.01
submitted 3 0.01
submitted 4 1.00
submitted 5 1.01
In general, it works. Yay, again! But it is a bit different: we got 4 tasks submitted before it started blocking, even though our bound was 2. Why?
We can think a bit more about what the work queue does, and what its size limit really means. The work queue holds tasks that have not yet been assigned to a worker, and worker threads pop tasks off the queue as soon as they pick them up. So with max_workers=2 and bound=2, the first two tasks go straight to the worker threads and the next two fill the queue, which is why the fifth submission is the first to block.
With this method, blocking happens when bound tasks cannot be assigned, whereas before, blocking happened when bound tasks had been submitted but not yet completed. That is, it will block after max_workers + bound tasks have been submitted, rather than just bound, like in the earlier implementations. The result is similar, as long as you take this new definition of bound into account.
One limitation of this implementation, though, is that we cannot have the first unassigned task block immediately (bound == max_workers in the first case). We have to allow at least one task to be submitted but not assigned, because we cannot have a queue size of zero (Queue(0) means an unbounded queue).
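If you really wanted the earlier semantics back (blocking once bound tasks are submitted but not finished), one hypothetical workaround (my sketch, not from the original post) is to shrink the queue by max_workers. It only works when bound > max_workers:

# hypothetical variant: block after roughly `bound` submitted-but-unfinished
# tasks, like the semaphore versions; requires bound > max_workers,
# since the queue size must be at least 1
class QueueBoundedExecutor(ThreadPoolExecutor):
    def __init__(self, max_workers, bound):
        super().__init__(max_workers)
        self._work_queue = Queue(bound - max_workers)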