# Connect to a running IPython parallel cluster and build a DirectView
# spanning every engine; activate() registers the view's %px magics so
# it can also be driven interactively.
import ipyparallel as ipp
rc = ipp.Client()
dv = rc[:]
dv.activate()
def task(n):
    """Run *n* one-second stages on an engine, publishing progress.

    Each iteration publishes a dict on ipyparallel's datapub channel:
    'stage' is the 0-based iteration index and 'state' is the elapsed
    wall-clock time since the task started, formatted in milliseconds.

    Parameters
    ----------
    n : int
        Number of stages (one second each) to run.
    """
    # Imports live inside the body because the function is shipped to
    # remote engines, where the client's module-level names don't exist.
    import time

    from ipyparallel.datapub import publish_data

    tic = time.time()
    for i in range(n):
        publish_data({
            'stage': i,
            'state': '%i ms' % (1e3 * (time.time() - tic)),
        })
        time.sleep(1)
Simple monitoring of the current state:
# Simple monitoring: poll ar.data — the most recently published dict
# from each engine — twice a second until the task completes.
import time

from pprint import pprint

ar = dv.apply(task, 5)
while not ar.done():
    pprint(ar.data)
    time.sleep(0.5)
# Final published state from each engine (displayed interactively).
ar.data
[{}, {}, {}, {}]
[{'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}]
[{'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}, {'stage': 0, 'state': '0 ms'}]
[{'stage': 1, 'state': '1004 ms'}, {'stage': 1, 'state': '1003 ms'}, {'stage': 1, 'state': '1002 ms'}, {'stage': 1, 'state': '1005 ms'}]
[{'stage': 1, 'state': '1004 ms'}, {'stage': 1, 'state': '1003 ms'}, {'stage': 1, 'state': '1002 ms'}, {'stage': 2, 'state': '2006 ms'}]
[{'stage': 2, 'state': '2009 ms'}, {'stage': 2, 'state': '2008 ms'}, {'stage': 2, 'state': '2007 ms'}, {'stage': 2, 'state': '2006 ms'}]
[{'stage': 2, 'state': '2009 ms'}, {'stage': 2, 'state': '2008 ms'}, {'stage': 2, 'state': '2007 ms'}, {'stage': 2, 'state': '2006 ms'}]
[{'stage': 3, 'state': '3015 ms'}, {'stage': 3, 'state': '3014 ms'}, {'stage': 3, 'state': '3013 ms'}, {'stage': 3, 'state': '3012 ms'}]
[{'stage': 3, 'state': '3015 ms'}, {'stage': 3, 'state': '3014 ms'}, {'stage': 3, 'state': '3013 ms'}, {'stage': 3, 'state': '3012 ms'}]
[{'stage': 4, 'state': '4019 ms'}, {'stage': 4, 'state': '4018 ms'}, {'stage': 4, 'state': '4016 ms'}, {'stage': 4, 'state': '4015 ms'}]
[{'stage': 4, 'state': '4019 ms'}, {'stage': 4, 'state': '4018 ms'}, {'stage': 4, 'state': '4016 ms'}, {'stage': 4, 'state': '4015 ms'}]
[{'stage': 4, 'state': '4019 ms'}, {'stage': 4, 'state': '4018 ms'}, {'stage': 4, 'state': '4016 ms'}, {'stage': 4, 'state': '4015 ms'}]
Collecting and consolidating events for each stage.
This requires a bit more bookkeeping, shown below:
# Re-run the task, then consolidate the published events: collect each
# engine's 'state' per stage and emit one combined record per stage once
# every engine has reported it.
ar = dv.apply(task, 5)

import copy  # NOTE(review): appears unused in this snippet — confirm before removing
from collections import defaultdict
from queue import Queue

# Queue of fully-assembled per-stage events.
q = Queue()
# In-progress events: stage -> list of 'state' strings, one slot per engine.
events = defaultdict(lambda: [None] * len(ar))
# First stage we have not yet published.
current_stage = 0
# Busy-polls ar.data until the task finishes; engines publish roughly
# once per second, so each stage is observed across several iterations.
while not ar.done():
    for idx, engine_data in enumerate(ar.data):
        stage = engine_data.get('stage')
        # Skip engines still reporting an already-published stage.
        if stage is not None and stage >= current_stage:
            events[stage][idx] = engine_data['state']
            # Publish a consolidated event only when:
            # 1. every engine has reported this stage, and
            # 2. we haven't published this stage before (pop removes it).
            if all(e is not None for e in events[stage]):
                print('publishing stage', stage, events[stage])
                q.put(events.pop(stage))
                current_stage = stage + 1

print("the final queue")
while not q.empty():
    print(q.get())
publishing stage 0 ['0 ms', '0 ms', '0 ms', '0 ms']
publishing stage 1 ['1005 ms', '1002 ms', '1001 ms', '1000 ms']
publishing stage 2 ['2006 ms', '2006 ms', '2001 ms', '2004 ms']
publishing stage 3 ['3010 ms', '3011 ms', '3007 ms', '3006 ms']
publishing stage 4 ['4016 ms', '4012 ms', '4007 ms', '4008 ms']
the final queue
['0 ms', '0 ms', '0 ms', '0 ms']
['1005 ms', '1002 ms', '1001 ms', '1000 ms']
['2006 ms', '2006 ms', '2001 ms', '2004 ms']
['3010 ms', '3011 ms', '3007 ms', '3006 ms']
['4016 ms', '4012 ms', '4007 ms', '4008 ms']