#!/usr/bin/env python
# coding: utf-8

# # Publishing usage info during running tasks
#
# Each engine starts a background thread that periodically publishes its
# CPU / memory usage via ``ipyparallel.datapub.publish_data``.  While a task
# runs, the client reads the latest reports from ``AsyncResult.data``.

import time
from functools import partial

import ipyparallel as ipp


def get_usage():
    """Return a dict of usage info for the calling process.

    Keys:
        ``date``: timezone-aware UTC ``datetime`` of the sample.
        ``cpu``: ``psutil.Process.cpu_percent`` sampled over 0.1 s.
        ``mem``: ``psutil.Process.memory_percent()``.
        ``hostname``: ``socket.gethostname()``.

    Imports are local because this function is shipped to, and executed on,
    the engines.
    """
    import socket
    from datetime import datetime, timezone

    import psutil

    hn = socket.gethostname()
    p = psutil.Process()
    mem = p.memory_percent()
    cpu = p.cpu_percent(interval=0.1)  # blocks for the 0.1 s sampling window
    return {
        # datetime.utcnow() is deprecated (3.12+) and returns a naive value;
        # an aware UTC datetime still isoformats to 32 chars (print_usage).
        'date': datetime.now(timezone.utc),
        'cpu': cpu,
        'mem': mem,
        'hostname': hn,
    }


@ipp.interactive
def publish_data_forever(get_usage, interval):
    """Forever, call get_usage and publish the data via datapub.

    Starts a daemon thread on the engine that calls ``get_usage()`` and
    publishes the result every *interval* seconds.  This will be available
    on all AsyncResults as ``ar.data``.

    The loop exits once the engine-global ``stop_publishing`` is truthy.
    ``@ipp.interactive`` binds ``globals()`` to the engine's user namespace,
    so pushing ``stop_publishing = True`` from the client stops it.
    NOTE(review): the flag is never reset, so re-running this without
    clearing ``stop_publishing`` on the engines makes the new threads exit
    immediately.
    """
    from threading import Thread
    import time

    def main():
        from ipyparallel.datapub import publish_data

        while not globals().get('stop_publishing', False):
            publish_data(get_usage())
            time.sleep(interval)

    Thread(target=main, daemon=True).start()


# Connect to our client

rc = ipp.Client()
dv = rc[:]
view = rc.load_balanced_view()

# One-off sanity check: usage of every engine, keyed by engine id.
print(dv.apply_async(get_usage).get_dict())

# Start reporting usage in a thread (every 2 seconds on every engine)

dv.apply_sync(partial(publish_data_forever, get_usage), 2)


# Our work task that has bursts of cpu usage

def burn_randomly_forever(n=10):
    """Run *n* cycles of ~2 s of dense matrix products followed by 1 s idle.

    Imports are local: this runs on the engines, whose namespace does not
    contain the client's ``import time`` (a global lookup raised NameError).
    """
    import time

    import numpy as np

    for _ in range(n):
        a = np.random.random((1024, 1024))
        b = np.random.random((1024, 1024))
        # monotonic() is immune to wall-clock adjustments during the burst.
        tic = time.monotonic()
        while time.monotonic() - tic < 2:
            a.dot(b)
        time.sleep(1)


def print_usage(ar):
    """Nicely print the latest usage report of each engine in ``ar.data``."""
    print(
        "{:20s} {:32s} {:3s}% {:3s}%".format(
            "hostname",
            "date",
            "CPU",
            "MEM",
        )
    )
    for report in ar.data:
        if not report:
            # no data published by this engine yet
            continue
        print(
            "{:20s} {:32s} {:3.0f}% {:3.0f}%".format(
                report["hostname"],
                report["date"].isoformat(),
                report["cpu"],
                report["mem"],
            )
        )


# Give it a test:

ar = dv.apply_async(burn_randomly_forever, 5)
while not ar.done():
    print_usage(ar)
    time.sleep(2)
ar.get()  # re-raise any remote exception

# Tell the publisher threads to stop.  Equivalent to ``%px stop_publishing =
# True``, but also works when this file runs as a plain Python script, where
# ``get_ipython()`` is undefined.
dv.push({'stop_publishing': True}, block=True)