def get_usage():
    """Return a dict of usage info for this process.

    All imports are local so the function is self-contained when it is
    shipped to, and executed on, a remote engine.

    Returns
    -------
    dict
        ``engine_id`` (the kernel's ``engine_id`` attribute, or None when the
        kernel has none), ``date`` (naive UTC datetime), ``cpu`` and ``mem``
        (percentages reported by psutil for this process), ``hostname`` and
        ``pid``.
    """
    import os
    import socket
    from datetime import datetime

    import psutil
    from IPython import get_ipython

    hn = socket.gethostname()
    p = psutil.Process()
    mem = p.memory_percent()
    # A non-zero interval makes this call block for 0.1s while it samples.
    cpu = p.cpu_percent(interval=0.1)
    return {
        'engine_id': getattr(get_ipython().kernel, 'engine_id', None),
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12; kept so the published timestamps stay naive.
        'date': datetime.utcnow(),
        'cpu': cpu,
        'mem': mem,
        'hostname': hn,
        'pid': os.getpid(),
    }
# Try it in the local process first. The kernel here has no engine_id
# attribute, so that field falls back to None.
get_usage()
{'engine_id': None, 'date': datetime.datetime(2018, 9, 4, 14, 31, 56, 487733), 'cpu': 0.2, 'mem': 0.33042430877685547, 'hostname': 'k5.simula.no', 'pid': 98975}
Connect to our client
import ipyparallel as ipp
# Connect to the running cluster.
rc = ipp.Client()
# View obtained by slicing the client; the calls below run on every engine in it.
dv = rc[:]
# NOTE(review): `view` is not used anywhere in the visible remainder of this notebook.
view = rc.load_balanced_view()
Call get_usage remotely:
# Run get_usage on every engine in dv and collect the results as a dict
# keyed by engine id.
dv.apply_async(get_usage).get_dict()
{0: {'cpu': 0.2, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 374646), 'engine_id': 0, 'hostname': 'k5.simula.no', 'mem': 0.5434989929199219, 'pid': 98366}, 1: {'cpu': 0.1, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 560603), 'engine_id': 1, 'hostname': 'k5.simula.no', 'mem': 0.5460023880004883, 'pid': 98367}, 2: {'cpu': 0.1, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 280124), 'engine_id': 2, 'hostname': 'k5.simula.no', 'mem': 0.5440950393676758, 'pid': 98368}, 3: {'cpu': 0.1, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 564956), 'engine_id': 3, 'hostname': 'k5.simula.no', 'mem': 0.5443096160888672, 'pid': 98371}, 4: {'cpu': 0.1, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 564958), 'engine_id': 4, 'hostname': 'k5.simula.no', 'mem': 0.5438804626464844, 'pid': 98376}, 5: {'cpu': 0.3, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 279651), 'engine_id': 5, 'hostname': 'k5.simula.no', 'mem': 0.5438804626464844, 'pid': 98387}, 6: {'cpu': 0.4, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 705696), 'engine_id': 6, 'hostname': 'k5.simula.no', 'mem': 0.5440473556518555, 'pid': 98392}, 7: {'cpu': 0.2, 'date': datetime.datetime(2018, 9, 4, 14, 32, 3, 609328), 'engine_id': 7, 'hostname': 'k5.simula.no', 'mem': 0.544428825378418, 'pid': 98405}}
Now, we want to start publishing in the background.
This spawns a thread and publishes get_usage output forever,
until e.g. %px stop_publishing = True
# stop_publishing is the flag the publisher loop polls in each engine's
# interactive namespace.
# NOTE(review): this True assignment repeats the example from the text above
# and is immediately overridden by the next line.
%px stop_publishing = True
# Clear the flag so publisher loops started after this point keep running.
%px stop_publishing = False
def publish_data_forever(get_usage, interval):
    """Start a background thread that keeps publishing usage data.

    The thread calls ``get_usage()`` and hands the result to datapub, then
    sleeps ``interval`` seconds, repeating until ``stop_publishing`` is set
    to a true value in the interactive namespace.

    The published data is available on all AsyncResults as `ar.data`.

    Parameters
    ----------
    get_usage : callable
        Zero-argument function returning the data to publish.
    interval : float
        Seconds to sleep between two publications.
    """
    import __main__ as interactive_ns  # the interactive namespace
    import time
    from threading import Thread

    from ipyparallel.datapub import publish_data

    def _publish_loop():
        while True:
            # The flag is re-read on every pass so it can be flipped later.
            if getattr(interactive_ns, 'stop_publishing', False):
                break
            publish_data(get_usage())
            time.sleep(interval)

    # Daemon thread: it does not keep the process alive on shutdown.
    publisher = Thread(target=_publish_loop, daemon=True)
    publisher.start()
