import sys
sys.path.append('../')
# import os
# os.environ['PYTHONASYNCIODEBUG'] = '1'
You will need to import aiochan and asyncio first:
import aiochan as ac
import asyncio
A channel is like a Go channel or a Clojure core.async chan. Creating one is simple:
c = ac.Chan()
c
In the following examples, we use ac.run to run the main coroutine. You can also run asyncio loops directly.
We can call await c.put(v) to put a value into the channel, await c.get() to get a value from the channel, c.close() to close the channel, and ac.go(...) to spawn a coroutine inside another coroutine:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1)  # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    while True:
        product = await c.get()
        if product is not None:
            print('obtained:', product)
        else:
            print('consumer goes home')
            break


async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2)  # necessary to wait for producer


ac.run(main())
A channel also works as an async iterator:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1)  # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')


async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2)  # necessary to wait for producer


ac.run(main())
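To see why the async for loop above ends once the channel is closed, here is a minimal sketch of the async iterator protocol built on the standard library's asyncio.Queue rather than on aiochan. The class SentinelStream and its convention of using None as the end marker are assumptions made for illustration (they mirror the way get() yields None in the earlier consumer); this is not a description of how aiochan is implemented.

```python
import asyncio


class SentinelStream:
    """Hypothetical wrapper: iterate over a queue until a None sentinel arrives."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def put(self, value):
        await self._queue.put(value)

    async def close(self):
        # None marks the end of the stream, so None cannot be used as a payload.
        await self._queue.put(None)

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self._queue.get()
        if value is None:
            # Raising StopAsyncIteration is what terminates an `async for` loop.
            raise StopAsyncIteration
        return value


async def demo():
    stream = SentinelStream()

    async def producer():
        for i in range(1, 4):
            await stream.put('product ' + str(i))
        await stream.close()

    producer_task = asyncio.ensure_future(producer())
    received = [product async for product in stream]
    await producer_task
    return received


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The consumer side needs no explicit check for the end marker: the loop simply finishes, which is the convenience the async for version of the consumer relies on.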
select, which is the whole point of channels, works like select in Go or alt! in Clojure's core.async: it completes one and only one of the given operations, chosen non-deterministically:
async def worker(out, stop, tag):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        result, ch = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)
        if ch is stop:
            print('%s stopped' % tag)
            break


async def consumer(c, stop):
    while True:
        result, ch = await ac.select(stop, c, priority=True)
        if ch is stop:
            print('consumer stopped')
            break
        else:
            print('received', result)


async def main():
    c = ac.Chan()
    stop = ac.Chan()
    for i in range(3):
        ac.go(worker(c, stop, 'worker%s' % i))
    ac.go(consumer(c, stop))
    await asyncio.sleep(0.6)
    stop.close()
    await asyncio.sleep(0.2)


ac.run(main())
Channels can be buffered to implement back-pressure:
async def worker(c):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.05)
        print('producing', i)
        await c.put(i)


async def consumer(c):
    while True:
        await asyncio.sleep(0.2)
        result = await c.get()
        print('consuming', result)


async def main():
    c = ac.Chan(3)
    ac.go(worker(c))
    ac.go(consumer(c))
    await asyncio.sleep(1)


ac.run(main())
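The back-pressure effect can be checked without reading timing-dependent output. The sketch below is a standard-library analogue, not aiochan code: it uses asyncio.Queue(maxsize=3) in place of a buffered channel, and the function name run_pipeline and the None end marker are hypothetical choices for this illustration. It records the largest queue size the producer ever observes, which should never exceed the buffer capacity because put suspends while the buffer is full.

```python
import asyncio


async def run_pipeline(capacity=3, items=10):
    """Fast producer, slow consumer, bounded queue in between."""
    queue = asyncio.Queue(maxsize=capacity)
    max_seen = 0
    consumed = []

    async def producer():
        nonlocal max_seen
        for i in range(1, items + 1):
            await queue.put(i)  # suspends while the queue is full
            max_seen = max(max_seen, queue.qsize())
        await queue.put(None)  # hypothetical end marker for this sketch

    async def consumer():
        while True:
            await asyncio.sleep(0.01)  # consuming is slower than producing
            item = await queue.get()
            if item is None:
                break
            consumed.append(item)

    await asyncio.gather(producer(), consumer())
    return max_seen, consumed


if __name__ == '__main__':
    max_seen, consumed = asyncio.run(run_pipeline())
    print('largest backlog:', max_seen)
    print('consumed:', consumed)
```

The producer is throttled to the consumer's pace once the buffer fills, and no item is lost along the way.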
Sliding and dropping buffers are also available.
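The text does not show these buffers in use, so the following is only a sketch of what the two names mean in Clojure's core.async, assuming aiochan follows the same semantics: a dropping buffer discards new values once it is full, while a sliding buffer discards the oldest value to make room. The classes DroppingBuffer and SlidingBuffer are hypothetical stand-ins written with collections.deque and are not aiochan's API.

```python
from collections import deque


class DroppingBuffer:
    """Hypothetical model: when full, newly added values are discarded."""

    def __init__(self, size):
        self._items = deque()
        self._size = size

    def add(self, value):
        if len(self._items) < self._size:
            self._items.append(value)
        # Otherwise the new value is silently dropped.

    def items(self):
        return list(self._items)


class SlidingBuffer:
    """Hypothetical model: when full, the oldest value is discarded."""

    def __init__(self, size):
        # A deque with maxlen evicts from the left when appending on the right.
        self._items = deque(maxlen=size)

    def add(self, value):
        self._items.append(value)

    def items(self):
        return list(self._items)


def fill(buffer, values):
    """Add every value to the buffer and return what it retained."""
    for value in values:
        buffer.add(value)
    return buffer.items()


if __name__ == '__main__':
    print('dropping keeps:', fill(DroppingBuffer(3), range(1, 6)))
    print('sliding keeps: ', fill(SlidingBuffer(3), range(1, 6)))
```

In both models adding a value never blocks, which is the trade-off against the fixed buffer shown earlier: the producer is never slowed down, at the cost of losing either the newest or the oldest values.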
That covers the basics, but there is much more: functional methods, combination patterns, pipelines, thread- or process-based parallelism, and so on. Read the in-depth tutorial or the API documentation to find out more.