In [1]:
import sys
sys.path.append('../')

# import os
# os.environ['PYTHONASYNCIODEBUG'] = '1'

Note: you can try this tutorial in Binder.

A ten-minutes introduction

You will need to import the module aiochan and asyncio first:

In [2]:
import aiochan as ac
import asyncio

A channel is like a golang channel or a Clojure core.async chan. Creating a channel is simple:

In [3]:
c = ac.Chan()
c
Out[3]:
Chan<_unk_0 140697829983528>

In the following examples, we use ac.run to run the main coroutine. You can also run asyncio loops directly.

We can call await c.put(v) to put value into the channel, await c.get() to get value from the channel, c.close() to close the channel, and ac.go(...) to spawn a coroutine inside another coroutine:

In [5]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    while True:
        product = await c.get()
        if product is not None:
            print('obtained:', product)
        else:
            print('consumer goes home')
            break

async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home

Channel works as an async iterator:

In [6]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')

async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home

select, which is the whole point of channels, works as in golang or alt! in Clojure's core.async to complete one and only one operation non-deterministically:

In [8]:
async def worker(out, stop, tag):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)
        if c is stop:
            print('%s stopped' % tag)
            break

async def consumer(c, stop):
    while True:
        result, c = await ac.select(stop, c, priority=True)
        if c is stop:
            print('consumer stopped')
            break
        else:
            print('received', result)

async def main():
    c = ac.Chan()
    stop = ac.Chan()
    for i in range(3):
        ac.go(worker(c, stop, 'worker%s' % i))
    ac.go(consumer(c, stop))
    await asyncio.sleep(0.6)
    stop.close()
    await asyncio.sleep(0.2)

ac.run(main())
received worker0-1
received worker1-1
received worker2-1
received worker0-2
received worker1-2
received worker2-2
received worker0-3
received worker1-3
received worker2-3
received worker0-4
received worker1-4
received worker2-4
received worker0-5
received worker1-5
received worker2-5
consumer stopped
worker0 stopped
worker1 stopped
worker2 stopped

Channels can use some buffering to implement back-pressure:

In [10]:
async def worker(c):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.05)
        print('producing', i)
        await c.put(i)

async def consumer(c):
    while True:
        await asyncio.sleep(0.2)
        result = await c.get()
        print('consuming', result)

async def main():
    c = ac.Chan(3)
    ac.go(worker(c))
    ac.go(consumer(c))
    await asyncio.sleep(1)

ac.run(main())
producing 1
producing 2
producing 3
consuming 1
producing 4
producing 5
consuming 2
producing 6
consuming 3
producing 7
consuming 4
producing 8

Sliding and dropping buffers are also available.

That's all the basics, but there are much more: functional methods, combination patterns, pipelines, thread or process-based parallelism and so on. Read the in-depth tutorial or the API documentation to find out more.