from queue import Queue
from io import StringIO
from threading import Lock
# read/write with a threadsafe Queue
def write_queue(q, text):
q.put(text)
def read_queue(q):
buf = []
while not q.empty():
buf.append(q.get())
return ''.join(buf)
# read/write with StringIO
def write_io(buf, text):
buf.write(text)
def read_io(buf):
return buf.getvalue()
# read/write with StringIO and locking
def write_io_lock(buf, text, lock):
with lock:
buf.write(text)
def read_io_lock(buf, lock):
with lock:
return buf.getvalue()
def run_test(write, read, n_writes, msg_size):
"""Run a test with a given read/write pair
n_writes of size msg_size are performed.
"""
msg = 'x' * msg_size
for i in range(n_writes):
write(msg)
return read()
from functools import partial
def test_queue(n_writes, msg_size):
q = Queue()
write = partial(write_queue, q)
read = partial(read_queue, q)
return run_test(write, read, n_writes, msg_size)
def test_io(n_writes, msg_size):
buf = StringIO()
write = partial(write_io, buf)
read = partial(read_io, buf)
return run_test(write, read, n_writes, msg_size)
def test_io_lock(n_writes, msg_size):
buf = StringIO()
lock = Lock()
write = partial(write_io_lock, buf, lock=lock)
read = partial(read_io_lock, buf, lock=lock)
return run_test(write, read, n_writes, msg_size)
%timeit test_queue(1000, 100)
100 loops, best of 3: 4.6 ms per loop
%timeit test_io(1000, 100)
1000 loops, best of 3: 305 µs per loop
%timeit test_io_lock(1000, 100)
1000 loops, best of 3: 536 µs per loop