import numpy as np
%matplotlib inline
from coffea import hist
import coffea.processor as processor
import awkward as ak
from coffea.nanoevents import schemas
# This program plots an event-level variable (in this case, MET, but switching it is as easy as a dict-key change). It also demonstrates an easy use of the book-keeping cutflow tool, to keep track of the number of events processed.
# The processor class bundles our data analysis together while giving us some helpful tools. It also leaves looping and chunks to the framework instead of us.
class Processor(processor.ProcessorABC):
    """Fill a per-dataset MET histogram and keep a simple event cutflow.

    The framework handles chunking and looping; each call to ``process``
    receives one chunk of events and returns a partial accumulator that
    coffea merges across chunks.
    """

    def __init__(self):
        # Histogram axes: one categorical axis per dataset, and 50 uniform
        # MET bins on [0, 100) GeV. See the coffea hist_tools docs
        # (Hist and Bin) for the axis format.
        ds_axis = hist.Cat("dataset", "")
        met_axis = hist.Bin("MET", "MET [GeV]", 50, 0, 100)
        # dict_accumulator bundles the histogram with a cutflow counter so
        # per-chunk outputs can be summed by the framework.
        self._accumulator = processor.dict_accumulator({
            'MET': hist.Hist("Counts", ds_axis, met_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, events):
        output = self.accumulator.identity()
        # Columns mirror the input TTree; events.columns (or
        # events.<object>.columns) lists what is available.
        dataset = events.metadata["dataset"]
        met = events.MET.pt
        # += because this runs once per chunk and the counts are merged.
        output['cutflow']['all events'] += ak.size(met)
        output['cutflow']['number of chunks'] += 1
        # The 'MET=' keyword matches the Bin name declared in __init__.
        output['MET'].fill(dataset=dataset, MET=met)
        return output

    def postprocess(self, accumulator):
        return accumulator
from dask.distributed import Client

# Attach to the already-running Dask scheduler on this machine; evaluating
# the client object in a notebook cell renders its status summary.
client = Client("tcp://127.0.0.1:36157")
client
Client-e9018bf0-8b2d-11ec-8904-82674678ea37
Connection method: Direct | |
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status |
Scheduler-fa970486-b334-458d-93f6-e1fcf5106709
Comm: tcp://127.0.0.1:36157 | Workers: 1 |
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status | Total threads: 4 |
Started: 2 hours ago | Total memory: 15.70 GiB |
Comm: tcp://127.0.0.1:34225 | Total threads: 4 |
Dashboard: /user/oksana.shadura@cern.ch/proxy/46647/status | Memory: 15.70 GiB |
Nanny: tcp://127.0.0.1:36243 | |
Local directory: /home/cms-jovyan/dask-worker-space/worker-crip7cn_ | |
Tasks executing: 0 | Tasks in memory: 1 |
Tasks ready: 0 | Tasks in flight: 0 |
CPU usage: 2.0% | Last seen: Just now |
Memory usage: 518.93 MiB | Spilled bytes: 0 B |
Read bytes: 41.93 kiB | Write bytes: 57.77 kiB |
import os, shutil
import uproot
import awkward as ak

# Destination on the shared (CephFS) filesystem that the Dask workers read.
dest = "/mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet"

if not os.path.isfile(dest):
    # Convert the ROOT Events tree to a local parquet file first.
    ak.to_parquet(
        uproot.lazy("nano_dy.root:Events"),
        "Run2012B_SingleMu.parquet",
        list_to32=True,
        use_dictionary=False,
        compression="GZIP",
        compression_level=1,
    )
    # BUG FIX: the original created /mnt/cephfs/nanoevents/Run2012B_SingleMu
    # but copied into /mnt/cephfs/Run2012B_SingleMu/, a directory it never
    # made — producing the FileNotFoundError seen when the Runner later tried
    # to open the dataset. Create the actual destination directory instead.
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    shutil.copyfile("Run2012B_SingleMu.parquet", dest)
# Map each dataset name to its list of parquet files on the shared filesystem.
fileset = {'SingleMu': ["/mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet"]}

# Execute the processor over the parquet dataset on the Dask cluster,
# reading through Skyhook (Ceph-embedded Arrow dataset scans).
skyhook_opts = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"}
run = processor.Runner(
    executor=processor.DaskExecutor(client=client),
    schema=schemas.NanoAODSchema,
    savemetrics=True,
    use_skyhook=True,
    skyhook_options=skyhook_opts,
    format="parquet",
)

output, metrics = run(fileset, "Events", processor_instance=Processor())

metrics
--------------------------------------------------------------------------- FileNotFoundError Traceback (most recent call last) /tmp/ipykernel_2308/2740435347.py in <module> 8 format="parquet") 9 ---> 10 output, metrics = run(fileset, "Events", processor_instance=Processor()) 11 12 metrics /opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in __call__(self, fileset, treename, processor_instance) 1344 events_total = sum(len(c) for c in chunks) 1345 else: -> 1346 chunks = [c for c in chunks] 1347 1348 exe_args = { /opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in <listcomp>(.0) 1344 events_total = sum(len(c) for c in chunks) 1345 else: -> 1346 chunks = [c for c in chunks] 1347 1348 exe_args = { /opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in _chunk_generator(self, fileset, treename) 1125 dataset_filelist_map = {} 1126 for dataset, basedir in fileset.items(): -> 1127 ds_ = ds.dataset(basedir, format="parquet") 1128 dataset_filelist_map[dataset] = ds_.files 1129 chunks = [] /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes) 659 elif isinstance(source, (tuple, list)): 660 if all(_is_path_like(elem) for elem in source): --> 661 return _filesystem_dataset(source, **kwargs) 662 elif all(isinstance(elem, Dataset) for elem in source): 663 return _union_dataset(source, **kwargs) /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes) 399 400 if isinstance(source, (list, tuple)): --> 401 fs, paths_or_selector = _ensure_multiple_sources(source, filesystem) 402 else: 403 fs, paths_or_selector = _ensure_single_source(source, filesystem) /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _ensure_multiple_sources(paths, filesystem) 318 continue 
319 elif file_type == FileType.NotFound: --> 320 raise FileNotFoundError(info.path) 321 elif file_type == FileType.Directory: 322 raise IsADirectoryError( FileNotFoundError: /mnt/cephfs/Run2012B_SingleMu/Run2012B_SingleMu.parquet
distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client _GatheringFuture exception was never retrieved future: <_GatheringFuture finished exception=CancelledError()> asyncio.exceptions.CancelledError
# Plot the accumulated MET histogram, one overlaid curve per dataset.
# fill_opts is optional; without it the histogram is drawn as a line.
hist.plot1d(output['MET'], overlay='dataset', fill_opts={'edgecolor': (0,0,0,0.3), 'alpha': 0.8})

# Dump every cutflow counter; a single entry can be read with
# output['cutflow']["KEY_NAME"].
for name, count in output['cutflow'].items():
    print(name, count)