Concurrent REST requests with threads

This is a simple demo of using threads for concurrent consumption of a REST API.

First, we define our simple function for processing a single resource ID. It has three basic steps:

  1. GET the record
  2. parse / process it
  3. PUT to update the record

In [1]:
import requests

url_t = "http://localhost:8000/records/%i"

def process_id(id):
    """process a single ID"""
    # fetch the data
    r = requests.get(url_t % id)
    # parse the JSON reply
    data = r.json()
    # and update some data with PUT
    requests.put(url_t % id, data=data)
    return data
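
As an aside, process_id opens a fresh connection for every call. If connection overhead ever matters, each thread can reuse its own requests.Session via thread-local storage. A minimal sketch, where get_session is a hypothetical helper not used anywhere else in this demo:

import threading
import requests

_local = threading.local()

def get_session():
    """one Session per thread; a shared Session is not documented as thread-safe"""
    if not hasattr(_local, "session"):
        _local.session = requests.Session()
    return _local.session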
    

Our dummy webservice is just a simple REST service that serves up the /usr/share/dict/words word list:

In [2]:
import os
ip = get_ipython()
print ip.pycolorize(open(os.path.expanduser("~/temp/echo.py")).read())
"""Simple dummy REST webservice

Includes artificial delay in requests to allow simulation of concurrent access.
"""
import json

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado import web

from tornado.options import define, options, parse_command_line

define("port", default=8000, help="run on the given port", type=int)
define("delay", default=0.1, help="artificial delay for GET requests", type=float)

# load the simple word list that we serve as our RESTful data
words = []
with open("/usr/share/dict/words") as f:
    for line in f:
        words.append(line.strip())

class MainHandler(web.RequestHandler):
    
    @web.asynchronous
    def get(self, arg):
        index = int(arg)
        self.write(json.dumps(dict(index=index, word=words[index])))
        # artificial delay for finish:
        loop = IOLoop.instance()
        loop.add_timeout(loop.time() + options.delay, self.finish)

    def put(self, arg):
        """put does nothing"""
        self.write(json.dumps(dict(status="ok")))


def main():
    parse_command_line()
    application = web.Application([
        (r"/records/(.*)", MainHandler),
    ])
    http_server = HTTPServer(application)
    http_server.listen(options.port)
    IOLoop.instance().start()


if __name__ == "__main__":
    main()
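
Before running the cells above, the service needs to be started in a separate terminal (assuming the listing is saved as ~/temp/echo.py, as in the previous cell; the flags map directly to the define() calls):

python ~/temp/echo.py --port=8000 --delay=0.1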

Now fetch some answers from the service. Since the artificial delay is 100ms per GET and the requests run one after another, N requests should take a bit more than N/10 seconds (here, 32 * 0.1s = 3.2s).

In [3]:
%time answers = [ process_id(n) for n in range(32) ]
answers
CPU times: user 116 ms, sys: 16.2 ms, total: 133 ms
Wall time: 3.41 s
Out[3]:
[{u'index': 0, u'word': u'A'},
 {u'index': 1, u'word': u'a'},
 {u'index': 2, u'word': u'aa'},
 {u'index': 3, u'word': u'aal'},
 {u'index': 4, u'word': u'aalii'},
 {u'index': 5, u'word': u'aam'},
 {u'index': 6, u'word': u'Aani'},
 {u'index': 7, u'word': u'aardvark'},
 {u'index': 8, u'word': u'aardwolf'},
 {u'index': 9, u'word': u'Aaron'},
 {u'index': 10, u'word': u'Aaronic'},
 {u'index': 11, u'word': u'Aaronical'},
 {u'index': 12, u'word': u'Aaronite'},
 {u'index': 13, u'word': u'Aaronitic'},
 {u'index': 14, u'word': u'Aaru'},
 {u'index': 15, u'word': u'Ab'},
 {u'index': 16, u'word': u'aba'},
 {u'index': 17, u'word': u'Ababdeh'},
 {u'index': 18, u'word': u'Ababua'},
 {u'index': 19, u'word': u'abac'},
 {u'index': 20, u'word': u'abaca'},
 {u'index': 21, u'word': u'abacate'},
 {u'index': 22, u'word': u'abacay'},
 {u'index': 23, u'word': u'abacinate'},
 {u'index': 24, u'word': u'abacination'},
 {u'index': 25, u'word': u'abaciscus'},
 {u'index': 26, u'word': u'abacist'},
 {u'index': 27, u'word': u'aback'},
 {u'index': 28, u'word': u'abactinal'},
 {u'index': 29, u'word': u'abactinally'},
 {u'index': 30, u'word': u'abaction'},
 {u'index': 31, u'word': u'abactor'}]

Now we define a function that calls our single-ID function for a range of IDs. Simple enough.

In [4]:
def process_id_range(id_range, store=None):
    """process a number of ids, storing the results in a dict"""
    if store is None:
        store = {}
    for id in id_range:
        store[id] = process_id(id)
    return store

This does the same thing as the list comprehension above, but in a single function call that collects the results in a dict. The optional store argument is the important part: passing in a shared dict is what will let several threads contribute to one result set below:

In [5]:
%time s = process_id_range(range(32))
s
CPU times: user 118 ms, sys: 15.6 ms, total: 134 ms
Wall time: 3.42 s
Out[5]:
{0: {u'index': 0, u'word': u'A'},
 1: {u'index': 1, u'word': u'a'},
 2: {u'index': 2, u'word': u'aa'},
 3: {u'index': 3, u'word': u'aal'},
 4: {u'index': 4, u'word': u'aalii'},
 5: {u'index': 5, u'word': u'aam'},
 6: {u'index': 6, u'word': u'Aani'},
 7: {u'index': 7, u'word': u'aardvark'},
 8: {u'index': 8, u'word': u'aardwolf'},
 9: {u'index': 9, u'word': u'Aaron'},
 10: {u'index': 10, u'word': u'Aaronic'},
 11: {u'index': 11, u'word': u'Aaronical'},
 12: {u'index': 12, u'word': u'Aaronite'},
 13: {u'index': 13, u'word': u'Aaronitic'},
 14: {u'index': 14, u'word': u'Aaru'},
 15: {u'index': 15, u'word': u'Ab'},
 16: {u'index': 16, u'word': u'aba'},
 17: {u'index': 17, u'word': u'Ababdeh'},
 18: {u'index': 18, u'word': u'Ababua'},
 19: {u'index': 19, u'word': u'abac'},
 20: {u'index': 20, u'word': u'abaca'},
 21: {u'index': 21, u'word': u'abacate'},
 22: {u'index': 22, u'word': u'abacay'},
 23: {u'index': 23, u'word': u'abacinate'},
 24: {u'index': 24, u'word': u'abacination'},
 25: {u'index': 25, u'word': u'abaciscus'},
 26: {u'index': 26, u'word': u'abacist'},
 27: {u'index': 27, u'word': u'aback'},
 28: {u'index': 28, u'word': u'abactinal'},
 29: {u'index': 29, u'word': u'abactinally'},
 30: {u'index': 30, u'word': u'abaction'},
 31: {u'index': 31, u'word': u'abactor'}}

And finally, the real point of this whole exercise: we can partition the list of IDs into sub-lists and assign each sub-list to its own thread, so that several requests are in flight concurrently.

In [6]:
from threading import Thread
In [7]:
def threaded_process(nthreads, id_range):
    """process the id range in a specified number of threads"""
    store = {}
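    # one dict shared by all threads; in CPython a single assignment
    # like store[id] = ... is atomic under the GIL, so no lock is needed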
    threads = []
    # create the threads
    for i in range(nthreads):
        ids = id_range[i::nthreads]
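        # ids is every nthreads-th ID starting at i, e.g. for 8 IDs and
        # 3 threads: [0, 3, 6], [1, 4, 7], [2, 5]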
        t = Thread(target=process_id_range, args=(ids,store))
        threads.append(t)
    
    # start the threads
    for t in threads:
        t.start()
    # wait for the threads to finish
    for t in threads:
        t.join()
    return store
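
As an aside, the same fan-out can be written with a thread pool instead of managing Thread objects by hand. A minimal sketch using concurrent.futures (in the standard library on Python 3, or the futures backport on Python 2), reusing process_id from above:

from concurrent.futures import ThreadPoolExecutor

def pooled_process(nthreads, id_range):
    """same result as threaded_process, via a thread pool"""
    with ThreadPoolExecutor(max_workers=nthreads) as pool:
        # map yields results in input order, so zipping against
        # id_range rebuilds the same id -> record mapping
        return dict(zip(id_range, pool.map(process_id, id_range)))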

If we test this, we will see that adding threads makes the job complete faster. Since each request spends nearly all of its time blocked on the artificial 100ms delay rather than holding the GIL, this should scale almost linearly up to 100 threads (one per request).

In [8]:
import time
In [9]:
id_range = range(100)
tic = time.time()
reference = process_id_range(id_range)
reftime = time.time() - tic
print 1, reftime

nlist = [1,2,4,8,16,32,64]
tlist = [reftime]
for nthreads in nlist[1:]:
    tic = time.time()
    ans = threaded_process(nthreads, id_range)
    toc = time.time()
    print nthreads, toc-tic
    assert ans == reference
    tlist.append(toc-tic)
1 10.8778159618
2 5.70237803459
4 3.02138209343
8 1.79273200035
16 1.00950598717
32 0.682367086411
64 0.624274015427
In [10]:
%pylab inline
Welcome to pylab, a matplotlib-based Python environment [backend: module://IPython.kernel.zmq.pylab.backend_inline].
For more information, type 'help(pylab)'.
In [11]:
plt.loglog(nlist, [ len(id_range) / t for t in tlist], '-o')
plt.xlabel("n threads")
plt.ylabel("requests / sec");

And, logically enough, throughput peaks around 50 threads: with 100 requests, any thread count from 50 to 99 still leaves at least one thread processing two requests, so the wall time stays at roughly two delays while the other threads finish early and sit idle.
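
As a rough sanity check on that explanation, ceil(R/N) * delay gives a lower bound on the wall time for R requests on N threads (it ignores the HTTP and GIL overhead that the measured times above include):

import math

R, delay = 100, 0.1
for N in [1, 2, 4, 8, 16, 32, 64, 100]:
    bound = math.ceil(R / float(N)) * delay
    print("%3d threads: >= %.1fs" % (N, bound))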