import ipyparallel as ipp
def get_usage():
"""return a dict of usage info for this process"""
import socket
import psutil
from datetime import datetime
hn = socket.gethostname()
p = psutil.Process()
mem = p.memory_percent()
cpu = p.cpu_percent(interval=0.1)
return {
'date': datetime.utcnow(),
'cpu': cpu,
'mem': mem,
'hostname': hn,
}
@ipp.interactive
def publish_data_forever(get_usage, interval):
"""Forever, call get_usage and publish the data via datapub
This will be available on all AsyncResults as `ar.data`
"""
from threading import Thread
import time
def main():
from ipyparallel.datapub import publish_data
while not globals().get('stop_publishing', False):
publish_data(get_usage())
time.sleep(interval)
Thread(target=main, daemon=True).start()
Connect to our client
rc = ipp.Client()
dv = rc[:]
view = rc.load_balanced_view()
dv.apply_async(get_usage).get_dict()
{0: {'cpu': 0.3, 'date': datetime.datetime(2018, 6, 29, 20, 56, 2, 204485), 'hostname': 'touchy.local', 'mem': 0.3194570541381836}, 1: {'cpu': 0.3, 'date': datetime.datetime(2018, 6, 29, 20, 56, 2, 204486), 'hostname': 'touchy.local', 'mem': 0.31845569610595703}, 2: {'cpu': 0.4, 'date': datetime.datetime(2018, 6, 29, 20, 56, 2, 185678), 'hostname': 'touchy.local', 'mem': 0.3191232681274414}, 3: {'cpu': 0.3, 'date': datetime.datetime(2018, 6, 29, 20, 56, 2, 204999), 'hostname': 'touchy.local', 'mem': 0.31888484954833984}}
Start reporting usage in a thread
from functools import partial
dv.apply_sync(partial(publish_data_forever, get_usage), 2)
[None, None, None, None]
Our work task that has bursts of cpu usage
def burn_randomly_forever(n=10):
import numpy as np
for i in range(n):
a = np.random.random((1024, 1024))
b = np.random.random((1024, 1024))
tic = time.time()
while time.time() - tic < 2:
c = a.dot(b)
time.sleep(1)
def print_usage(ar):
"""Nicely print usage data"""
print(
"{:20s} {:32s} {:3s}% {:3s}%".format(
"hostname",
"date",
"CPU",
"MEM",
)
)
for report in ar.data:
if not report:
# no data yet
continue
print(
"{:20s} {:32s} {:3.0f}% {:3.0f}%".format(
report["hostname"],
report["date"].isoformat(),
report["cpu"],
report["mem"],
))
Give it a test:
import time
ar = dv.apply_async(burn_randomly_forever, 5)
while not ar.done():
print_usage(ar)
time.sleep(2)
ar.get()
hostname date CPU% MEM% hostname date CPU% MEM% touchy.local 2018-06-29T20:56:57.210105 76% 1% touchy.local 2018-06-29T20:56:57.202633 89% 1% touchy.local 2018-06-29T20:56:57.219408 84% 1% touchy.local 2018-06-29T20:56:57.202631 88% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:56:59.319295 0% 1% touchy.local 2018-06-29T20:56:59.307491 0% 1% touchy.local 2018-06-29T20:56:59.320750 0% 1% touchy.local 2018-06-29T20:56:59.307491 0% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:57:01.423370 85% 1% touchy.local 2018-06-29T20:57:01.411284 67% 1% touchy.local 2018-06-29T20:57:01.423499 85% 1% touchy.local 2018-06-29T20:57:01.411973 82% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:57:03.527549 73% 1% touchy.local 2018-06-29T20:57:03.513848 82% 1% touchy.local 2018-06-29T20:57:03.528478 71% 1% touchy.local 2018-06-29T20:57:03.516548 62% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:57:05.629291 0% 1% touchy.local 2018-06-29T20:57:05.620618 0% 1% touchy.local 2018-06-29T20:57:05.633550 0% 1% touchy.local 2018-06-29T20:57:05.620569 0% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:57:07.735505 62% 1% touchy.local 2018-06-29T20:57:07.729172 71% 1% touchy.local 2018-06-29T20:57:07.742521 62% 1% touchy.local 2018-06-29T20:57:07.729172 52% 1% hostname date CPU% MEM% touchy.local 2018-06-29T20:57:09.841484 55% 1% touchy.local 2018-06-29T20:57:09.830631 64% 1% touchy.local 2018-06-29T20:57:09.844362 50% 1% touchy.local 2018-06-29T20:57:09.835502 38% 1%
[None, None, None, None]
Tell the publisher threads to stop:
%px stop_publishing = True