This notebook showcases a pipeline that uses ServiceX to deliver data to coffea processors, optionally scaled out with Dask. It is based on a CMS Open Data ttbar analysis, with significant simplifications.
import asyncio
import time
import awkward as ak
from coffea.processor import servicex
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import matplotlib.pyplot as plt
from servicex import ServiceXDataset
Configuration options: enable / disable Dask, and control the use of caching with ServiceX (disabling the cache forces transforms to re-run).
# enable Dask
USE_DASK = True
# ServiceX behavior: ignore the cache, so repeated queries re-run the transform
SERVICEX_IGNORE_CACHE = True
The processor used here selects jets with $p_T > 25$ GeV and calculates $H_T^{\text{had}}$ (the scalar sum of jet $p_T$) as the observable.
class TtbarAnalysis(servicex.Analysis):
    def __init__(self):
        self.hist = hist.Hist.new.Reg(50, 0, 1000, name="ht", label="HT").Weight()

    def process(self, events):
        histogram = self.hist.copy()

        # select jets with pT > 25 GeV
        selected_jets = events.jet[events.jet.pt > 25]

        # use HT (scalar sum of jet pT) as observable
        ht = ak.sum(selected_jets.pt, axis=-1)
        histogram.fill(ht=ht, weight=1.0)

        return histogram

    def postprocess(self, accumulator):
        return accumulator
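As a quick sanity check, the processor can also be run standalone on a small Awkward Array. The toy events below are hypothetical, constructed only to illustrate the jet selection and the HT sum:

# hypothetical toy input: two events with a variable number of jets each
toy_jets = ak.zip({"pt": ak.Array([[30.0, 40.0, 10.0], [50.0]])})
toy_events = ak.zip({"jet": toy_jets}, depth_limit=1)
# the 10 GeV jet fails the cut, so HT is 70 GeV and 50 GeV for the two events
TtbarAnalysis().process(toy_events)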
Specify which data to process: here, a small public file from the 2015 CMS Open Data release.
# input data to process
fileset = {
    "ttbar": {
        "files": ["https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/RunIIFall15MiniAODv2/TT_TuneCUETP8M1_13TeV-powheg-pythia8/MINIAODSIM//PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1/00000/00DF0A73-17C2-E511-B086-E41D2D08DE30.root"],
        "metadata": {}
    }
}
Set up the query: only specific columns are requested here, without any filtering applied.
def get_query(source: ObjectStream) -> ObjectStream:
    """Query for event / column selection: no filter, select single jet column."""
    return source.Select(lambda e: {"jet_pt": e.jet_pt})
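Cuts can also be pushed into the ServiceX transform itself, reducing the amount of data delivered. A minimal sketch of such a variant (not used below), assuming the uproot backend supports Where on the per-event jet collection:

# hypothetical variant: apply the pT > 25 GeV cut inside ServiceX
def get_filtered_query(source: ObjectStream) -> ObjectStream:
    """Query for event / column selection: jet pT filter applied in ServiceX."""
    return source.Select(lambda e: {"jet_pt": e.jet_pt.Where(lambda pt: pt > 25)})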
The following cell is mostly boilerplate, which can hopefully be improved in the future.
def make_datasource(fileset: dict, name: str, query: ObjectStream, ignore_cache: bool):
    """Creates a ServiceX datasource for a particular Open Data file."""
    datasets = [ServiceXDataset(fileset[name]["files"], backend_name="uproot", ignore_cache=ignore_cache)]
    return servicex.DataSource(
        query=query, metadata=fileset[name]["metadata"], datasets=datasets
    )
async def produce_all_histograms(fileset, query, processor_class, use_dask=False, ignore_cache=False):
    """Runs the histogram production, processing input files with ServiceX and
    producing histograms with coffea.
    """
    # create the query
    ds = ServiceXSourceUpROOT("cernopendata://dummy", "events", backend_name="uproot")
    ds.return_qastle = True
    data_query = query(ds)

    # executor: local or Dask
    if not use_dask:
        executor = servicex.LocalExecutor()
    else:
        executor = servicex.DaskExecutor(client_addr="tls://localhost:8786")  # set up for coffea-casa

    datasources = [
        make_datasource(fileset, ds_name, data_query, ignore_cache=ignore_cache)
        for ds_name in fileset.keys()
    ]

    # create the analysis processor
    analysis_processor = processor_class()

    async def run_updates_stream(accumulator_stream, name):
        """Run to get the last item in the stream"""
        coffea_info = None
        try:
            async for coffea_info in accumulator_stream:
                pass
        except Exception as e:
            raise Exception(f"Failure while processing {name}") from e
        return coffea_info

    output = await asyncio.gather(
        *[
            run_updates_stream(executor.execute(analysis_processor, source), name)
            for name, source in zip(fileset.keys(), datasources)
        ]
    )
    return output
Execute everything: query ServiceX, which streams columns back to the coffea processors asynchronously, then collect the aggregated histograms built by coffea.
t0 = time.time()
output = await produce_all_histograms(
fileset, get_query, TtbarAnalysis, use_dask=USE_DASK, ignore_cache=SERVICEX_IGNORE_CACHE
)
print(f"execution took {time.time()-t0:.2f} seconds")
execution took 13.40 seconds
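With several datasets in the fileset, output would contain one histogram per dataset. A minimal sketch for merging them, assuming hist histograms can be added:

# hypothetical: merge per-dataset histograms into a single one
combined = output[0]
for histogram in output[1:]:
    combined = combined + histogram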
output[0].plot(label="ttbar")
plt.legend();