This is a simple demo of using threads for concurrent consumption of a REST API.
First, we define our simple function for processing a single resource ID. It has three basic steps:
import requests
url_t = "http://localhost:8000/records/%i"
def process_id(id):
    """Fetch, parse, and re-upload the record for one ID.

    Three steps: GET the record, decode its JSON body, then PUT the
    same data back.  Returns the decoded data.
    """
    url = url_t % id
    # fetch the data
    response = requests.get(url)
    # parse the JSON reply
    data = response.json()
    # and update some data with PUT
    requests.put(url, data=data)
    return data
Our dummy webservice is just a simple REST service that serves up the /usr/share/dict/words
word list:
import os
# grab the running IPython instance so we can reuse its syntax highlighter
ip = get_ipython()
# display the dummy webservice source with IPython's colorizer
# (Python 2 print statement; assumes the script lives at ~/temp/echo.py)
print ip.pycolorize(open(os.path.expanduser("~/temp/echo.py")).read())
"""Simple dummy REST webservice Includes artificial delay in requests to allow simulation of concurrent access. """ import json from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from tornado import web from tornado.options import define, options, parse_command_line define("port", default=8000, help="run on the given port", type=int) define("delay", default=0.1, help="artificial delay for GET requests", type=float) # load simple word list, or RESTful data words = [] with open("/usr/share/dict/words") as f: for line in f: words.append(line.strip()) class MainHandler(web.RequestHandler): @web.asynchronous def get(self, arg): index = int(arg) self.write(json.dumps(dict(index=index, word=words[index]))) # artificial delay for finish: loop = IOLoop.instance() loop.add_timeout(loop.time() + options.delay, self.finish) def put(self, arg): """put does nothing""" self.write(json.dumps(dict(status="ok"))) def main(): parse_command_line() application = web.Application([ (r"/records/(.*)", MainHandler), ]) http_server = HTTPServer(application) http_server.listen(options.port) IOLoop.instance().start() if __name__ == "__main__": main()
Now fetch some answers from the service (it should take a bit more than N/10 seconds, since we are using an artificial delay of 100ms).
%time answers = [ process_id(n) for n in range(32) ]
answers
CPU times: user 116 ms, sys: 16.2 ms, total: 133 ms Wall time: 3.41 s
[{u'index': 0, u'word': u'A'}, {u'index': 1, u'word': u'a'}, {u'index': 2, u'word': u'aa'}, {u'index': 3, u'word': u'aal'}, {u'index': 4, u'word': u'aalii'}, {u'index': 5, u'word': u'aam'}, {u'index': 6, u'word': u'Aani'}, {u'index': 7, u'word': u'aardvark'}, {u'index': 8, u'word': u'aardwolf'}, {u'index': 9, u'word': u'Aaron'}, {u'index': 10, u'word': u'Aaronic'}, {u'index': 11, u'word': u'Aaronical'}, {u'index': 12, u'word': u'Aaronite'}, {u'index': 13, u'word': u'Aaronitic'}, {u'index': 14, u'word': u'Aaru'}, {u'index': 15, u'word': u'Ab'}, {u'index': 16, u'word': u'aba'}, {u'index': 17, u'word': u'Ababdeh'}, {u'index': 18, u'word': u'Ababua'}, {u'index': 19, u'word': u'abac'}, {u'index': 20, u'word': u'abaca'}, {u'index': 21, u'word': u'abacate'}, {u'index': 22, u'word': u'abacay'}, {u'index': 23, u'word': u'abacinate'}, {u'index': 24, u'word': u'abacination'}, {u'index': 25, u'word': u'abaciscus'}, {u'index': 26, u'word': u'abacist'}, {u'index': 27, u'word': u'aback'}, {u'index': 28, u'word': u'abactinal'}, {u'index': 29, u'word': u'abactinally'}, {u'index': 30, u'word': u'abaction'}, {u'index': 31, u'word': u'abactor'}]
Now we define a function that calls our single-id function for a range of IDs. Simple enough.
def process_id_range(id_range, store=None):
    """Process every ID in `id_range`, collecting results by ID.

    A fresh dict is created when `store` is not given; otherwise
    results are written into the caller-supplied dict.  The dict
    (new or given) is returned.
    """
    results = {} if store is None else store
    for record_id in id_range:
        results[record_id] = process_id(record_id)
    return results