from functools import partial
# Bind get_usage as the first argument of publish_data_forever and pass 5
# (the interval in seconds) as the second; the call runs on every engine in
# dv and returns one result per engine.
dv.apply_sync(partial(publish_data_forever, get_usage), 5)
[None, None, None, None, None, None, None, None]
If you want this to happen by default, you can also start publishing from your IPython engine's startup script:
# ipengine_config.py
# Name the script that starts the publisher as the engine's startup script.
# NOTE(review): `c` is presumably the config object made available to
# IPython config files — confirm.
c.IPEngineApp.startup_script = 'start-publishing.py'
and in that script, call publish_data_forever:
# start-publishing.py
def get_usage():
    """Return a dict of usage info for this process.

    Every module the function needs is imported locally, so the startup
    script does not rely on anything already being in the namespace.

    Returns
    -------
    dict
        ``engine_id`` (the kernel's ``engine_id`` attribute, or None when the
        kernel has none), ``date`` (naive UTC datetime), ``cpu`` and ``mem``
        (percentages reported by psutil for this process), ``hostname`` and
        ``pid``.
    """
    import os
    import socket
    from datetime import datetime

    import psutil
    from IPython import get_ipython

    hn = socket.gethostname()
    p = psutil.Process()
    mem = p.memory_percent()
    # A non-zero interval makes this call block for 0.1s while it samples.
    cpu = p.cpu_percent(interval=0.1)
    return {
        'engine_id': getattr(get_ipython().kernel, 'engine_id', None),
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12; kept so the published timestamps stay naive.
        'date': datetime.utcnow(),
        'cpu': cpu,
        'mem': mem,
        'hostname': hn,
        'pid': os.getpid(),
    }
def publish_data_forever(interval):
    """Start a background thread that keeps publishing ``get_usage()`` output.

    The thread publishes the result of the module-level ``get_usage`` via
    datapub, then sleeps ``interval`` seconds, repeating until
    ``stop_publishing`` is set to a true value in the interactive namespace.

    The published data is available on all AsyncResults as `ar.data`.

    Parameters
    ----------
    interval : float
        Seconds to sleep between two publications.
    """
    import __main__ as interactive_ns  # the interactive namespace
    import time
    from threading import Thread

    from ipyparallel.datapub import publish_data

    def _publish_loop():
        while True:
            # The flag is re-read on every pass so it can be flipped later.
            if getattr(interactive_ns, 'stop_publishing', False):
                break
            publish_data(get_usage())
            time.sleep(interval)

    # Daemon thread: it does not keep the process alive on shutdown.
    publisher = Thread(target=_publish_loop, daemon=True)
    publisher.start()
# Start the background publisher when this startup script runs, with a
# 5-second pause between reports.
publish_data_forever(5)
Now, we want to watch the forever-publishing data, even when other clients are requesting executions. To do this, we hijack our existing iopub socket and handle `data_message` events ourselves.
We collect them into a dict called `latest_data`, which contains the most recent report from each engine, keyed by engine id.
import pprint
from functools import partial
from ipyparallel import serialize
# Most recent usage report received from each engine, keyed by engine id.
latest_data = {}


def collect_data(session, msg_frames):
    """Collect and deserialize messages.

    Messages whose type is ``data_message`` have their buffers unpacked and
    the resulting report stored in ``latest_data`` under its ``engine_id``.
    Every other message type is ignored, and a message that fails to
    deserialize is reported with ``print`` and dropped.

    Parameters
    ----------
    session
        Object providing ``feed_identities`` and ``deserialize``.
    msg_frames
        Raw message frames as received from the stream.
    """
    _idents, frames = session.feed_identities(msg_frames)
    try:
        message = session.deserialize(frames, content=True)
    except Exception as err:
        # Best effort: report the problem and skip this message.
        print(err)
        return

    msg_type = message['header']['msg_type']
    # Debugging aid: print(f"Message: {msg_type}")
    if msg_type == 'data_message':
        # The published object travels in the message buffers.
        payload, _remainder = serialize.deserialize_object(message['buffers'])
        latest_data[payload['engine_id']] = payload
