PyHEP 2020: Columnar Analysis at Scale with Coffea

Authors

Presenter: Mat Adamec (UNL)

coffea:
DOI
  • Lindsey Gray, Matteo Cremonesi, Bo Jayatilaka, Oliver Gutsche, Nick Smith, Allison Hall, Kevin Pedro, Maria Acosta (FNAL); Andrew Melo (Vanderbilt); Stefano Belforte (INFN); and others
  • In collaboration with IRIS-HEP members: Jim Pivarski (Princeton), Ben Galewsky (NCSA), Mark Neubauer (UIUC)

coffea-casa:
  • Ken Bloom, Oksana Shadura (UNL); Garhan Attebury, Carl Lundstedt, Derek Weitzel (UNL-HCC); Mátyás Selmeci (UWM); Brian Bockelman (Morgridge Institute)


Introduction

This tutorial is intended to be a walkthrough of the foundations of an analysis implemented in Coffea (specifically, single top-Higgs production, https://doi.org/10.1103/PhysRevD.99.092005). We will go through data selection using a columnar representation, go over some edge cases to that representation, and plot some relevant data. At the end, we will wrap everything together and demonstrate how Coffea can be easily deployed for scaling an analysis up. This is also a prime opportunity to show off a prototype analysis facility being developed by the coffea-casa team, which was presented yesterday by Oksana Shadura ("A prototype U.S. CMS analysis facility").

Review

For the purposes of this tutorial, we will load in a sample file used in the analysis, as well as NanoEvents from the Coffea package. A NanoEvents tutorial was given on Monday by Nick Smith, but I will give a quick review of its basics.

In [ ]:
import numpy as np
import awkward as ak
from coffea.nanoaod import NanoEvents
from uproot_methods import TLorentzVectorArray
import time

file = 'root://hcc-stash.unl.edu:1094//osgconnect/public/dweitzel/coffea-casa/38E83594-51BD-7D46-B96D-620DD60078A7.root'
events = NanoEvents.from_file(file, entrystop=100000)

NanoEvents lazily reads file into a jagged array structure (specifically, an awkward array), events. We can poke around events by looking at the .columns attribute, which tells us all of the possible fields we can access.

In [ ]:
print(events)
print('\n')
print(events.columns)

These fields can be accessed as attributes of the event array. For example, to peek at Muon:

In [ ]:
print(events.Muon)
print(events.Muon.size)

A note about structure: notice that events.Muon is jagged; it is an array of subarrays, which have arbitrary length (they can even be empty)! The only thing that's necessary is that each event has one subarray. As we can see above, in our example some subarrays have one muon, some have two, some have none, some even have four; the total, however, is 100919 muons, spread across 100000 subarrays.

The Muon array also has further depth to it (most event columns do!) We can look at its columns just as we did with the events array, and we can access any of them as an attribute of events.Muon. As an example, pt:

In [ ]:
print(events.Muon.columns)
print('\n')
print(events.Muon.pt)

It's important that you feel comfortable accessing data within the events array as a basis for the rest of this tutorial. Please, take a moment and play around with it!

In [ ]:
 

Now, we can access data, but how do we manipulate it? Possibly the most critical component of columnar analysis is the concept of masking. A mask is an array of booleans, which another array can be masked by, thus accepting or rejecting elements of that array based on whether the corresponding mask element is True or False. A basic example below:

In [ ]:
data = np.array(['a', 'b', 'd', 'c'])
mask = np.array([True, True, False, True])

data[mask]

We can generate a mask by executing a conditional statement involving any of our awkward arrays. Let's say we want to select muons whose pt > 40:

In [ ]:
muons = events.Muon

print('Unmasked data:')
print(muons.pt)

print('\nMask of data:')
print(muons.pt > 40)

print('\nMasked data:')
print(muons.pt[muons.pt > 40])

We will employ masking in our analysis to reject leptons and events which do not meet the cuts that we desire.

Lepton Selection

Let's now begin with some lepton selectionn, which lends itself towards a simple columnar implementation. We will begin with muons, which have relatively straight-forward cuts, and show off some more complexity with electrons. All tables of cuts are taken from the single top-Higgs analysis mentioned above.

Muon Selection

How would we make these cuts with our columnar tools? An easy three-step plan:
  1. generate a mask for each condition
  2. combine them together into a single mask
  3. mask the muon array with the final mask
In practice, we can skip the second step by stringing our conditionals together with &'s. A quick warning: for masks in Coffea, '&' should always be used over 'and,' as the latter is considered ambiguous. This is an underlying quirk of numpy.

The only intermediate step is figuring out which columns you need and how they are structured; I've left in an extra cell if you want to poke at any specific ones.

In [ ]:
loose_muons = muons[(np.abs(muons.eta) < 2.4) &
                    (muons.pt > 5) &
                    (np.abs(muons.dxy) < 0.05) &
                    (np.abs(muons.dz) < 0.1) &
                    (muons.sip3d < 8) &
                    (muons.miniPFRelIso_all < 0.4) &
                    (muons.looseId)]
                    # Note that no other cuts are necessary for loose muons!

# To construct the tight selection, we may as well make use of the loose cut, since they overlap.
tight_muons = loose_muons[(loose_muons.pt > 15) &
                         (loose_muons.mediumId) &
                         (loose_muons.tightCharge > 1) &
                         (loose_muons.mvaTTH > 0.9)]
In [ ]:
print(muons.columns)
print('\n')
print(muons.mediumId)

Let's do the same thing for electrons, which are slightly complicated because some of the cuts fall into a set of two ranges: 0 < |$\eta$| < 1.479 and 1.479 < |$\eta$| < 2.5. These are listed respectively in parentheses: (range 1, range 2).

Electron Selection

A cell has been provided, again, for column-prodding purposes.

In [ ]:
electrons = events.Electron

# This ends up getting referenced a lot. Let's not generate it every time!
abs_eta = np.abs(electrons.eta)

loose_electrons = electrons[(abs_eta < 2.5) &
                           (electrons.pt > 7) &
                           (np.abs(electrons.dxy) < 0.05) &
                           (np.abs(electrons.dz) < 0.1) &
                           (electrons.miniPFRelIso_all < 0.4) &
                           (electrons.lostHits < 2) &
                           # The scary one, the only one with two ranges in loose selection. 
                           (((electrons.mvaFall17V2noIso > 0) & (abs_eta < 1.479)) |
                            ((electrons.mvaFall17V2noIso > 0.7) & (abs_eta > 1.479) & (abs_eta < 2.5)))]

# Again, all tight leptons pass the loose cut, so we may as well cut down on passing the same cuts twice.
# But we'll have to redefine abs_eta again!
abs_eta = np.abs(loose_electrons.eta)

tight_electrons = loose_electrons[(loose_electrons.pt > 15) &
                                 (loose_electrons.lostHits == 0) &
                                 (loose_electrons.tightCharge > 1) &
                                 (loose_electrons.convVeto) &
                                 (loose_electrons.mvaTTH > 0.90) &
                                 # Two ranges for sigma_ieie.
                                 (((abs_eta < 1.479) & (loose_electrons.sieie < 0.011)) | 
                                 ((abs_eta < 2.5) & (abs_eta > 1.479) & (loose_electrons.sieie < 0.03))) &
                                 # Two ranges for H/E
                                 (((abs_eta < 1.479) & (loose_electrons.hoe < 0.1)) | 
                                 ((abs_eta < 2.5) & (abs_eta > 1.479) & (loose_electrons.hoe < 0.07))) &
                                 # Two ranges for 1/E - 1/p
                                 (((abs_eta < 1.479) & (loose_electrons.eInvMinusPInv < 0.01) & (loose_electrons.eInvMinusPInv > -0.05)) |
                                 ((abs_eta < 2.5) & (abs_eta > 1.479) & (loose_electrons.eInvMinusPInv < 0.005) & (loose_electrons.eInvMinusPInv > -0.05)))]
In [ ]:
print(events.Electron.columns)
print('\n')
print(events.Electron.lostHits)

Event Selections

Okay, now that we've selected some good leptons, let's follow the event selection as prescribed:

In order to simplify some of these complex cuts, we're just going to go step-by-step in generating masks for each condition, and then we'll combine them all at the end. We begin with the jet cuts. Note that we're only interested in having at least one jet passing or failing the respective working points, so while any conditionals we make are per-jet, what we actually want is a per-event mask. We can sum the Trues in each subarray and make sure there is at least one, since they are equivalent to 1 (and False is 0).

In [ ]:
loose_wp = 0.5425
medium_wp = 0.8484
        
jets = events.Jet

jetMask_medium = ((np.abs(jets.eta) < 2.4) & (jets.pt > 25) &
                  (jets.btagCSVV2 > medium_wp))
print('Jet Level (all jets)')
print(jetMask_medium)
jetMask_medium = jetMask_medium.sum() >= 1
print('\nEvent Level (at least one jet)')
print(jetMask_medium)

jetMask_loose = ((((np.abs(jets.eta) < 2.4) & (jets.pt > 25)) | ((jets.pt > 40) & (np.abs(jets.eta) > 2.4))) &
                 (jets.btagCSVV2 <= loose_wp)).sum() >= 1

Okay, the next challenge we come to is making sure there are "no loose leptons with $m_{ll} < 12 GeV$." A problem immediately arises: how do we work with all pairs of leptons? We have an array of muons, we have an array of electrons, but we don't have a singular Lepton object. A columnar solution would be to construct such an object, while adding in an additional index variable to identify muons and electrons apart from each other, so that they remain distinct.

Before we do so, we should take a glance at the structure of awkward arrays. Note that .content returns a flat array of data from all events, and .offsets returns the index by which the entry in the flat array is offset by in the jagged array. You can similarly use .starts and .stops to see the indices where subarrays start and end, but .offsets is more succinct.

In [ ]:
print('Jagged Array:')
print(muons.pt)
print('\nFlat Content Array:')
print(muons.pt.content)
print('\nFlat Offsets Array:')
print(muons.pt.offsets)

So, what would a Lepton jagged array look like? Well, we'll need to replicate the above structure. To do so, we'll need both our data and some indices.

In [ ]:
# This is a helper function which adds muon (0) and electron (1) identifiers to a stacked lepton JaggedArray.
def make_labeled_p4(x, indices, itype):
    p4 = TLorentzVectorArray.from_ptetaphim(x.pt, x.eta, x.phi, x.mass)
    return ak.JaggedArray.zip(p4=p4,
                              flavor=itype*x.pt.ones_like().astype(np.int),
                              pidx=indices,
                              charge=x.charge)

# This generates a stacked lepton JaggedArray, allowing combination of both muons and electrons for computations across flavor.
def stack_leptons(muons, electrons):
    # Construct new lepton indices within every event array.
    muons_indices = ak.JaggedArray.fromoffsets(muons.pt.offsets, 
                                             np.arange(0, muons.pt.content.size)) - muons.pt.offsets[:-1]
    electrons_indices = ak.JaggedArray.fromoffsets(electrons.pt.offsets, 
                                             np.arange(0, electrons.pt.content.size)) - electrons.pt.offsets[:-1]
    # Assign 0/1 value depending on whether lepton is muon/electron.
    muons_p4 = make_labeled_p4(muons, muons_indices, 0)
    electrons_p4 = make_labeled_p4(electrons, electrons_indices, 1)
    # Concatenate leptons, per-subarray/per-event (axis 1).
    stacked_p4 = ak.concatenate((muons_p4, electrons_p4), axis=1)
    
    return stacked_p4

Let's take a look at this in practice. Note that the leptons array is, from this view, a little bit ambiguous, but we can still recover whether an object is a Muon or Electron because we've added that identity as a 0 (for muon) or 1 (for electron) in flavor.

In [ ]:
loose_leptons = stack_leptons(loose_muons, loose_electrons)
print('Muons:')
print(loose_muons)
print('\nElectrons:')
print(loose_electrons)
print('\nLeptons:')
print(loose_leptons)
print('\nLepton Types:')
print(loose_leptons.flavor)

Now, let's return to our selection! Recall that our goal is to have "no loose leptons with $m_{ll} < 12 GeV$." Okay; all we need is to generate all pairs within our loose_leptons array and ensure they meet our mass requirement.

Luckily, Coffea comes with combinatorics tools that help us with the "get all pairs" portion. In particular, leptons.choose(2) will generate all possible pairs of leptons in each event. The first of this pair is then accessible to us through the attribute .i0, and the second through .i1.

In [ ]:
loose_pairs = loose_leptons.choose(2)
print('Leptons:')
print(loose_leptons)
print('\nLepton Pairs:')
print(loose_pairs)
print('\nFirst of Pair:')
print(loose_pairs.i0)
print('\nSecond of Pair:')
print(loose_pairs.i1)
In [ ]:
loose_pairs_mass = (loose_pairs.i0.p4 + loose_pairs.i1.p4).mass
looseMask = ((loose_pairs_mass >= 12).all() & (loose_pairs_mass.counts > 0))

For fun, let's compare this to an explicit solution, using an event loop. By breaking apart our loose muons and electrons into the constituent parts of their four-momentum arrays, we can run a loop through every event, concatenate each part of the four-momentum, and have four concatenated arrays of lepton four-momenta constituents for each event. We can then check that these arrays meet our conditions, and generate a mask by looping through every event. It is much less pretty, but you can see that it gives the same result.

In [ ]:
from itertools import combinations

# This is a helper function which adds up the mass of two 4-vectors based on their components.
def massAddition(l1_px, l1_py, l1_pz, l1_E,
                 l2_px, l2_py, l2_pz, l2_E):
    return np.sqrt((l1_E + l2_E)**2 - 
                  ((l1_px + l2_px)**2 + 
                   (l1_py + l2_py)**2 + 
                   (l1_pz + l2_pz)**2))

def looseFilter(loose_e_starts, loose_e_stops, loose_mu_starts, loose_mu_stops,
               loose_e_px, loose_e_py, loose_e_pz, loose_e_E,
               loose_mu_px, loose_mu_py, loose_mu_pz, loose_mu_E):
    
    # All events pass by default, if they do not, we turn the 1's to 0's.
    final_mask = np.ones(loose_e_starts.size, dtype=np.bool_)
    
    # With the starts and stops, we can get each event's data per-event, and work with concatenated e's and mu's.
    for e_start, e_stop, mu_start, mu_stop, i in zip(loose_e_starts, loose_e_stops, loose_mu_starts, loose_mu_stops, range(0, loose_e_starts.size)):
        event_E = np.concatenate((loose_e_E[e_start:e_stop], loose_mu_E[mu_start:mu_stop]))
        event_px = np.concatenate((loose_e_px[e_start:e_stop], loose_mu_px[mu_start:mu_stop]))
        event_py = np.concatenate((loose_e_py[e_start:e_stop], loose_mu_py[mu_start:mu_stop]))
        event_pz = np.concatenate((loose_e_pz[e_start:e_stop], loose_mu_pz[mu_start:mu_stop]))
        # This handles empty events.
        if event_E.size <= 1:
            final_mask[i] = 0
        # Generate every pair, check that their combined mass is greater than 12 (if not, change 1 to 0 in mask)
        for combination in np.array(list(combinations(np.argsort(event_E), 2))):
            if 12 > (massAddition(event_px[combination[0]], event_py[combination[0]], event_pz[combination[0]], event_E[combination[0]],
                           event_px[combination[1]], event_py[combination[1]], event_pz[combination[1]], event_E[combination[1]])):
                final_mask[i] = 0
    return final_mask

loopMask = looseFilter(loose_electrons.starts, loose_electrons.stops, loose_muons.starts, loose_muons.stops,
            loose_electrons.x.content, loose_electrons.y.content, loose_electrons.z.content, loose_electrons.energy.content,
            loose_muons.x.content, loose_muons.y.content, loose_muons.z.content, loose_muons.energy.content)

(looseMask == loopMask).all()

Okay, back to the columnar representation. There are still selections we need to make for the dilepton and three-lepton channels. A reminder:

We will now implement the rest of these, making use of our stacked array object.

In [ ]:
# Dilepton selection.
tight_leptons = stack_leptons(tight_muons, tight_electrons)
tight_pairs = tight_leptons.choose(2)

# We want exactly one same-sign pair.
dileptonMask = ((tight_pairs.counts == 1) & (tight_pairs.i0.charge == tight_pairs.i1.charge)).any()

# We want at least one lepton to have pt > 25. They have pt > 15 through the lepton selection.
dileptonMask = dileptonMask & ((tight_pairs.i0.p4.pt > 25) | (tight_pairs.i1.p4.pt > 25)).any()

# Okay, now we need to identify ee pairs, and check that their separation from the Z mass is sufficient.
# Our flavor tracker helps us with this. If we add the flavor of a pair, then since e = 1, mu = 0, 2 indicates ee.
tight_types = (tight_pairs.i0.flavor + tight_pairs.i1.flavor)
dileptonMask = dileptonMask & ((tight_types < 2) | (np.abs((tight_pairs.i0.p4 + tight_pairs.i1.p4).mass - 91.18) > 10)).any()
        
    
    
# Trilepton selection.
tight_triples = tight_leptons.choose(3)

# We want exactly three leptons in each event, or one triplet.
trileptonMask = (tight_triples.counts == 1)

# Again, we want at least one lepton in the triplet to have pt > 25.
trileptonMask = trileptonMask & ((tight_triples.i0.p4.pt > 25) | (tight_triples.i1.p4.pt > 25) | (tight_triples.i2.p4.pt > 25)).any()

# Then we want to discard OSSF pairs that are close to the Z mass. We don't need our stacked array for this.
tight_muon_pairs = tight_muons.choose(2)
tight_electron_pairs = tight_electrons.choose(2)

# Takes a pair, tells you if it is an OSSF with |m_ll - m_z| < 15. Reject these events.
def ossf_zmass_check(dileptons):
    return ((dileptons.i0.charge != dileptons.i1.charge) & 
            (np.abs(91.18 - (dileptons.i0 + dileptons.i1).mass) < 15.)).any()

trileptonMask = trileptonMask & (~ossf_zmass_check(tight_muon_pairs)) & (~ossf_zmass_check(tight_electron_pairs))

And that gives us masks for all of the conditions we need. We can now go back to our friendly arrays, put all of our masks together, and apply them to our leptons with ease, getting a dilepton and a three-lepton array for each lepton.

In [ ]:
dilepton_event = (looseMask) & (dileptonMask) & (jetMask_medium) & (jetMask_loose)
trilepton_event = (looseMask) & (trileptonMask) & (jetMask_medium) & (jetMask_loose)

tight_ll_electrons = tight_electrons[dilepton_event]
tight_lll_electrons = tight_electrons[trilepton_event]
tight_ll_muons = tight_muons[dilepton_event]
tight_lll_muons = tight_muons[trilepton_event]

From here, we can break our results naturally into different channels in a columnar fashion. For example, mumu events are simply those where tight_ll_muons.counts == 2, as the dileptonMask guarantees events with 2 leptons, and 2 muons thus implies 0 electrons.

In [ ]:
mumu_channel = tight_ll_muons[tight_ll_muons.counts == 2]
ee_channel = tight_ll_electrons[tight_ll_electrons.counts == 2]

Plotting

On to some visuals! Coffea has a Hist class dedicated to histogramming, which will be comfortable and familiar to use if you've ever touched Matplotlib. We can use this class, again with some minor columnar manipulations, to plot some parameters of interest. Let's stick to the mumu and ee channels for this, since they're already defined above.

Let's try something simple: a plot of the pt of the 2nd-highest-pt lepton in each event. Because awkward arrays are naturally sorted from highest to lowest pt, this'll just involve slicing an array to get the second muon in each event.

In [ ]:
from coffea import hist

# For all event pts (:), get second index (1). Also, removed jagged structure to get flat array.
mumu_pt2 = mumu_channel.pt[:, 1]
ee_pt2 = ee_channel.pt[:, 1]

second_pt = hist.Hist("Events", 
                      hist.Cat(name='channel', label='Channel'), 
                      hist.Bin(name='pt2', label='Second pT', n_or_arr=20, lo= 0, hi=150))

second_pt.fill(channel='mumu', pt2=mumu_pt2)
second_pt.fill(channel='ee', pt2=ee_pt2)

hist.plot1d(second_pt, overlay='channel')

How about something with a little more topology? Let's plot "$\Delta \eta$ between the max-$\eta$ light jet and the max-pt bjet." In our case, a light jet will be defined as one that fails loose_wp and a bjet will be one that passes medium_wp. For simplicity, we will stay strictly in the mumu channel.

In [ ]:
mumu_jets = jets[dilepton_event & (tight_muons.counts == 2)]

light_jets = mumu_jets[mumu_jets.btagCSVV2 <= loose_wp]
bjets = mumu_jets[mumu_jets.btagCSVV2 > medium_wp]

Now, this cut is simplified slightly by the fact that we only desire one of each jet type in an event. We can thus do a simple array selection to get our max-$\eta$ light jet and our max-pt bjet, then do a .cross() between them to generate the pairs (and filter out empty events, though our event selection should disallow this). A .cross() is like a .choose(), but it generates pairs across arrays, rather than within one.

In [ ]:
# argmax() returns the index of the maximum-valued element. Just another way to get max for an unordered array.
max_light_jets = light_jets[light_jets.eta.argmax()]
max_bjets = bjets[bjets.pt.argmax()] 

cross_jets = max_bjets.cross(max_light_jets)
print('Max-pt bjets:')
print(max_bjets)
print('\nMax-Eta Light Jets:')
print(max_light_jets)
print('\nCrossed:')
cross_jets

And now all we have to do is find the difference in $\eta$ between the values of the pair. The respective pair-wise elements can once again be accessed by .i0 and .i1. We take their difference (which is taking the difference of their Lorentz vector) and then ask for $\eta$.

In [ ]:
# For all event pts (:), get second index (1). Also, removed jagged structure to get flat array.
mumu_fwdJet_leadbJet = np.abs((cross_jets.i0 - cross_jets.i1).eta)

mumu_etadiff = hist.Hist("Events",
                      hist.Cat(name='channel', label='Channel'),
                      hist.Bin(name='etadiff', label=r'$\Delta \eta$ between the max-$\eta$ light jet and the max-pt bjet', n_or_arr=20, lo=0, hi=6))

mumu_etadiff.fill(channel='mumu', etadiff=mumu_fwdJet_leadbJet.flatten())
hist.plot1d(mumu_etadiff, overlay='channel')

Demo

Alright, we've seen a lot of neat columnar tools, but it doesn't end there! Coffea also makes scale-out easy. All we have to do is bundle up our work in the process.ProcessorABC class, and we can feed in however many files we desire (which is, probably, more than 1) with ease. This class also features several executors which allow deployment on Dask, Spark, and Parsl, as well as your local machine.

To end this tutorial, we will diverge from this notebook a little bit, as I will demonstrate a run of the analysis on a Dask cluster; specifically, the analysis facility that Oksana Shadura gave a talk about yesterday. If you want to look at, use as a reference, or play with my analysis later, you can run the analysis file that I've provided, which uses the futures_executor to execute locally. You will, however, need a CMS certificate to be able to access its datasets. Failure to do so will result in the oh-so-familiar OSError: [FATAL] Invalid address.

Acknowledgements

These projects are supported by National Science Foundation grants ACI-1450323 (through DIANA-HEP), OAC-1836650 (through IRIS-HEP), and PHY-1624356 (through U.S. CMS).

In [ ]: