import time

import ipyparallel as ipp
rc = ipp.Client()
v = rc.load_balanced_view()
The first step is to define a JOB_DEADLINE constant in the engines at startup.
This should identify when the job will be killed by the scheduler.
In this case, I'm using a WALL_SECONDS environment variable as a placeholder, falling back to 300 seconds if it is unset.
%pycat ~/.ipython/profile_default/ipengine_config.py
c.IPEngineApp.startup_command = """
import os
import time

# discover wall time from environment somehow
JOB_DEADLINE = time.monotonic() + int(os.environ.get('WALL_SECONDS') or 300)
"""
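The "discover wall time from environment somehow" step could be made a little more defensive. The following is a minimal sketch, not part of ipyparallel: the helper names `wall_seconds_from_env` and `compute_job_deadline` and the `margin_seconds` parameter are hypothetical, and `WALL_SECONDS` is still only the placeholder variable used above. A real batch scheduler will expose its limit under a different name or format.

```python
import os
import time


def wall_seconds_from_env(environ=None, default_seconds=300):
    """Return the job's wall time in seconds, read from WALL_SECONDS.

    WALL_SECONDS is a placeholder name; adapt it to your scheduler.
    """
    if environ is None:
        environ = os.environ
    raw = environ.get("WALL_SECONDS")
    if not raw:
        return default_seconds
    try:
        seconds = int(raw)
    except ValueError:
        # Unparseable value: fall back rather than crash engine startup.
        return default_seconds
    return seconds if seconds > 0 else default_seconds


def compute_job_deadline(environ=None, margin_seconds=0):
    """Return a time.monotonic() deadline, optionally shortened by a margin."""
    return time.monotonic() + wall_seconds_from_env(environ) - margin_seconds


# What the engine startup code would assign (hypothetical usage).
JOB_DEADLINE = compute_job_deadline()
```

Subtracting a small margin is one way to leave room for results to be sent back before the job is killed. Whether that is needed depends on the workload.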
%px JOB_DEADLINE - time.monotonic()
Out[0:22]: 964.6923915249936
Out[1:22]: -91.68114140800026
Out[2:22]: -91.68072056400706
Out[3:22]: -91.68094777798979
Out[4:22]: -91.6809879700013
Out[5:22]: -91.68255755599239
Out[6:22]: -91.67973450399586
Out[7:22]: -91.68247941001027
The second step is to define a function that will return True if there are enough seconds left to complete a job, and False if there are not.
def check_time_remaining(seconds):
    """Return True if there's at least `seconds` before JOB_DEADLINE"""
    import time

    return time.monotonic() + seconds < JOB_DEADLINE
JOB_DEADLINE = time.monotonic() + 60
check_time_remaining(30)
True
check_time_remaining(90)
False
To test this, we artificially set the deadline on every engine to 30 seconds from now:
%px JOB_DEADLINE = time.monotonic() + 30
%px JOB_DEADLINE - time.monotonic()
Out[0:24]: 29.97752740100259
Out[1:24]: 29.977728619996924
Out[2:24]: 29.97821459399711
Out[3:24]: 29.978383469002438
Out[4:24]: 29.980074470993713
Out[5:24]: 29.977392746004625
Out[6:24]: 29.97598537900194
Out[7:24]: 29.97550065800897
The @ipp.depend decorator takes a function that returns a boolean (plus any arguments to pass to it),
and ensures that tasks will only run on engines where that function returns True.
Here, we only ask for five seconds and every engine has 30 seconds remaining, so it will run anywhere:
@ipp.depend(check_time_remaining, 5)
def task(ignored):
    import os

    return os.getpid()
v.map_sync(task, range(5))
[55317, 55308, 55320, 55307, 55311]
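To make the idea behind the decorator concrete, here is a conceptual model in plain Python that runs without a cluster. It is a sketch only, not ipyparallel's implementation: `depend_sketch` and `DependencyNotMet` are hypothetical names, and the real scheduler reacts to a failed check by trying another engine rather than surfacing an exception to the caller.

```python
import functools
import time


class DependencyNotMet(Exception):
    """Hypothetical signal that the check failed on this engine."""


def depend_sketch(check, *check_args, **check_kwargs):
    """Toy stand-in for a dependency decorator: run `check` before the task."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not check(*check_args, **check_kwargs):
                raise DependencyNotMet(f"{func.__name__}: dependency not met")
            return func(*args, **kwargs)

        return wrapper

    return decorator


# Pretend this process is an engine with 30 seconds left.
JOB_DEADLINE = time.monotonic() + 30


def check_time_remaining(seconds):
    return time.monotonic() + seconds < JOB_DEADLINE


@depend_sketch(check_time_remaining, 5)
def quick_task(x):
    return x * 2


@depend_sketch(check_time_remaining, 60)
def slow_task(x):
    return x * 2


print(quick_task(21))  # the check passes, so the task body runs
try:
    slow_task(21)
except DependencyNotMet as exc:
    print("refused:", exc)
```

The point of the model is that the check runs where the task would run, so it sees that engine's own JOB_DEADLINE.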
Next, we give engine 0 much more time and leave the other engines with about 30 seconds each:
rc[0]['JOB_DEADLINE'] = time.monotonic() + 1000
%px JOB_DEADLINE - time.monotonic()
Out[0:25]: 999.9920492689998
Out[1:25]: 29.91646959100035
Out[2:25]: 29.917046825998113
Out[3:25]: 29.91725869999209
Out[4:25]: 29.918654893001076
Out[5:25]: 29.917170721993898
Out[6:25]: 29.913648749003187
Out[7:25]: 29.914027128004818
Now a task that requires 60 seconds remaining will only be assigned to engine 0:
@ipp.depend(check_time_remaining, 60)
def task(ignored):
    import os

    return os.getpid()
v.map_sync(task, range(5))
[55302, 55302, 55302, 55302, 55302]
You can also accomplish the same thing without @ipp.depend by raising a special UnmetDependency error
anywhere in your task, which causes the scheduler to reassign the task to another engine:
def task(ignored):
    # check the deadline
    import time

    import ipyparallel as ipp

    if time.monotonic() + 60 > JOB_DEADLINE:
        # raising UnmetDependency will result in the scheduler
        # reassigning this task to another engine
        raise ipp.UnmetDependency()
    import os

    return os.getpid()
v.map_sync(task, range(5))
[55302, 55302, 55302, 55302, 55302]
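The reassignment behaviour can be pictured with a toy scheduler loop. This is only an illustration under simplifying assumptions: `UnmetDependencySketch`, `run_with_reassignment`, and the fixed `now` clock are hypothetical stand-ins, engines are reduced to a mapping of id to deadline, and the real load-balancing scheduler chooses engines differently.

```python
class UnmetDependencySketch(Exception):
    """Hypothetical stand-in for the error a task raises to be reassigned."""


def task(engine_id, deadline, now, needed=60):
    """Return the engine id if `needed` seconds remain, else ask to move."""
    if now + needed > deadline:
        raise UnmetDependencySketch()
    return engine_id


def run_with_reassignment(func, engines, now):
    """Try each engine in turn until one accepts the task."""
    for engine_id, deadline in engines.items():
        try:
            return func(engine_id, deadline, now)
        except UnmetDependencySketch:
            continue  # this engine refused; try the next one
    raise RuntimeError("no engine can satisfy the task's dependency")


# Engines 1 and 2 have 30 seconds left; engine 0 has 1000 (fake clock at 0).
engines = {1: 30.0, 2: 30.0, 0: 1000.0}
results = [run_with_reassignment(task, engines, now=0.0) for _ in range(5)]
print(results)  # [0, 0, 0, 0, 0]
```

As in the cluster run above, every task ends up on the one engine with enough time left. If no engine qualifies, this toy version raises an error.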