We could connect our own iopub socket here instead of hijacking, but we already have the stream, so why not use it?
# Register collect_data (with the client's session bound as its first
# argument) as the receive callback of the client's iopub stream.
# NOTE(review): _iopub_stream is a private attribute of the client, so it may
# change between ipyparallel releases — confirm against the installed version.
rc._iopub_stream.on_recv(partial(collect_data, rc.session))
def print_usage(data):
    """Nicely print usage data as a table, one row per report.

    Parameters
    ----------
    data : dict
        Reports keyed by engine id. Each report is a mapping with the keys
        ``engine_id``, ``hostname``, ``date`` (an object with ``isoformat()``),
        ``cpu`` and ``mem``. Nothing is printed when ``data`` is empty.
    """
    if data:
        header_fmt = " {:2s} {:20s} {:32s} {:3s}% {:3s}%"
        row_fmt = "{:3.0f} {:20s} {:32s} {:3.0f}% {:3.0f}%"

        print(header_fmt.format("id", "hostname", "date", "CPU", "MEM"))
        # Rows are emitted in ascending key order.
        for key in sorted(data):
            entry = data[key]
            print(
                row_fmt.format(
                    entry["engine_id"],
                    entry["hostname"],
                    entry["date"].isoformat(),
                    entry["cpu"],
                    entry["mem"],
                )
            )
Wait a bit, then print the latest data
import time  # not imported at top level anywhere earlier in this notebook

# Poll once per second until the first report has been collected, then
# print whatever has been collected so far.
while not latest_data:
    time.sleep(1)
print_usage(latest_data)
id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:03.868295 2% 1% 1 k5.simula.no 2018-09-04T14:32:03.871952 3% 1% 2 k5.simula.no 2018-09-04T14:32:03.875761 3% 1% 3 k5.simula.no 2018-09-04T14:32:03.871934 3% 1% 4 k5.simula.no 2018-09-04T14:32:03.871955 3% 1% 5 k5.simula.no 2018-09-04T14:32:03.871950 3% 1% 6 k5.simula.no 2018-09-04T14:32:03.875775 3% 1% 7 k5.simula.no 2018-09-04T14:32:03.875779 2% 1%
At this point, we've invalidated our first client by taking over its socket. Let's make another one to test it out.
# Second, independent client used to submit work; the first client's iopub
# stream now feeds collect_data.
rc2 = ipp.Client()
# NOTE(review): activate() presumably binds the %px magics to this client — confirm.
rc2.activate()
# Rebind dv to a view on the new client.
dv = rc2[:]
Our work task, which has bursts of CPU usage:
def burn_randomly_forever(n=10, burn_seconds=2, rest_seconds=1):
    """Alternate bursts of CPU-heavy work with idle pauses.

    Despite the name, the function performs ``n`` cycles and then returns.
    In each cycle it multiplies two random 1024x1024 matrices over and over
    for ``burn_seconds`` seconds, then sleeps for ``rest_seconds`` seconds.

    Parameters
    ----------
    n : int
        Number of burn/rest cycles.
    burn_seconds : float
        Duration of the busy phase of each cycle.
    rest_seconds : float
        Duration of the idle phase of each cycle.
    """
    # Local imports: the function runs on remote engines, whose namespace
    # cannot be assumed to contain `time`.
    import time

    import numpy as np

    for _ in range(n):
        a = np.random.random((1024, 1024))
        b = np.random.random((1024, 1024))
        # A monotonic clock is immune to wall-clock adjustments.
        deadline = time.monotonic() + burn_seconds
        while time.monotonic() < deadline:
            a.dot(b)  # the result is discarded; only the CPU load matters
        time.sleep(rest_seconds)
Give it a test, printing usage data coming in on our first client, while running code on our second:
import time  # not imported at top level anywhere earlier in this notebook

# Submit the work through dv, then print the most recently collected reports
# every 4 seconds until the submitted work is done.
ar = dv.apply_async(burn_randomly_forever)
while not ar.done():
    print_usage(latest_data)
    time.sleep(4)
