#!/usr/bin/env python
# coding: utf-8

# In[2]:

import ipyparallel as ipp

rc = ipp.Client()
dv = rc[:]
dv.activate()


# In[38]:

def task(n):
    """Publish a progress update from the engine at each stage of the work."""
    import time
    from ipyparallel.datapub import publish_data

    tic = time.time()
    for i in range(n):
        publish_data({
            'stage': i,
            'state': '%i ms' % (1e3 * (time.time() - tic)),
        })
        time.sleep(1)


# Simple monitoring of the current state

# In[42]:

import time
from pprint import pprint

ar = dv.apply(task, 5)
while not ar.done():
    pprint(ar.data)
    time.sleep(0.5)
ar.data  # final data published by each engine


# Collecting and consolidating events for each stage.
#
# This requires:
#
# 1. collecting data for each engine at each stage
# 2. checking when we have data for all engines at the same stage
# 3. only publishing each stage once

# In[43]:

from queue import Queue
from collections import defaultdict

ar = dv.apply(task, 5)

# create a queue of consolidated events
q = Queue()
# object for assembling in-progress events:
# keys are stages, values are the 'state' at that stage for each engine
events = defaultdict(lambda: [None] * len(ar))
# the current stage we are working on
current_stage = 0

while not ar.done():
    for idx, engine_data in enumerate(ar.data):
        stage = engine_data.get('stage')
        if stage is not None and stage >= current_stage:
            events[stage][idx] = engine_data['state']
            # publish a consolidated event for each stage, only when:
            # 1. every engine has reported this stage, and
            # 2. we haven't published this stage before
            if all(e is not None for e in events[stage]):
                print('publishing stage', stage, events[stage])
                q.put(events.pop(stage))
                current_stage = stage + 1

print("the final queue")
while not q.empty():
    print(q.get())


# In[ ]:
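# A minimal sketch, not part of the original example, assuming the `dv` and
# `task` defined above: the consolidation loop indexes `ar.data` by position,
# which relies on it being a list with one entry per engine, in the same order
# as the engines in the view.

import time

ar = dv.apply(task, 2)
time.sleep(1.5)  # give the engines a moment to publish their first update
for idx, engine_data in enumerate(ar.data):
    # each entry is the most recent dict published by that engine,
    # e.g. {'stage': 0, 'state': '3 ms'}, or {} before its first publish
    print(idx, engine_data)
ar.get()  # block until the task finishes before moving on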