import sys
sys.path.append('../')
# import os
# os.environ['PYTHONASYNCIODEBUG'] = '1'
Channels with their put and get operations can already be used to build rather complicated systems. Now we introduce the operation select, which greatly increases the expressive power of channels.
Basically, if we have channels c1, c2 and c3 and we write
result = await select(c1, c2, c3)
then result will hold the result of one and only one get operation on c1, c2 and c3. Only one operation will be attempted. If several operations can be completed at the same time, only one will complete, and the others will not run at all. This is in contrast with, say, asyncio.wait.
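To make the contrast with asyncio.wait concrete, here is a minimal sketch that uses only the standard library. It is not aiochan code: asyncio.Queue stands in for a channel, and the function name wait_on_queues is made up for this illustration. Every get handed to asyncio.wait is started, so each one that finishes has consumed a value, even though we only asked for the first completed operation.

```python
import asyncio


async def wait_on_queues():
    # Three queues, each holding one value (stand-ins for channels).
    queues = [asyncio.Queue() for _ in range(3)]
    for queue, item in zip(queues, (1, 'a', 'x')):
        queue.put_nowait(item)

    # Every get() is started as its own task.
    getters = [asyncio.ensure_future(queue.get()) for queue in queues]
    done, pending = await asyncio.wait(
        getters, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()

    # Count how many queues lost their value.
    consumed = sum(1 for queue in queues if queue.empty())
    return len(done), consumed


if __name__ == '__main__':
    done_count, consumed = asyncio.run(wait_on_queues())
    print('tasks done:', done_count, 'values consumed:', consumed)
```

Every task reported as done has removed a value from its queue, and on CPython more than one task is usually done by the time asyncio.wait returns. A select over the same three sources would have consumed exactly one value.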
Let's have some examples:
import asyncio
import aiochan as ac

async def main():
    # three named channels, each pre-filled with values and then closed
    c1 = ac.Chan(name='c1').add(1, 2, 3).close()
    c2 = ac.Chan(name='c2').add('a', 'b', 'c').close()
    c3 = ac.Chan(name='c3').add('x', 'y', 'z').close()

    # exactly one get completes
    result, chan = await ac.select(c1, c2, c3)
    print('the result is', result)
    print('the result is from', chan)

    # drain the channels to see what is left
    async for v in c1:
        print('c1 still has value:', v)
    async for v in c2:
        print('c2 still has value:', v)
    async for v in c3:
        print('c3 still has value:', v)

ac.run(main())
the result is 1
the result is from Chan<c1 140594564470264>
c1 still has value: 2
c1 still has value: 3
c2 still has value: a
c2 still has value: b
c2 still has value: c
c3 still has value: x
c3 still has value: y
c3 still has value: z
Here we have also used some new operations on channels:
- Chan(name='some name') gives the channel a name, which shows up when the channel is printed,
- ch.add(...) adds elements to the channel in the background when it is possible to do so,
- close closes the channel immediately, but all pending puts (here those by add) will still have an opportunity to complete,
- add and close can be chained, as both methods return the channel.
And for our select: as the example shows, it returns a pair, the value together with the channel on which the operation completed.
Actually, it is not only get operations that can be selected:
async def receive(c):
    r = await c.get()
    print('received', r, 'on', c)

async def main():
    c1 = ac.Chan(name='c1')
    c2 = ac.Chan(name='c2')

    # one receiver waits on each channel
    ac.go(receive(c1))
    ac.go(receive(c2))
    await ac.nop()

    # a (chan, value) pair is a put operation
    result, chan = await ac.select((c1, 'A'), (c2, 'B'))
    print('select completes on', chan)

ac.run(main())
select completes on Chan<c2 140594564470264>
received B on Chan<c2 140594564470264>
We see that an argument of the form (chan, value) is interpreted as a put operation akin to chan.put(value). Again, one and only one operation will complete. You can also mix get operations with put operations.
If you read carefully, you will have noticed the nop we inserted above. Without it, the select will always complete on c1. You may want to think about why.
The more non-trivial the application is, the more uses of select you will find. One of its simplest uses is stopping many workers at once:
async def worker(out, stop, tag):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        # either notice the stop signal or put the next value on out
        result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)
        if c is stop:
            print('%s stopped' % tag)
            break

async def consumer(c, stop):
    while True:
        # either notice the stop signal or get the next value from c
        result, c = await ac.select(stop, c, priority=True)
        if c is stop:
            print('consumer stopped')
            break
        else:
            print('received', result)

async def main():
    c = ac.Chan()
    stop = ac.Chan()
    for i in range(3):
        ac.go(worker(c, stop, 'worker%s' % i))
    ac.go(consumer(c, stop))
    await asyncio.sleep(0.6)
    # closing stop signals every worker and the consumer at once
    stop.close()
    await asyncio.sleep(0.2)

ac.run(main())
received worker0-1
received worker1-1
received worker2-1
received worker0-2
received worker1-2
received worker2-2
received worker0-3
received worker1-3
received worker2-3
received worker0-4
received worker1-4
received worker2-4
received worker0-5
received worker1-5
received worker2-5
consumer stopped
worker0 stopped
worker1 stopped
worker2 stopped
Here stopping can actually be signaled by simply closing the fan-in-fan-out channel, but in more complicated situations (for example, closing down in response to any one of several conditions) select is essential.
We have also seen that select takes an argument priority, which defaults to False. Here we set it to True, so when several operations become completable at the same time, it is guaranteed that the leftmost one will complete. We use this priority select to make sure that each loop stops at the earliest opportunity.
There is also a default argument to select. If it is set and none of the operations can be completed immediately, select returns the default value at once, with None in the place where you usually find the completed channel. The following snippet completes the put only if it can be done immediately:
async def main():
    ch = ac.Chan()
    # nobody is receiving on ch, so the put cannot complete immediately
    result, c = await ac.select((ch, 'value'), default='giveup')
    if c is None:
        print(result)
        print('put cannot complete immediately and was given up')

ac.run(main())
giveup
put cannot complete immediately and was given up
By now you should know how to use select. It certainly seems a simple enough operation to understand. However, select is non-trivial. What we mean by that is that, using only channels and put and get operations on channels, it is not possible to write a select clone that has the correct semantics. The semantics of select has three requirements:
Writing an operation satisfying any two of the above is easy. But to satisfy all three, you need to submit your operations to the involved channels at the time of calling, and when any one operation completes, you need to notify all the other operations to cancel themselves. Thus the semantics of select must be implemented inside Chan, not outside.
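The "submit everywhere, then cancel the rest" idea can be sketched with a toy channel. This is only an illustration under simplifying assumptions, not aiochan's implementation: the names ToyChan, SelectFlag and toy_select are hypothetical, the channel has an unbounded buffer, and only get operations are supported. All operations of one select call share a single flag; whichever channel commits the flag first wins, and every other channel later finds the flag inactive and skips it.

```python
import asyncio
from collections import deque


class SelectFlag:
    """Shared by all operations of one select call; commits at most once."""

    def __init__(self, loop):
        self.future = loop.create_future()

    def is_active(self):
        return not self.future.done()

    def commit(self, value, chan):
        self.future.set_result((value, chan))


class ToyChan:
    """Hypothetical toy channel: unbounded buffer, get-only select."""

    def __init__(self, name):
        self.name = name
        self.items = deque()     # buffered values
        self.waiting = deque()   # flags of selects waiting on this channel

    def put_nowait(self, value):
        # Hand the value to the first waiting select that is still active;
        # flags already committed elsewhere are dropped (the "cancel" step).
        while self.waiting:
            flag = self.waiting.popleft()
            if flag.is_active():
                flag.commit(value, self)
                return
        self.items.append(value)

    def offer_get(self, flag):
        # Complete immediately if possible, otherwise register the flag.
        if self.items:
            flag.commit(self.items.popleft(), self)
            return True
        self.waiting.append(flag)
        return False


async def toy_select(*chans):
    flag = SelectFlag(asyncio.get_running_loop())
    for chan in chans:
        if chan.offer_get(flag):
            break   # one operation completed; submit nothing further
    return await flag.future


async def demo():
    c1, c2, c3 = ToyChan('c1'), ToyChan('c2'), ToyChan('c3')
    c2.put_nowait('b')
    c3.put_nowait('x')
    value, chan = await toy_select(c1, c2, c3)
    # The stale flag left on c1 is skipped, so this value is buffered.
    c1.put_nowait(1)
    return value, chan.name, list(c1.items), list(c3.items)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The important part is that the flag check lives inside the channel's own put logic. Code that sits outside the channel and only calls get cannot take back a get that has already consumed a value.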
select is actually the whole point of aiochan: asyncio does provide us with futures, locks and things, which are superficially somewhat like our channels. But select is conspicuously missing. Channels are made to make select possible. Rob Pike, one of the creators of Go, mentions select as the reason why channels in Go are provided by the language itself instead of as a library.
Another way of putting this is: in the hierarchy of concurrency operations, select is on the highest level of abstraction. Consider the following:
- in Java, java.util.concurrent was added as a library;
- in Clojure, core.async was also added as a library, since writing a select that works well on all the previous stuff is not possible! (By the way, select is called alt!, alts!, alt!! and alts!! in core.async. Yes, there are four of them.)
By the way, Python has a built-in library called select, and a higher-level one doing essentially the same thing called selectors. But these libraries only work with files or sockets, not plain Python objects, and the availability of the various operations in these libraries depends on the operating system. That is because the library just offloads its work to system calls. Usually we think of system calls as pretty low level. How many times have you encountered some abstraction that is provided by the lower-level operating system but not by the higher-level programming language?
To recap:
- the select operator completes exactly one operation from the given operations,
- select can be used as a control structure,
- select is non-trivial.
Useful constructs:
- select
- aiochan.Chan.add