This is a rendered copy of processor.ipynb. You can optionally run it interactively on binder at this link
Coffea relies mainly on uproot to provide access to ROOT files for analysis.
As a usual analysis will involve processing tens to thousands of files, totalling gigabytes to terabytes of data, there is a certain amount of work to be done to build a parallelized framework to process the data in a reasonable amount of time. Of course, one can work directly within uproot to achieve this, as we'll show in the beginning, but coffea provides the coffea.processor
module, which allows users to worry just about the actual analysis code and not about how to implement efficient parallelization, assuming that the parallelization is a trivial map-reduce operation (e.g. filling histograms and adding them together). The module provides the following key features:
- ProcessorABC: an abstract base class that can be derived from to implement the analysis code;
- accumulate(): a utility to reduce the outputs to a single result, as shown in the accumulators notebook tutorial.

Let's start by writing a simple processor class that reads some CMS open data and plots a dimuon mass spectrum. We'll start by copying the ProcessorABC skeleton and filling in some details:

- the flag, as we won't use it;
- BaseSchema interpretation (the files used here could be read with NanoAODSchema, but we want to show how to build vector objects from other TTree formats).

import hist
import dask
import awkward as ak
import hist.dask as hda
import dask_awkward as dak
from coffea import processor
from coffea.nanoevents.methods import candidate
from coffea.dataset_tools import (
apply_to_fileset,
max_chunks,
preprocess,
)
from distributed import Client
# Start a local Dask distributed cluster and connect a client to it; the
# dask.compute calls below will run their task graphs on this cluster.
client = Client()
class MyProcessor(processor.ProcessorABC):
    """Fill a dimuon invariant-mass spectrum from flat (BaseSchema) TTree branches.

    Events with exactly two muons are histogrammed in two categories,
    depending on whether the two muon charges are opposite or the same.
    ``process`` returns ``{dataset: {"entries": <lazy count>, "mass": <lazy hist>}}``.
    """

    def __init__(self):
        # No configuration needed for this simple example.
        pass

    def process(self, events):
        """Build the lazy (dask-backed) dimuon-mass histogram for one chunk.

        Parameters
        ----------
        events :
            dask-awkward events array with flat ``Muon_*`` branches and a
            ``metadata`` dict carrying the dataset name.
        """
        dataset = events.metadata['dataset']

        # Zip the flat Muon_* branches into candidate records so that
        # four-vector arithmetic (addition, .mass) works on them.
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        h_mass = (
            hda.Hist.new
            .StrCat(["opposite", "same"], name="sign")
            # Raw string for the label: "\m" is an invalid escape sequence in a
            # plain string literal (SyntaxWarning on recent Python versions).
            # The rendered label text is unchanged.
            .Log(1000, 0.2, 200., name="mass", label=r"$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        # Exactly two muons with net charge zero -> opposite-sign pair.
        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        # Exactly two muons with nonzero net charge -> same-sign pair.
        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        # Nothing to do after the map-reduce accumulation in this example.
        pass
If we were to just use bare uproot to execute this processor, we could do that with the following example, which:

- opens the file with BaseSchema interpretation (roughly equivalent to the output of uproot.lazy);
- creates a MyProcessor instance;
- calls its process() function, which returns our accumulators.

from coffea.nanoevents import NanoEventsFactory, BaseSchema
filename = "file://Run2012B_DoubleMuParked.root"
events = NanoEventsFactory.from_root(
{filename: "Events"},
steps_per_file=2_000,
metadata={"dataset": "DoubleMuon"},
schemaclass=BaseSchema,
).events()
p = MyProcessor()
out = p.process(events)
(computed,) = dask.compute(out)
print(computed)
/Users/saransh/Code/HEP/coffea/.env/lib/python3.11/site-packages/coffea/nanoevents/factory.py:299: RuntimeWarning: You have set steps_per_file to 2000, this should only be used for a small number of inputs (e.g. for early-stage/exploratory analysis) since it does not inform dask of each chunk lengths at creation time, which can cause unexpected slowdowns at scale. If you would like to process larger datasets please specify steps using the appropriate uproot "files" specification: https://github.com/scikit-hep/uproot5/blob/v5.1.2/src/uproot/_dask.py#L109-L132. warnings.warn(
{'DoubleMuon': {'entries': 26084708, 'mass': Hist( StrCategory(['opposite', 'same'], name='sign'), Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Int64()) # Sum: 12819609.0 (12835141.0 with flow)}}
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
# One curve per 'sign' category (opposite / same charge).
computed["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")  # log x-axis to match the histogram's Log mass binning
ax.legend(title="Dimuon charge")
<matplotlib.legend.Legend at 0x29782ba10>
One could expand on this code to run over several chunks of the file, setting entry_start
and entry_stop
as appropriate. Then, several datasets could be processed by iterating over several files. However, the dask.compute
and coffea.dataset_tools
can help with this! We can preprocess
multiple files and then use our custom MyProcessor
class to generate the relevant dask task graph. Finally, the result can be obtained by calling dask.compute
. Since these files are very large, we limit to just reading the first few chunks of events from each dataset with max_chunks.
# Input datasets: dataset name -> {"files": {file path -> tree name}}.
fileset = {
    'DoubleMuon': {
        "files": {
            'file://Run2012B_DoubleMuParked.root': "Events",
            'file://Run2012C_DoubleMuParked.root': "Events",
        }
    },
    'ZZ to 4mu': {
        "files": {
            'file://ZZTo4mu.root': "Events"
        }
    }
}

# Inspect the files once up front so dask knows the chunk boundaries
# (this avoids the steps_per_file warning seen above).
dataset_runnable, dataset_updated = preprocess(
    fileset,
    align_clusters=False,
    step_size=100_000,      # target number of events per chunk
    files_per_batch=1,
    skip_bad_files=True,    # skip unreadable files instead of failing
    save_form=False,
)

# Build one task graph per dataset, processing at most 300 chunks of each.
to_compute = apply_to_fileset(
    MyProcessor(),
    max_chunks(dataset_runnable, 300),
    schemaclass=BaseSchema,
)
(out,) = dask.compute(to_compute)  # run everything on the Dask client
print(out)
{'DoubleMuon': {'DoubleMuon': {'entries': 56084708, 'mass': Hist( StrCategory(['opposite', 'same'], name='sign'), Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Int64()) # Sum: 28258324.0 (28290486.0 with flow)}}, 'ZZ to 4mu': {'ZZ to 4mu': {'entries': 1499064, 'mass': Hist( StrCategory(['opposite', 'same'], name='sign'), Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Int64()) # Sum: 288707.0 (289326.0 with flow)}}}
The run may depend on how many cores are available on the machine you are running this notebook and your connection to eospublic.cern.ch
.
fig, ax = plt.subplots(figsize=(10, 6))
# Outer key: fileset dataset name; inner key: the metadata 'dataset' name
# used in MyProcessor's return dict (they coincide here).
out["DoubleMuon"]["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")
<matplotlib.legend.Legend at 0x297ca0710>
Let's flesh out this analysis into a 4-muon analysis, searching for diboson events:
from collections import defaultdict
import numba
@numba.njit
def find_4lep_kernel(events_leptons, builder):
    """Search for valid 4-lepton combinations from an array of events * leptons {charge, ...}

    A valid candidate has two pairs of leptons that each have balanced charge.
    Outputs an array of events * candidates {indices 0..3} corresponding to all valid
    permutations of all valid combinations of unique leptons in each event
    (omitting permutations within each pair).
    """
    # Compiled with numba.njit, hence the explicit index loops and the
    # ak.ArrayBuilder instead of vectorized awkward operations.
    for leptons in events_leptons:
        builder.begin_list()  # one (possibly empty) candidate list per event
        nlep = len(leptons)
        # First pair: i0 < i1 fixes the within-pair order; require net charge zero.
        for i0 in range(nlep):
            for i1 in range(i0 + 1, nlep):
                if leptons[i0].charge + leptons[i1].charge != 0:
                    continue
                # Second pair: i2 < i3, all four indices distinct, net charge zero.
                # i2 ranges over all leptons, so both (pairA, pairB) and
                # (pairB, pairA) orderings are emitted as separate candidates.
                for i2 in range(nlep):
                    for i3 in range(i2 + 1, nlep):
                        if len({i0, i1, i2, i3}) < 4:
                            continue
                        if leptons[i2].charge + leptons[i3].charge != 0:
                            continue
                        # Record the candidate as a 4-tuple of lepton indices.
                        builder.begin_tuple(4)
                        builder.index(0).integer(i0)
                        builder.index(1).integer(i1)
                        builder.index(2).integer(i2)
                        builder.index(3).integer(i3)
                        builder.end_tuple()
        builder.end_list()
    return builder
def find_4lep(events_leptons):
    """Dispatch wrapper around find_4lep_kernel for use with dak.map_partitions.

    On dask-awkward's metadata-only ("typetracer") pass the numba kernel cannot
    run, so a fake array with the correct layout (lists of 4-tuples of ints) is
    returned instead; on real data the kernel is executed and snapshotted.
    """
    if ak.backend(events_leptons) == "typetracer":
        # here we fake the output of find_4lep_kernel since
        # operating on length-zero data returns the wrong layout!
        ak.typetracer.length_zero_if_typetracer(events_leptons.charge) # force touching of the necessary data
        return ak.Array(ak.Array([[(0,0,0,0)]]).layout.to_typetracer(forget_length=True))
    return find_4lep_kernel(events_leptons, ak.ArrayBuilder()).snapshot()
class FancyDimuonProcessor(processor.ProcessorABC):
    """Four-muon analysis: select events with at least four good muons, build
    all valid charge-balanced 2+2 pairings with find_4lep, pick the pairing
    whose first pair (z1) is closest to the nominal Z mass, and fill mass/pt
    histograms plus a simple cutflow."""

    def process(self, events):
        # Histogram axes; the dataset axis grows as new dataset names are filled.
        dataset_axis = hist.axis.StrCategory([], growth=True, name="dataset", label="Primary dataset")
        mass_axis = hist.axis.Regular(300, 0, 300, name="mass", label=r"$m_{\mu\mu}$ [GeV]")
        pt_axis = hist.axis.Regular(300, 0, 300, name="pt", label=r"$p_{T,\mu}$ [GeV]")

        # Dask-backed (lazy) histograms with weighted storage.
        h_nMuons = hda.Hist(
            dataset_axis,
            hda.hist.hist.axis.IntCategory(range(6), name="nMuons", label="Number of good muons"),
            storage="weight", label="Counts",
        )
        h_m4mu = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ1 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ2 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_ptZ1mu1 = hda.hist.Hist(dataset_axis, pt_axis, storage="weight", label="Counts")
        h_ptZ1mu2 = hda.hist.Hist(dataset_axis, pt_axis, storage="weight", label="Counts")

        # Cutflow counters; the values stored are lazy dask scalars that are
        # only evaluated when the final result is computed.
        cutflow = defaultdict(int)

        dataset = events.metadata['dataset']
        muons = ak.zip({
            "pt": events.Muon_pt,
            "eta": events.Muon_eta,
            "phi": events.Muon_phi,
            "mass": events.Muon_mass,
            "charge": events.Muon_charge,
            "isolation": events.Muon_pfRelIso03_all,
        }, with_name="PtEtaPhiMCandidate", behavior=candidate.behavior)

        # make sure they are sorted by transverse momentum
        # NOTE(review): ak.argsort defaults to ascending order, so this puts the
        # softest muon first within each event; if hardest-first was intended
        # (as the pt_z1_mu1/pt_z1_mu2 naming suggests), ascending=False would be
        # needed — confirm against the intended physics.
        muons = muons[ak.argsort(muons.pt, axis=1)]

        # Plain '=' here (vs '+=' for the later counters); both behave the same
        # on a freshly created defaultdict(int).
        cutflow['all events'] = ak.num(muons, axis=0)

        # impose some quality and minimum pt cuts on the muons
        muons = muons[
            (muons.pt > 5)
            & (muons.isolation < 0.2)
        ]
        cutflow['at least 4 good muons'] += ak.sum(ak.num(muons) >= 4)
        h_nMuons.fill(dataset=dataset, nMuons=ak.num(muons))

        # reduce first axis: skip events without enough muons
        muons = muons[ak.num(muons) >= 4]

        # find all candidates with helper function
        # map_partitions runs find_4lep on each dask partition; the result is a
        # per-event list of 4-tuples of muon indices, which are then used to
        # index back into the muon collection ("0".."3" are the tuple fields).
        fourmuon = dak.map_partitions(find_4lep, muons)
        fourmuon = [muons[fourmuon[idx]] for idx in "0123"]

        # Package each candidate as two dimuon systems, z1 and z2.
        fourmuon = ak.zip({
            "z1": ak.zip({
                "lep1": fourmuon[0],
                "lep2": fourmuon[1],
                "p4": fourmuon[0] + fourmuon[1],
            }),
            "z2": ak.zip({
                "lep1": fourmuon[2],
                "lep2": fourmuon[3],
                "p4": fourmuon[2] + fourmuon[3],
            }),
        })

        cutflow['at least one candidate'] += ak.sum(ak.num(fourmuon) > 0)

        # require minimum dimuon mass
        fourmuon = fourmuon[(fourmuon.z1.p4.mass > 60.) & (fourmuon.z2.p4.mass > 20.)]
        cutflow['minimum dimuon mass'] += ak.sum(ak.num(fourmuon) > 0)

        # choose permutation with z1 mass closest to nominal Z boson mass;
        # ak.singletons keeps list structure so events with no surviving
        # candidate drop out cleanly on the flatten.
        bestz1 = ak.singletons(ak.argmin(abs(fourmuon.z1.p4.mass - 91.1876), axis=1))
        fourmuon = ak.flatten(fourmuon[bestz1])

        h_m4mu.fill(
            dataset=dataset,
            mass=(fourmuon.z1.p4 + fourmuon.z2.p4).mass,
        )
        h_mZ1.fill(
            dataset=dataset,
            mass=fourmuon.z1.p4.mass,
        )
        h_mZ2.fill(
            dataset=dataset,
            mass=fourmuon.z2.p4.mass,
        )
        h_ptZ1mu1.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep1.pt,
        )
        h_ptZ1mu2.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep2.pt,
        )

        return {
            'nMuons': h_nMuons,
            'mass': h_m4mu,
            'mass_z1': h_mZ1,
            'mass_z2': h_mZ2,
            'pt_z1_mu1': h_ptZ1mu1,
            'pt_z1_mu2': h_ptZ1mu2,
            'cutflow': {dataset: cutflow},
        }

    def postprocess(self, accumulator):
        # No post-reduction step needed.
        pass
import time

# Time the full build-and-compute cycle for the 4-muon analysis.
tstart = time.time()

to_compute = apply_to_fileset(
    FancyDimuonProcessor(),
    max_chunks(dataset_runnable, 300),  # reuse the preprocessed chunk lists
    schemaclass=BaseSchema,
)
(out,) = dask.compute(to_compute)  # execute the graphs on the Dask client
print(out)

elapsed = time.time() - tstart
print(elapsed)
{'DoubleMuon': {'nMuons': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), IntCategory([0, 1, 2, 3, 4, 5], name='nMuons', label='Number of good muons'), storage=Weight()) # Sum: WeightedSum(value=5.59547e+07, variance=5.59547e+07) (WeightedSum(value=5.60847e+07, variance=5.60847e+07) with flow), 'mass': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=54219, variance=54219) (WeightedSum(value=60748, variance=60748) with flow), 'mass_z1': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=60314, variance=60314) (WeightedSum(value=60748, variance=60748) with flow), 'mass_z2': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=60079, variance=60079) (WeightedSum(value=60748, variance=60748) with flow), 'pt_z1_mu1': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=60736, variance=60736) (WeightedSum(value=60748, variance=60748) with flow), 'pt_z1_mu2': Hist( StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=57002, variance=57002) (WeightedSum(value=60748, variance=60748) with flow), 'cutflow': {'DoubleMuon': defaultdict(<class 'int'>, {'all events': dask.awkward<numaxis0, type=Scalar, dtype=int64>, 'at least 4 good muons': dask.awkward<add, type=Scalar, dtype=int64>, 'at least one candidate': dask.awkward<add, type=Scalar, 
dtype=int64>, 'minimum dimuon mass': dask.awkward<add, type=Scalar, dtype=int64>})}}, 'ZZ to 4mu': {'nMuons': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), IntCategory([0, 1, 2, 3, 4, 5], name='nMuons', label='Number of good muons'), storage=Weight()) # Sum: WeightedSum(value=1.49776e+06, variance=1.49776e+06) (WeightedSum(value=1.49906e+06, variance=1.49906e+06) with flow), 'mass': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=79350, variance=79350) (WeightedSum(value=98261, variance=98261) with flow), 'mass_z1': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=98198, variance=98198) (WeightedSum(value=98261, variance=98261) with flow), 'mass_z2': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=97699, variance=97699) (WeightedSum(value=98261, variance=98261) with flow), 'pt_z1_mu1': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=98259, variance=98259) (WeightedSum(value=98261, variance=98261) with flow), 'pt_z1_mu2': Hist( StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'), Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'), storage=Weight()) # Sum: WeightedSum(value=97998, variance=97998) (WeightedSum(value=98261, variance=98261) with flow), 'cutflow': {'ZZ to 4mu': defaultdict(<class 'int'>, {'all events': dask.awkward<numaxis0, type=Scalar, dtype=int64>, 'at least 4 good muons': dask.awkward<add, 
type=Scalar, dtype=int64>, 'at least one candidate': dask.awkward<add, type=Scalar, dtype=int64>, 'minimum dimuon mass': dask.awkward<add, type=Scalar, dtype=int64>})}}} 109.41525983810425
# Total events processed across both datasets; the cutflow entries are still
# lazy dask scalars, so the rate needs an explicit .compute().
nevt = out['ZZ to 4mu']['cutflow']['ZZ to 4mu']['all events'] + out['DoubleMuon']['cutflow']['DoubleMuon']['all events']
print("Events/s:", (nevt / elapsed).compute())
Events/s: 526286.4803794603
What follows is just us looking at the output, you can execute it if you wish
# scale ZZ simulation to expected yield
lumi = 11.6  # integrated luminosity, 1/fb
zzxs = 7200 * 0.0336**2  # approximate 8 TeV ZZ(4mu) cross section
nzz = out['ZZ to 4mu']['cutflow']['ZZ to 4mu']['all events']  # generated ZZ events (lazy scalar)

