#!/usr/bin/env python
# coding: utf-8

# # Publishing usage info during running tasks

# In[1]:


def get_usage():
    """return a dict of usage info for this process"""
    from IPython import get_ipython
    import os
    import socket
    import psutil
    from datetime import datetime

    hn = socket.gethostname()
    p = psutil.Process()
    mem = p.memory_percent()
    cpu = p.cpu_percent(interval=0.1)
    return {
        'engine_id': getattr(get_ipython().kernel, 'engine_id', None),
        'date': datetime.utcnow(),
        'cpu': cpu,
        'mem': mem,
        'hostname': hn,
        'pid': os.getpid(),
    }


# In[2]:


get_usage()


# Create a client connected to our cluster:

# In[3]:


import ipyparallel as ipp

rc = ipp.Client()
dv = rc[:]
view = rc.load_balanced_view()


# Call get_usage remotely:

# In[4]:


dv.apply_async(get_usage).get_dict()


# Now, we want to start publishing in the background.
#
# This spawns a thread on each engine that publishes get_usage output forever,
# until e.g. `%px stop_publishing = True`

# In[5]:


get_ipython().run_line_magic('px', 'stop_publishing = True')


# In[6]:


# reset the flag so newly started publisher threads keep running
get_ipython().run_line_magic('px', 'stop_publishing = False')


def publish_data_forever(get_usage, interval):
    """Forever, call get_usage and publish the data via datapub

    This will be available on all AsyncResults as `ar.data`
    """
    from threading import Thread
    import time

    import __main__ as user_ns  # the interactive namespace

    from ipyparallel.datapub import publish_data

    def main():
        # keep publishing until the user namespace sets stop_publishing = True
        while not getattr(user_ns, 'stop_publishing', False):
            publish_data(get_usage())
            time.sleep(interval)

    Thread(target=main, daemon=True).start()


from functools import partial

dv.apply_sync(partial(publish_data_forever, get_usage), 5)


# If you want this to happen by default, you can also start publishing from your IPython engine startup script:
#
# ```python
# # ipengine_config.py
# c.IPEngineApp.startup_script = 'start-publishing.py'
# ```
#
# and in that script, call publish_data_forever:
#
# ```python
# # start-publishing.py
# def get_usage():
#     """return a dict of usage info for this process"""
#     from IPython import get_ipython
#     import os
#     import socket
#     import psutil
#     from datetime import datetime
#
#     hn = socket.gethostname()
#     p = psutil.Process()
#     mem = p.memory_percent()
#     cpu = p.cpu_percent(interval=0.1)
#     return {
#         'engine_id': getattr(get_ipython().kernel, 'engine_id', None),
#         'date': datetime.utcnow(),
#         'cpu': cpu,
#         'mem': mem,
#         'hostname': hn,
#         'pid': os.getpid(),
#     }
#
#
# def publish_data_forever(interval):
#     """Forever, call get_usage and publish the data via datapub
#
#     This will be available on all AsyncResults as `ar.data`
#     """
#     from threading import Thread
#     import time
#
#     import __main__ as user_ns  # the interactive namespace
#
#     from ipyparallel.datapub import publish_data
#
#     def main():
#         while not getattr(user_ns, 'stop_publishing', False):
#             publish_data(get_usage())
#             time.sleep(interval)
#
#     Thread(target=main, daemon=True).start()
#
#
# publish_data_forever(5)
# ```
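# As the docstring above notes, data published with `publish_data` is also
# attached to the `AsyncResult` of the request it belongs to, so the simplest
# way to watch the reports is to keep the async handle around and poll
# `ar.data`. A minimal sketch (a hypothetical alternative to the `apply_sync`
# call above; the exact shape of `ar.data` depends on the view, and reports are
# only routed to this handle while no newer request has started executing):
#
# ```python
# import time
# import pprint
#
# ar = dv.apply_async(partial(publish_data_forever, get_usage), 5)
# time.sleep(6)  # wait at least one publish interval
# pprint.pprint(ar.data)  # the most recently published report(s)
# ```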
# Now, we want to watch the forever-publishing data, even when *other* clients
# are requesting executions. To do this, we hijack our existing iopub socket
# and handle `data_message` events ourselves.
#
# We collect them into a dict called `latest_data`, which contains each report,
# keyed by engine id.

# In[7]:


import pprint
from functools import partial

from ipyparallel import serialize

latest_data = {}


def collect_data(session, msg_frames):
    """Collect and deserialize messages"""
    idents, msg = session.feed_identities(msg_frames)
    try:
        msg = session.deserialize(msg, content=True)
    except Exception as e:
        print(e)
        return
    # print(f"Message: {msg['header']['msg_type']}")
    if msg['header']['msg_type'] != 'data_message':
        return
    # unpack data messages and store the latest report from each engine
    data, remainder = serialize.deserialize_object(msg['buffers'])
    latest_data[data['engine_id']] = data


# We could connect our own iopub socket here instead of hijacking,
# but we already have the stream, so why not use it?

# In[8]:


rc._iopub_stream.on_recv(partial(collect_data, rc.session))


# In[9]:


def print_usage(data):
    """Nicely print usage data"""
    if not data:
        return
    print(
        " {:2s} {:20s} {:32s} {:3s}% {:3s}%".format(
            "id",
            "hostname",
            "date",
            "CPU",
            "MEM",
        )
    )
    for eid, report in sorted(data.items()):
        print(
            "{:3.0f} {:20s} {:32s} {:3.0f}% {:3.0f}%".format(
                report["engine_id"],
                report["hostname"],
                report["date"].isoformat(),
                report["cpu"],
                report["mem"],
            )
        )


# Wait until the first reports arrive, then print the latest data

# In[10]:


import time

while not latest_data:
    time.sleep(1)
print_usage(latest_data)


# At this point, we've invalidated our first client by taking over its socket.
# Let's make another one to test it out.

# In[11]:


rc2 = ipp.Client()
rc2.activate()
dv = rc2[:]


# A work task with bursts of CPU usage:

# In[12]:


def burn_randomly_forever(n=10):
    import time

    import numpy as np

    for i in range(n):
        a = np.random.random((1024, 1024))
        b = np.random.random((1024, 1024))
        # burn CPU for two seconds, then rest for one
        tic = time.time()
        while time.time() - tic < 2:
            c = a.dot(b)
        time.sleep(1)


# Give it a test, printing usage data coming in on our first client, while running code on our second:

# In[13]:


ar = dv.apply_async(burn_randomly_forever)
while not ar.done():
    print_usage(latest_data)
    time.sleep(4)


# Finally, tell the publisher threads to stop:

# In[14]:


get_ipython().run_line_magic('px', 'stop_publishing = True')
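# The publisher threads only check the flag between sleeps, so they can take up
# to one interval to exit. A quick sanity check (a sketch, not part of the
# original session): snapshot the last-published timestamps, wait longer than
# the 5-second publish interval, and confirm they stop advancing.

# In[ ]:


before = {eid: report['date'] for eid, report in latest_data.items()}
time.sleep(10)  # comfortably longer than the 5-second publish interval
after = {eid: report['date'] for eid, report in latest_data.items()}
assert before == after, "some engines are still publishing"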