# early imports
import IPython
import numpy as np
# setup notebook extensions
!jupyter nbextension enable splitcell/splitcell
!jupyter nbextension enable rise/main
Enabling notebook extension splitcell/splitcell...
      - Validating: OK
Enabling notebook extension rise/main...
      - Validating: OK
Design Pattern for Analysis Automation on Interchangeable,
Distributed Resources using Luigi Analysis Workflows
Scale: Measure of resource consumption and amount of data to be processed
Complexity: Measure of granularity and inhomogeneity of workloads
→ Decisions should not dictate code design!
→ Workflow often not documented, only exists in the physicist's head!
→ Daily working environment must provide preservation features out-of-the-box!
# analysis.py
import luigi

class Inference(luigi.Task):

    split = luigi.ChoiceParameter(default="test", choices=("test", "valid", "train"))
    # parameters are translated into command-line interface arguments

    def requires(self):
        from mva import MVAEvaluation
        return MVAEvaluation(split=self.split)  # pass parameters upstream

    def output(self):
        return luigi.LocalTarget(f"data_{self.split}.h5")  # encode *significant* parameters into output path

    def run(self):
        outp = self.output()  # this is the LocalTarget from above
        inp = self.input()  # this is output() of MVAEvaluation
        do_whatever_an_inference_does(inp.path, outp.path)
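The MVAEvaluation task required above is not shown in the notebook; a minimal sketch of how it could look, following the same pattern (the evaluation call is a placeholder):

# mva.py (illustrative sketch only)
import luigi

class MVAEvaluation(luigi.Task):

    split = luigi.ChoiceParameter(default="test", choices=("test", "valid", "train"))

    def output(self):
        # again, encode significant parameters into the output path
        return luigi.LocalTarget(f"mva_{self.split}.h5")

    def run(self):
        # placeholder for the actual MVA evaluation
        evaluate_mva(self.split, self.output().path)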
make-like execution system
luigi --module analysis Inference \
    --split test \
    --other-parameters ...
A task only runs when its Targets do not exist yet.
luigi is a perfect tool to model complex workflows: simple structure, easy to extend
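A quick way to see this completeness check in action (assuming analysis.py from above is importable; complete() is standard luigi behavior):

from analysis import Inference

task = Inference(split="test")
# a task counts as complete once all of its outputs exist
task.complete()  # -> False until data_test.h5 has been produced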
law extends luigi (i.e. it does not replace it)
Main goal: decouple algorithm code from
1. run locations,
2. storage locations, and
3. software environments
Provides a toolbox to follow a design pattern
No constraints on data format, language, ...
No fixation on dedicated resources
Not a framework
Before we start, import luigi and law, and load IPython magics to execute tasks from within notebooks:
# basic imports
import luigi
import law
import json
# load law ipython magics
law.contrib.load("ipython")
law.ipython.register_magics(log_level="INFO")
# drop-in replacement for base task with some interactive features
Task = law.ipython.Task
INFO: law.contrib.ipython.magic - magics successfully registered: %law, %ilaw
IPython magics:
%ilaw runs a task inside the current session
%law runs a task as a subprocess (not used in this notebook)

class TimesTwo(Task):

    n = luigi.IntParameter()  # no default!

    def output(self):
        return law.LocalFileTarget(f"data/n{self.n}.json")

    def run(self):
        # method 1: the verbose way
        output = self.output()
        output.parent.touch()  # creates the data/ dir

        # define data to save
        # note: self.n is the value of the "n" parameter
        #       and != self.__class__.n (parameter instance!)
        data = {"in": self.n, "out": self.n * 2}

        # pythonic way to save data
        with open(output.path, "w") as f:
            json.dump(data, f, indent=4)
%ilaw run TimesTwo --n 5
--print-status <tree_depth>
%ilaw run TimesTwo --n 5 --print-status 0
!cat data/n5.json
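For reference, with n = 5 the dump above produces the following file content (this follows directly from the json.dump call with indent=4):

{
    "in": 5,
    "out": 10
}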
--remove-output <tree_depth>
%ilaw run TimesTwo --n 5 --remove-output 0
!cat data/n5.json
class TimesTwo(Task):

    n = luigi.IntParameter()  # no default!

    def output(self):
        return law.LocalFileTarget(f"data/n{self.n}.json")

    def run(self):
        # method 2: using target *formatters*
        data = {"in": self.n, "out": self.n * 2}
        self.output().dump(data, formatter="json", indent=4)
        # all arguments passed to "dump" implementation
        # variety of available formatters: yaml, numpy,
        # h5py, root, matplotlib, tensorflow, keras,
        # coffea, zip, tar, pickle, ...
%ilaw run TimesTwo --n 6
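Formatters also work in the other direction via load(). A hypothetical downstream task (not part of the notebook) could read the JSON back like this:

class Consumer(Task):

    n = luigi.IntParameter()

    def requires(self):
        return TimesTwo(n=self.n)

    def output(self):
        return law.LocalFileTarget(f"data/consumed_n{self.n}.json")

    def run(self):
        # load() with the json formatter returns the parsed dict
        data = self.input().load(formatter="json")
        self.output().dump({"out": data["out"] * 2}, formatter="json", indent=4)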
Idea: work with remote files / directories
as if they were local
Remote Targets are based on GFAL2 Python bindings and support all WLCG protocols (dCache, XRootD, GridFTP, SRM, ...) + DropBox
Implement identical target API
Automatic retries
Round-robin (over different doors)
Local caching
Configure DropBox access in law.cfg:
[dropbox]
base: dropbox://dropbox.com/my_dir
app_key: ...
app_secret: ...
access_token: ...
# load the dropbox and numpy contrib packages
law.contrib.load("dropbox", "numpy")
# define a directory
my_dir = law.dropbox.DropboxFileTarget("/")
# save a numpy array in a new file
my_file = my_dir.child("data.npz", type="f")
my_file.dump(np.zeros((10, 20)), formatter="numpy")
# directory listing
my_dir.listdir() # -> ["data.npz"]
# load the data again
zeros = my_file.load(formatter="numpy")
# play around with objects
my_file.parent == my_dir # -> True
my_dir.child("other.txt", type="f").touch()
my_file.sibling("other.txt").exists() # -> True
See examples/dropbox_targets for examples and more info on configuring access to your DropBox.
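The same target API applies to WLCG storage. A hedged sketch, assuming the wlcg contrib package and a [wlcg_fs] section configured in law.cfg (the file name is a placeholder):

# load the wlcg contrib package
law.contrib.load("wlcg")

# define a remote file on the configured storage element
remote_file = law.wlcg.WLCGFileTarget("data.npz")

# identical API as for local and DropBox targets
remote_file.dump(np.ones((5, 5)), formatter="numpy")
ones = remote_file.load(formatter="numpy")
remote_file.exists()  # -> True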
Idea: submission built into tasks, no need to write extra code
Currently supported job systems:
HTCondor, LSF, gLite, ARC, (Slurm)
Automatic resubmission
Full job control (# tasks per job, # parallel jobs, # of job retries, early stopping, ... )
Dashboard interface
A Workflow task has branches, defined in create_branch_map.
Add --branch <n> to the command line to run a certain branch locally.
The task to the right has 26 branches (branch 0 to 25), with each branch writing one character of the alphabet into a file.
class CreateChars(Task, law.LocalWorkflow, HTCondorWorkflow):

    def create_branch_map(self):
        # map branch numbers 0-25 to
        # ascii numbers 97-122 (= a-z)
        return {
            i: num for i, num in
            enumerate(range(97, 123))
        }

    def output(self):
        return law.LocalFileTarget(f"output_{self.branch}.json")

    def run(self):
        # branch_data holds the integer number to convert
        char = chr(self.branch_data)

        # write the output
        self.output().dump({"char": char})
law run CreateChars --branch 0 would write {"char": "a"} into output_0.json.
law run CreateChars would run all branches locally.
Check out examples/htcondor_at_cern.
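The HTCondorWorkflow base class used above is defined in that example, not in this notebook. A condensed sketch of its typical shape, based on law's htcondor contrib package (the output directory and submission settings are placeholders):

import law
law.contrib.load("htcondor")

class HTCondorWorkflow(law.htcondor.HTCondorWorkflow):

    def htcondor_output_directory(self):
        # directory for submission artifacts (job files, logs)
        return law.LocalDirectoryTarget("jobs")

    def htcondor_job_config(self, config, job_num, branches):
        # add custom HTCondor submission settings here
        config.custom_content.append(("request_cpus", "1"))
        return config

Submission is then selected on the command line, e.g. law run CreateChars --workflow htcondor, optionally with job control parameters such as --tasks-per-job; see the example for the full setup.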
Diverging software requirements between workloads are a great feature / challenge / problem
Introduce sandboxing:
"Run entire task in different environment"
Existing sandbox implementations: bash scripts, Docker images, Singularity images
import os

class SandboxExample(Task, law.SandboxTask):

    sandbox = "bash::test_env1.sh"
    # for docker container, use
    # sandbox = "docker::image_name"
    # for singularity container, use
    # sandbox = "singularity::image_name"

    def output(self):
        return law.LocalFileTarget("data/sandbox_variable.txt")

    # the run method is encapsulated
    def run(self):
        value = os.getenv("MY_VARIABLE")
        print(f"MY_VARIABLE: {value}")
        self.output().dump(value, formatter="text")
!cat test_env1.sh
#!/usr/bin/env bash

export MY_VARIABLE="foo"
%ilaw run SandboxExample
# mockup output, please run via command line :)
INFO: luigi-interface - Informed scheduler that task SandboxExample__99914b932b has status PENDING
INFO: luigi-interface - Done scheduling tasks
INFO: luigi-interface - Running Worker with 1 processes
INFO: luigi-interface - [pid 3238510] Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) running SandboxExample()
INFO: luigi-interface - [pid 3238510] Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) done SandboxExample()
INFO: luigi-interface - Informed scheduler that task SandboxExample__99914b932b has status DONE
INFO: luigi-interface - Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) was stopped. Shutting down Keep-Alive thread
INFO: luigi-interface - ===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 SandboxExample(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
======================== entering sandbox ========================
sandbox: bash::test_env1.sh
task   : SandboxExample__99914b932b
==================================================================

MY_VARIABLE: foo

======================== leaving sandbox =========================
sandbox: bash::test_env1.sh
task   : SandboxExample__99914b932b
==================================================================