scaled = {}
# Both result dicts come from the same processor, so their keys line up in order.
# (fix: the second name was the misspelled 'nam2'; it is unused, hence '_name2'.)
for (name1, h1), (_name2, h2) in zip(out['ZZ to 4mu'].items(), out['DoubleMuon'].items()):
    # Only combine histogram entries; skip the 'cutflow' dict entry.
    if isinstance(h1, hist.Hist) and isinstance(h2, hist.Hist):
        scaled[name1] = h1.copy() + h2.copy()
        # Category index 0 on the dataset axis is 'ZZ to 4mu' (from h1);
        # scale that row to the expected data yield.
        scaled[name1].view()[0, :] *= lumi * zzxs / nzz.compute()
fig, ax = plt.subplots()
# Good-muon multiplicity, one curve per dataset.
scaled['nMuons'].plot1d(ax=ax, overlay='dataset')
ax.set_yscale('log')
ax.set_ylim(1, None)
(1, 54961323.59635242)
fig, ax = plt.subplots()
# Four-muon invariant mass; rebin by 4 for a smoother spectrum.
scaled['mass'][:, ::hist.rebin(4)].plot1d(ax=ax, overlay='dataset');

fig, ax = plt.subplots()
# Mass of the selected z1 candidate (closest to the nominal Z mass).
scaled['mass_z1'].plot1d(ax=ax, overlay='dataset');
fig, ax = plt.subplots()
# Mass of the second (z2) dimuon system.
scaled['mass_z2'].plot1d(ax=ax, overlay='dataset')
ax.set_xlim(2, 300)
(2.0, 300.0)
fig, ax = plt.subplots()
# pt of the first muon of the z1 pair.
scaled['pt_z1_mu1'].plot1d(ax=ax, overlay='dataset');

fig, ax = plt.subplots()
# pt of the second muon of the z1 pair.
scaled['pt_z1_mu2'].plot1d(ax=ax, overlay='dataset');