pyhf

For the sake of brevity and time, we won't go into a full discussion of what pyhf is and what you can do with it. For now we'll point you to the latest pyhf tutorial for pyhf v0.6.2, as well as our vCHEP 2021 talk: Distributed statistical inference with pyhf enabled through funcX.
Very briefly, though: pyhf is a pure-Python implementation of the HistFactory family of statistical models that, through optional computational backends like JAX, provides automatic differentiation and hardware acceleration on GPUs. pyhf is part of Scikit-HEP and is designed to have a clear, Pythonic API, with the goal of making it easier to produce and interpret binned models.
Taking an example from the pyhf project README, this is all the code needed to build a simple 1-bin model, perform a hypothesis test scan across multiple values of the parameter of interest (POI), plot those results, and, by inverting the hypothesis tests, determine the 95% CL upper limit on the POI value.
import matplotlib.pyplot as plt
import numpy as np
import pyhf
from pyhf.contrib.viz import brazil

pyhf.set_backend("numpy")

model = pyhf.simplemodels.uncorrelated_background(
    signal=[10.0], bkg=[50.0], bkg_uncertainty=[7.0]
)
data = [55.0] + model.config.auxdata

poi_vals = np.linspace(0, 5, 41)
results = [
    pyhf.infer.hypotest(
        test_poi, data, model, test_stat="qtilde", return_expected_set=True
    )
    for test_poi in poi_vals
]

fig, ax = plt.subplots()
fig.set_size_inches(7, 5)
plot = brazil.plot_results(poi_vals, results, ax=ax)

obs_limit, exp_limits, (scan, results) = pyhf.infer.intervals.upperlimit(
    data, model, poi_vals, return_results=True
)
print(f"observed limit: {obs_limit}")
The important part to emphasize for the purposes of this talk, though, is simply that pyhf allows for statistical modelling with binned models and for fast fitting through Pythonic APIs.
- funcX is a high-performance Function as a Service (FaaS) platform.
- funcX endpoints are logical entities that represent a specified compute resource.
- Endpoint agents work with the funcX service to dispatch user defined functions to resources for execution.

We'll see a bit more in a little bit.
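To make the FaaS pattern concrete before we touch the real API, here is a toy, in-memory sketch of the register/run/retrieve workflow. This is purely conceptual and not the funcX API (the real service serializes functions and dispatches them to remote endpoints; here everything executes locally):

```python
import uuid


class ToyFaaS:
    """A toy in-memory Function-as-a-Service registry (conceptual only)."""

    def __init__(self):
        self._functions = {}
        self._results = {}

    def register_function(self, func):
        # Registering returns an opaque function id, much like funcX does
        function_id = str(uuid.uuid4())
        self._functions[function_id] = func
        return function_id

    def run(self, *args, function_id, **kwargs):
        # A real FaaS platform would serialize the call and dispatch it to a
        # remote endpoint; here we just execute immediately and store the result
        task_id = str(uuid.uuid4())
        self._results[task_id] = self._functions[function_id](*args, **kwargs)
        return task_id

    def get_result(self, task_id):
        return self._results[task_id]


faas = ToyFaaS()
func_id = faas.register_function(lambda x: x**2)
task_id = faas.run(7, function_id=func_id)
print(faas.get_result(task_id))  # 49
```

The shape of this workflow, registering a function for an id, running it by id, and fetching results by task id, is exactly what we'll do with the real `FuncXClient` below.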
funcX

from time import sleep

import funcx
from funcx.sdk.client import FuncXClient
With the funcx-endpoint CLI API

! funcx-endpoint --help

you need to create a template environment for your endpoint:

$ funcx-endpoint configure pyhf

which will create a default funcX configuration file at ~/.funcx/pyhf/config.py.
1. funcX requires the use of Globus, and so will require you to first log in to a Globus account to use the funcx-sdk. Globus allows authentication through existing organizational logins or through Google accounts or ORCID iD, so this shouldn't be a barrier to use.
2. Once you authenticate with Globus you'll then need to approve the funcx-sdk's required permissions, and you'll be given a time-limited authorization code.
3. Copy this code and paste it back into the terminal where you ran funcx-endpoint configure pyhf, where you're asked to "Please Paste your Auth Code Below".
Upon success you'll see
A default profile has been created for <pyhf> at /home/jovyan/.funcx/pyhf/config.py
Configure this file and try restarting with:
$ funcx-endpoint start pyhf
If you're following along, you'll want to switch over to a terminal to make this part easier.
! echo "funcx-endpoint configure pyhf"
! ls -l ~/.funcx/pyhf/config.py
! cat ~/.funcx/pyhf/config.py
We'll go a step further, though, and use a prepared funcX configuration found under funcX/binder-config.py.
! cp funcX/binder-config.py ~/.funcx/pyhf/config.py
and look at it again
! cat ~/.funcx/pyhf/config.py
Let's break down some relevant terminology from Parsl:

- `block`: Basic unit of resources acquired from a provider
- `max_blocks`: Maximum number of blocks that can be active per executor
- `nodes_per_block`: Number of nodes requested per block
- `parallelism`: Ratio of task execution capacity to the sum of running tasks and available tasks

And let's quickly consider this example from the Parsl docs that funcX extends:
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label='local_htex',
            workers_per_node=2,
            provider=LocalProvider(
                min_blocks=1,
                init_blocks=1,
                max_blocks=2,
                nodes_per_block=1,
                parallelism=0.5,
            ),
        )
    ]
)
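To build some intuition for the `parallelism` knob, here is a hypothetical back-of-the-envelope sketch of the scaling rule (this is our own simplified helper, not Parsl's actual implementation): provision enough blocks so that execution capacity covers `parallelism` × the outstanding tasks, clamped between one block and `max_blocks`.

```python
import math


def blocks_needed(tasks_outstanding, workers_per_block, parallelism, max_blocks):
    """Hypothetical simplification of Parsl's scaling heuristic:
    provision capacity for ``parallelism`` * outstanding tasks."""
    desired_capacity = parallelism * tasks_outstanding
    blocks = math.ceil(desired_capacity / workers_per_block)
    # Clamp between one block and the configured maximum
    return max(1, min(blocks, max_blocks))


# 9 outstanding tasks, parallelism=0.5, max_blocks=2, and (as in the
# animation below) a block capacity of 4 tasks: a second block is provisioned
print(blocks_needed(9, workers_per_block=4, parallelism=0.5, max_blocks=2))  # 2
```

With `parallelism=1.0` the scheduler would try to provision one worker slot per task (up to `max_blocks`), while `parallelism=0` would keep only the minimum resources running.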
What's happening in the GIF above:

- There are 9 tasks to compute.
- The task capacity of the first block (4 tasks) is reached.
- The first block is full and 5/9 > parallelism, so Parsl provisions a new block for executing the remaining tasks.

Okay, now we'll start the endpoint
! funcx-endpoint start pyhf
and you can verify that it is registered and up
! funcx-endpoint list
N.B.: You'll want to take careful note of this uuid, as this is the endpoint ID that your funcX code will use.
A good way to deal with this is to save it in an endpoint_id.txt file that is ignored by version control.
! funcx-endpoint list | grep pyhf | awk '{print $(NF-1)}' > endpoint_id.txt
! cat endpoint_id.txt
To keep this as easy as possible to follow along with, we've done something that isn't very practical: we set up our funcX endpoint locally (this is probably not where your dedicated compute will be, but for demonstration purposes we'll pretend that our funcx-endpoint lives on another machine/cluster someplace).
Locally, we can now write the code that we'd like funcX to run for us as functions (remember: FaaS)
def simple_example(backend="numpy", test_poi=1.0):
    import time

    import pyhf

    pyhf.set_backend(backend)

    tick = time.time()
    model = pyhf.simplemodels.uncorrelated_background(
        signal=[12.0, 11.0], bkg=[50.0, 52.0], bkg_uncertainty=[3.0, 7.0]
    )
    data = model.expected_data(model.config.suggested_init())
    return {
        "cls_obs": float(
            pyhf.infer.hypotest(test_poi, data, model, test_stat="qtilde")
        ),
        "fit-time": time.time() - tick,
    }
The return is just a dict
of the observed $\mathrm{CL}_{s}$ value and the time to fit
simple_example()
We can then initialize our local funcX client and register our function with it for execution
# Initialize funcX client
fxc = FuncXClient()
fxc.max_requests = 200
# register functions
infer_func = fxc.register_function(simple_example)
With our function registered, we can now have the funcX client serialize it and send it to the funcX endpoint (which can be on any machine, anywhere!) to be sent out to the funcX worker nodes on the execution machine
with open("endpoint_id.txt") as endpoint_file:
pyhf_endpoint = str(endpoint_file.read().rstrip())
# Serialize and send to funcX endpoint to run
task_id = fxc.run(
backend="numpy", test_poi=1.0, endpoint_id=pyhf_endpoint, function_id=infer_func
)
While that runs, we can now start to send queries from our local submission machine to the (remote) execution machine and check to see if the tasks we've submitted have finished execution
# wait for it to run. Here this is super fast, but you'd want to set up a loop to check periodically
sleep(1)
# retrieve output
result = fxc.get_result(task_id)
result
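As noted in the comment above, a fixed sleep is only good enough for a demo; in practice you would poll for the result. Here is a minimal, generic retry sketch (our own helper, not part of the funcX SDK), where `fetch` stands in for a call like `lambda: fxc.get_result(task_id)` that raises an exception while the task is still pending:

```python
import time


def poll_result(fetch, timeout=60.0, interval=2.0):
    """Call ``fetch()`` repeatedly until it succeeds or ``timeout`` elapses.

    ``fetch`` stands in for e.g. ``lambda: fxc.get_result(task_id)``, which
    raises while the submitted task is still pending.
    """
    deadline = time.time() + timeout
    while True:
        try:
            return fetch()
        except Exception:
            # Still pending: give up only once the deadline has passed
            if time.time() > deadline:
                raise
            time.sleep(interval)


# Usage with funcX would look like:
# result = poll_result(lambda: fxc.get_result(task_id))
```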
# Run a different test POI
task_id = fxc.run(
backend="numpy", test_poi=2.0, endpoint_id=pyhf_endpoint, function_id=infer_func
)
sleep(0.01)
try:
result = fxc.get_result(task_id)
except Exception as excep:
print(f"inference: {excep}")
sleep(2)
result = fxc.get_result(task_id)
result
To stop a funcX endpoint from running, simply use the funcx-endpoint CLI API again
! funcx-endpoint stop pyhf
! funcx-endpoint list