This does the same thing as the list comprehension above (but with a dict) in a single function call:
%time s = process_id_range(range(32));
s
CPU times: user 118 ms, sys: 15.6 ms, total: 134 ms Wall time: 3.42 s
{0: {u'index': 0, u'word': u'A'}, 1: {u'index': 1, u'word': u'a'}, 2: {u'index': 2, u'word': u'aa'}, 3: {u'index': 3, u'word': u'aal'}, 4: {u'index': 4, u'word': u'aalii'}, 5: {u'index': 5, u'word': u'aam'}, 6: {u'index': 6, u'word': u'Aani'}, 7: {u'index': 7, u'word': u'aardvark'}, 8: {u'index': 8, u'word': u'aardwolf'}, 9: {u'index': 9, u'word': u'Aaron'}, 10: {u'index': 10, u'word': u'Aaronic'}, 11: {u'index': 11, u'word': u'Aaronical'}, 12: {u'index': 12, u'word': u'Aaronite'}, 13: {u'index': 13, u'word': u'Aaronitic'}, 14: {u'index': 14, u'word': u'Aaru'}, 15: {u'index': 15, u'word': u'Ab'}, 16: {u'index': 16, u'word': u'aba'}, 17: {u'index': 17, u'word': u'Ababdeh'}, 18: {u'index': 18, u'word': u'Ababua'}, 19: {u'index': 19, u'word': u'abac'}, 20: {u'index': 20, u'word': u'abaca'}, 21: {u'index': 21, u'word': u'abacate'}, 22: {u'index': 22, u'word': u'abacay'}, 23: {u'index': 23, u'word': u'abacinate'}, 24: {u'index': 24, u'word': u'abacination'}, 25: {u'index': 25, u'word': u'abaciscus'}, 26: {u'index': 26, u'word': u'abacist'}, 27: {u'index': 27, u'word': u'aback'}, 28: {u'index': 28, u'word': u'abactinal'}, 29: {u'index': 29, u'word': u'abactinally'}, 30: {u'index': 30, u'word': u'abaction'}, 31: {u'index': 31, u'word': u'abactor'}}
And finally the real point of this whole exercise, we can partition a list of IDs and assign sub-lists to threads, so that we can have some requests running concurrently.
from threading import Thread
def threaded_process(nthreads, id_range):
    """Process `id_range` concurrently in `nthreads` threads.

    Each thread receives an interleaved slice of the IDs
    (thread i gets ids i, i+nthreads, i+2*nthreads, ...) and writes its
    results into a shared dict.  The threads write disjoint keys, so no
    explicit locking is needed for this demo.

    Parameters
    ----------
    nthreads : int
        number of worker threads to spawn
    id_range : sequence of int
        the full collection of IDs to process

    Returns
    -------
    dict mapping each ID to the result of `process_id`.
    """
    store = {}
    # create the threads, one per interleaved partition of the IDs
    threads = [
        Thread(target=process_id_range, args=(id_range[i::nthreads], store))
        for i in range(nthreads)
    ]
    # start the threads with plain loops (a list comprehension used only
    # for its side effects is an anti-idiom and builds a throwaway list)
    for t in threads:
        t.start()
    # wait for the threads to finish
    for t in threads:
        t.join()
    return store
If we test this, we will see that adding threads allows this to complete faster. Given the nature of the dummy workload, this should actually scale to almost 100 threads.
import time
id_range = range(100)
# serial baseline: process all 100 IDs in the calling thread
tic = time.time()
reference = process_id_range(id_range)
reftime = time.time() - tic
# Python 2 print statement: thread count, elapsed seconds
print 1, reftime
# thread counts to benchmark; 1 was already measured serially above
nlist = [1,2,4,8,16,32,64]
tlist = [reftime]
for nthreads in nlist[1:]:
tic = time.time()
ans = threaded_process(nthreads, id_range)
toc = time.time()
print nthreads, toc-tic
# sanity-check: the threaded result must match the serial reference
assert ans == reference
tlist.append(toc-tic)
1 10.8778159618 2 5.70237803459 4 3.02138209343 8 1.79273200035 16 1.00950598717 32 0.682367086411 64 0.624274015427
%pylab inline
Welcome to pylab, a matplotlib-based Python environment [backend: module://IPython.kernel.zmq.pylab.backend_inline]. For more information, type 'help(pylab)'.
# plot throughput (requests/sec) against thread count on log-log axes
# NOTE(review): under Python 2, len(id_range) / t is integer division when
# t happens to be an int; here t is a float elapsed time, so this is fine
plt.loglog(nlist, [ len(id_range) / t for t in tlist], '-o')
plt.xlabel("n threads")
# trailing semicolon suppresses the IPython Out[] echo of the ylabel object
plt.ylabel("requests / sec");
And, logically enough, we peak around 50 threads, because for any thread count between 50 and 99 there is at least one thread that must process two requests, while all the other threads end up waiting for it.