This tutorial shows you how to analyze datasets using `RDataFrame` from a Python notebook, offloading computations to a Spark cluster. The example analysis performs the following steps:

- Connect a ROOT dataframe to a dataset of CMS events
- Filter the events to keep only those relevant for the analysis
- Compute the invariant mass of the selected dimuon candidates
- Plot the resulting invariant mass spectrum
This material is based on the analysis done by Stefan Wunsch, available in CERN's Open Data portal.
This notebook has been tested on CERN SWAN, with a software stack providing ROOT 6.26/08.
# This import gives access to all the ROOT C++ code from Python
import ROOT
Welcome to JupyROOT 6.26/08
In addition, there is a package called `PyRDF`, soon to be integrated in ROOT, that implements a layer on top of `RDataFrame`. `PyRDF` allows analyses written with `RDataFrame` to be executed on a set of distributed resources.

In this notebook, we are going to use the Spark backend of `PyRDF` to submit `RDataFrame` computations to an external Spark cluster. Note that the connection to the cluster needs to be established prior to the execution of this notebook (click on the ? button on the top bar for help on how to connect to Spark clusters from SWAN).
import PyRDF
# Select the Spark backend and partition the dataset into 16 fragments for distributed execution
PyRDF.use("spark", {'npartitions': '16'})
# Use RDataFrame through PyRDF
from PyRDF import RDataFrame
The creation of the ROOT dataframe and the construction of the computation graph via `PyRDF` are done in the exact same way as with plain `RDataFrame` from Python. In practice, this means that the same code can be executed locally or distributedly just by switching the backend we would like to use.
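For illustration, a minimal sketch of how the backend switch might look (this assumes PyRDF's local backend is named "local" in its API):

import PyRDF
# Prototype on the resources of the SWAN session itself...
PyRDF.use("local")
# ...and later switch to distributed execution, with no other code changes:
# PyRDF.use("spark", {'npartitions': '16'})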
Let's see that in action. First we will create a ROOT dataframe that is connected to a dataset named `Events` stored in a ROOT file. The file is pulled in via XRootD from EOS public, but note how it could also be stored in your CERNBox space or in any other EOS repository accessible from SWAN (e.g. the experiment ones).

The dataset `Events` is a `TTree` (essentially a table) and has the following branches (also referred to as "columns"):
| Branch name | Data type | Description |
|---|---|---|
| `nMuon` | `unsigned int` | Number of muons in this event |
| `Muon_pt` | `float[nMuon]` | Transverse momentum of the muons, stored as an array of size `nMuon` |
| `Muon_eta` | `float[nMuon]` | Pseudo-rapidity of the muons, stored as an array of size `nMuon` |
| `Muon_phi` | `float[nMuon]` | Azimuth of the muons, stored as an array of size `nMuon` |
| `Muon_charge` | `int[nMuon]` | Charge of the muons, stored as an array of size `nMuon`; either -1 or 1 |
| `Muon_mass` | `float[nMuon]` | Mass of the muons, stored as an array of size `nMuon` |
df = RDataFrame("Events", "root://eospublic.cern.ch//eos/opendata/cms/derived-data/AOD2NanoAODOutreachTool/Run2012BC_DoubleMuParked_Muons.root")
The full dataset contains half a year of CMS data taking in 2012, with 61 million events. That would take a while to process using just the few cores provided by the SWAN session itself. Luckily, we can offload our computations to a Spark cluster and get the result back for inspection in the notebook.
Remember this rule of thumb:

1. While prototyping, run your `RDataFrame` computation on a small subset of your input dataset. The resources of the SWAN session (a few cores and GBs of RAM) should be enough for that purpose. At this stage, `RDataFrame::Range` comes in handy to execute on less data (see the sketch below).
2. Once your analysis is ready, use `RDataFrame` via `PyRDF` and select the Spark backend. Since the computations are offloaded to a Spark cluster, you won't be overloading your SWAN session.

# No need to select a subset of the dataset, we've got distributed resources!
# df_range = df.Range(1000000)
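For reference, a local prototyping run over a subset might look like this (a sketch using plain ROOT `RDataFrame`; `Range` restricts the event loop in single-threaded runs):

import ROOT
# Local prototype over the first million events only
df_local = ROOT.RDataFrame("Events", "root://eospublic.cern.ch//eos/opendata/cms/derived-data/AOD2NanoAODOutreachTool/Run2012BC_DoubleMuParked_Muons.root")
df_test = df_local.Range(1000000)
print("Events in test range:", df_test.Count().GetValue())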
Physics datasets are often general purpose datasets and therefore need extensive filtering of the events for the actual analysis. Here, we implement only a simple selection based on the number of muons and their charge, to cut the dataset down to events that are relevant for our study.

In particular, we apply two filters to keep:

- events with exactly two muons
- events where the two muons have opposite charge
df_2mu = df.Filter("nMuon == 2", "Events with exactly two muons")
df_oc = df_2mu.Filter("Muon_charge[0] != Muon_charge[1]", "Muons with opposite charge")
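Both filters carry descriptive names, so we can in principle inspect how many events survive each cut with `RDataFrame`'s cut-flow report. A sketch (note that printing the report triggers an event loop, and support through the `PyRDF` proxy may vary):

# Print the number and fraction of events passing each named filter
report = df_oc.Report()
report.Print()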
Since we want to see the resonances in the mass spectrum, where dimuon events are more likely, we need to compute the invariant mass from the four-vectors of the muon candidates. Because this operation is non-trivial, we will use ROOT's `ROOT::VecOps::InvariantMass` function to do the job for us.
The invariant mass will be stored in a new column that we will create with the `Define` operation of `RDataFrame`. The parameters of `Define` are the name of the new column and a string with the function to be invoked, which includes the dataset columns to be used as arguments for that function.
Note how, even though `ROOT::VecOps::InvariantMass` is a C++ function, we can use it in our `RDataFrame` workflow from Python: the second parameter of `Define` is a string that contains C++ code. Such code will be just-in-time compiled by ROOT and called during the event loop in C++. This allows us to benefit from C++ performance in computationally-expensive code, even when programming in Python.
df_mass = df_oc.Define("Dimuon_mass", "ROOT::VecOps::InvariantMass(Muon_pt, Muon_eta, Muon_phi, Muon_mass)")
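The same mechanism works for your own C++ helpers: anything declared to the ROOT interpreter can be called from a `Define` string. A sketch, where `leading_pt` is a hypothetical helper and not part of this analysis (for distributed execution, the declaration would also need to reach the workers):

# Declare a custom C++ function to the ROOT interpreter (hypothetical helper)
ROOT.gInterpreter.Declare("""
#include "ROOT/RVec.hxx"
float leading_pt(const ROOT::RVec<float> &pt) { return pt.at(0); }
""")
# Use it in a Define string just like a built-in function
df_lead = df_mass.Define("Leading_pt", "leading_pt(Muon_pt)")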
As (almost) always in physics, we have a look at the results in the form of a histogram. Let's book a histogram as one endpoint of our computation graph.
nbins = 30000
low = 0.25
up = 300
h = df_mass.Histo1D(("Dimuon_mass", "Dimuon_mass", nbins, low, up), "Dimuon_mass")
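For reference, the tuple passed to `Histo1D` is shorthand for a histogram model; with plain `RDataFrame`, the same booking can be written explicitly (a sketch using `ROOT.RDF.TH1DModel`):

# Equivalent booking with an explicit histogram model object
model = ROOT.RDF.TH1DModel("Dimuon_mass", "Dimuon_mass", nbins, low, up)
h_alt = df_mass.Histo1D(model, "Dimuon_mass")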
Now, the computation graph is set up. Next, we want to have a look at the result.
Note that the event loop actually runs the first time we try to access the histogram object (results of an `RDataFrame` graph are computed lazily). Therefore, since we selected the Spark backend, the offloading of the computations happens in the cell below. You should see a monitoring display appearing in the output of the cell that tells you about the progress of the Spark job.
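To make the laziness explicit: booking an action returns a lightweight proxy immediately, and the first access to any result triggers a single event loop that fills all booked results. A sketch (`Count` is a standard `RDataFrame` action, assumed to be forwarded by `PyRDF`):

# Booking is instantaneous: this returns a proxy, not a number
n_dimuon = df_mass.Count()
# Accessing a result would start the (distributed) event loop, e.g.:
# print(n_dimuon.GetValue())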
The `%%time` magic measures the time spent in the whole cell. You can compare it with how long it takes to run locally over just 1 million events!
%%time
ROOT.gStyle.SetOptStat(0); ROOT.gStyle.SetTextFont(42)
c = ROOT.TCanvas("c", "", 800, 700)
c.SetLogx(); c.SetLogy()
h.SetTitle("")
h.GetXaxis().SetTitle("m_{#mu#mu} (GeV)"); h.GetXaxis().SetTitleSize(0.04)
h.GetYaxis().SetTitle("N_{Events}"); h.GetYaxis().SetTitleSize(0.04)
h.Draw()
label = ROOT.TLatex(); label.SetNDC(True)
label.SetTextSize(0.040); label.DrawLatex(0.100, 0.920, "#bf{CMS Open Data}")
label.SetTextSize(0.030); label.DrawLatex(0.500, 0.920, "#sqrt{s} = 8 TeV, L_{int} = 11.6 fb^{-1}");
[Stage 0:=====================================================> (15 + 1) / 16]
CPU times: user 555 ms, sys: 109 ms, total: 664 ms
Wall time: 39.3 s
<cppyy.gbl.TLatex object at 0xbb294f0>
ROOT provides interactive JavaScript graphics for Jupyter, which can be activated with the `%jsroot` magic. Click and drag along an axis to zoom in, and double-click to reset the view.
%jsroot on
c.Draw()
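If you want to keep the figure beyond the notebook session, the canvas can be written to a file with the standard TCanvas API:

# Save the canvas to an image file (format deduced from the extension)
c.SaveAs("dimuon_spectrum.png")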
# Once we are done, shut down the connection to the cluster
# (`spark` is the Spark session object created when connecting from SWAN)
spark.stop()