id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:03.868295 2% 1% 1 k5.simula.no 2018-09-04T14:32:03.871952 3% 1% 2 k5.simula.no 2018-09-04T14:32:03.875761 3% 1% 3 k5.simula.no 2018-09-04T14:32:03.871934 3% 1% 4 k5.simula.no 2018-09-04T14:32:03.871955 3% 1% 5 k5.simula.no 2018-09-04T14:32:03.871950 3% 1% 6 k5.simula.no 2018-09-04T14:32:03.875775 3% 1% 7 k5.simula.no 2018-09-04T14:32:03.875779 2% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:06.495548 112% 1% 1 k5.simula.no 2018-09-04T14:32:06.510583 99% 1% 2 k5.simula.no 2018-09-04T14:32:06.430812 32% 1% 3 k5.simula.no 2018-09-04T14:32:06.409458 95% 1% 4 k5.simula.no 2018-09-04T14:32:06.521952 28% 1% 5 k5.simula.no 2018-09-04T14:32:06.515087 98% 1% 6 k5.simula.no 2018-09-04T14:32:06.524322 112% 1% 7 k5.simula.no 2018-09-04T14:32:06.526264 63% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:11.600315 102% 1% 1 k5.simula.no 2018-09-04T14:32:11.619881 62% 1% 2 k5.simula.no 2018-09-04T14:32:11.536484 143% 1% 3 k5.simula.no 2018-09-04T14:32:11.518273 102% 1% 4 k5.simula.no 2018-09-04T14:32:11.628190 101% 1% 5 k5.simula.no 2018-09-04T14:32:11.618840 105% 1% 6 k5.simula.no 2018-09-04T14:32:11.629376 160% 1% 7 k5.simula.no 2018-09-04T14:32:11.636048 90% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:16.709262 0% 1% 1 k5.simula.no 2018-09-04T14:32:16.723937 0% 1% 2 k5.simula.no 2018-09-04T14:32:16.643301 0% 1% 3 k5.simula.no 2018-09-04T14:32:16.623810 0% 1% 4 k5.simula.no 2018-09-04T14:32:16.736032 0% 1% 5 k5.simula.no 2018-09-04T14:32:16.723885 0% 1% 6 k5.simula.no 2018-09-04T14:32:16.736027 0% 1% 7 k5.simula.no 2018-09-04T14:32:16.743254 0% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:19.193824 36% 1% 1 k5.simula.no 2018-09-04T14:32:19.199465 98% 1% 2 k5.simula.no 2018-09-04T14:32:19.199531 36% 1% 3 k5.simula.no 2018-09-04T14:32:19.194949 171% 1% 4 k5.simula.no 2018-09-04T14:32:19.194992 124% 1% 5 k5.simula.no 2018-09-04T14:32:19.195085 154% 1% 6 
k5.simula.no 2018-09-04T14:32:19.198812 102% 1% 7 k5.simula.no 2018-09-04T14:32:19.199374 100% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:24.296745 26% 1% 1 k5.simula.no 2018-09-04T14:32:24.310477 122% 1% 2 k5.simula.no 2018-09-04T14:32:24.306473 100% 1% 3 k5.simula.no 2018-09-04T14:32:24.298284 112% 1% 4 k5.simula.no 2018-09-04T14:32:24.298303 95% 1% 5 k5.simula.no 2018-09-04T14:32:24.298479 128% 1% 6 k5.simula.no 2018-09-04T14:32:24.301048 99% 1% 7 k5.simula.no 2018-09-04T14:32:24.302155 100% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:26.921065 88% 1% 1 k5.simula.no 2018-09-04T14:32:26.931863 53% 1% 2 k5.simula.no 2018-09-04T14:32:26.856961 96% 1% 3 k5.simula.no 2018-09-04T14:32:26.832278 198% 1% 4 k5.simula.no 2018-09-04T14:32:26.947355 98% 1% 5 k5.simula.no 2018-09-04T14:32:26.932951 95% 1% 6 k5.simula.no 2018-09-04T14:32:26.946759 98% 1% 7 k5.simula.no 2018-09-04T14:32:26.946759 101% 1% id hostname date CPU% MEM% 0 k5.simula.no 2018-09-04T14:32:32.028608 0% 1% 1 k5.simula.no 2018-09-04T14:32:32.041544 0% 1% 2 k5.simula.no 2018-09-04T14:32:31.963207 0% 1% 3 k5.simula.no 2018-09-04T14:32:31.938610 90% 1% 4 k5.simula.no 2018-09-04T14:32:32.052456 0% 1% 5 k5.simula.no 2018-09-04T14:32:32.043155 0% 1% 6 k5.simula.no 2018-09-04T14:32:32.052442 0% 1% 7 k5.simula.no 2018-09-04T14:32:32.052450 0% 1%
Finally, tell the publisher threads to stop:
# Each publisher loop checks this flag before publishing again, so it exits
# once its current sleep ends.
%px stop_publishing = True