#!/usr/bin/env python
# coding: utf-8

# # Managing job deadlines in IPython Parallel
#
# For https://github.com/ipython/ipyparallel/issues/277

# In[1]:


# `time` is needed in the client namespace too: In[5] below calls
# time.monotonic() directly, outside any function.
import time

import ipyparallel as ipp

rc = ipp.Client()
v = rc.load_balanced_view()


# The first step is to define a JOB_DEADLINE constant
# in the engines at startup.
#
# This should identify when the job will be killed by the scheduler.
# In this case, I'm using a `WALL_SECONDS` placeholder env.

# In[2]:


get_ipython().run_line_magic('pycat', '~/.ipython/profile_default/ipengine_config.py')


# In[3]:


get_ipython().run_line_magic('px', 'JOB_DEADLINE - time.monotonic()')


# The second step is to define a function that will return True
# if there are enough seconds left to complete a job,
# and False if there are not.

# In[4]:


def check_time_remaining(seconds):
    """Return True if there are at least `seconds` left before JOB_DEADLINE.

    JOB_DEADLINE is looked up as a global in whichever namespace the
    function runs in: the engine's namespace when used as an
    ``@ipp.depend`` check, or the client's namespace when called
    directly (as in In[5] below).

    Parameters
    ----------
    seconds : float
        Number of seconds the job needs to finish.

    Returns
    -------
    bool
        True if ``time.monotonic() + seconds`` is strictly before
        JOB_DEADLINE, False otherwise (including exactly at the deadline).
    """
    # Imported locally so the function is self-contained when it is
    # shipped to the engines as a dependency check.
    import time
    return time.monotonic() + seconds < JOB_DEADLINE


# In[5]:


JOB_DEADLINE = time.monotonic() + 60
check_time_remaining(30)


# In[6]:


check_time_remaining(90)


# To test this, we artificially set some deadlines with 30 seconds remaining:

# In[7]:


get_ipython().run_line_magic('px', 'JOB_DEADLINE = time.monotonic() + 30')
get_ipython().run_line_magic('px', 'JOB_DEADLINE - time.monotonic()')


# The `@ipp.depend` decorator takes a function that returns a boolean,
# and ensures that tasks will only run on engines where that function returns True.
#
# Here, we only ask for five seconds and every engine has 30 seconds remaining,
# so it will run anywhere:

# In[8]:


@ipp.depend(check_time_remaining, 5)
def task(ignored):
    """Return the PID of the engine process that ran this task."""
    import os
    return os.getpid()


v.map_sync(task, range(5))


# Now we are going to give engine 0 lots more time and ask for a minute.
# The result is that the task will only ever be assigned to engine 0.
# In[9]:


# Evaluate the new deadline *on engine 0* rather than computing it on the
# client and pushing it: the reference point of time.monotonic() is
# undefined, so a client-side value is only meaningful on the engine when
# both happen to share the same clock (e.g. not across hosts).
rc[0].execute("import time; JOB_DEADLINE = time.monotonic() + 1000", block=True)
get_ipython().run_line_magic('px', 'JOB_DEADLINE - time.monotonic()')


# Now a task that requires 60 seconds remaining will only be assigned to engine 0:

# In[10]:


@ipp.depend(check_time_remaining, 60)
def task(ignored):
    """Return the PID of the engine process that ran this task."""
    import os
    return os.getpid()


v.map_sync(task, range(5))


# You can also accomplish the same thing without using `@ipp.depend`,
# by raising a special UnmetDependency error anywhere in your task,
# which causes the scheduler to reassign the task to another engine:

# In[12]:


def task(ignored):
    """Return the engine's PID, refusing to run with < 60 s left before JOB_DEADLINE.

    JOB_DEADLINE is read from the engine's namespace.

    Raises
    ------
    ipyparallel.UnmetDependency
        If fewer than 60 seconds remain before JOB_DEADLINE on this engine.
    """
    import os
    import time

    import ipyparallel as ipp

    # check the deadline, using the same rule as check_time_remaining(60):
    # exactly hitting the deadline counts as "not enough time"
    if time.monotonic() + 60 >= JOB_DEADLINE:
        # raising UnmetDependency causes the scheduler to reassign
        # this task to another engine instead of running it here
        raise ipp.UnmetDependency()
    return os.getpid()


v.map_sync(task, range(5))