In this demo, we first show how to use the Arrow Dataset API's SkyhookFileFormat to scan Parquet files by pushing scan operations down into Ceph, and then we show how to use the Dataset API to process Parquet files containing NanoEvents stored in Ceph in parallel through Coffea using Dask.
We start by importing the Dataset API from PyArrow.
import pyarrow.dataset as ds
Now we instantiate the SkyhookFileFormat. Upon instantiation, the connection to the Ceph cluster is made under the hood, and it is closed automatically when the object is destroyed. The SkyhookFileFormat API currently takes the Ceph configuration file as input. It inherits from the FileFormat API and uses the DirectObjectAccess API under the hood to interact with the underlying objects that make up a file in CephFS. Since we mount CephFS, we can use the FileSystemDataset that comes out of the box with Apache Arrow to instantiate our dataset: the mount is just another directory of Parquet files. With the FileSystemDataset in hand, we can start pushing scan operations down to our Parquet files simply by plugging SkyhookFileFormat into the format parameter.
dataset = ds.dataset("file:///mnt/cephfs/nyc", format=ds.SkyhookFileFormat("parquet", "/etc/ceph/ceph.conf"))
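For comparison, the same directory can also be scanned without pushdown via the stock Parquet format that ships with Arrow; filters and projections are then evaluated on the client after the data has been read. A minimal sketch (the dataset_plain name is ours):

# Baseline without pushdown (sketch): read the same files through the mounted
# CephFS path with the built-in Parquet format; filtering happens client-side.
dataset_plain = ds.dataset("file:///mnt/cephfs/nyc", format="parquet")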
Now we apply some projections and filters on the dataset.
dataset.to_table(columns=["total_amount", "fare_amount"], filter=(ds.field("trip_distance") > 20.0)).to_pandas()
| | total_amount | fare_amount |
|---|---|---|
| 0 | 75.84 | 52.00 |
| 1 | 69.99 | 52.00 |
| 2 | 59.84 | 53.00 |
| 3 | 68.50 | 53.50 |
| 4 | 70.01 | 52.00 |
| ... | ... | ... |
| 376 | 78.88 | 67.00 |
| 377 | 64.84 | 58.50 |
| 378 | 0.31 | 0.01 |
| 379 | 58.80 | 57.50 |
| 380 | 229.80 | 228.50 |

381 rows × 2 columns
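The Dataset API can also evaluate the same predicate without materializing a table. A small sketch, assuming a PyArrow version that provides Dataset.count_rows:

# Count the matching rows without building a full table (sketch).
dataset.count_rows(filter=ds.field("trip_distance") > 20.0)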
We will be using Dask workers for parallel execution, so let's install Dask along with fsspec.
!pip3 install dask[distributed]
!pip3 install 'fsspec>=0.3.3'
Import uproot, awkward, and coffea.
import uproot
import awkward as ak
from coffea.nanoevents import NanoEventsFactory
from coffea import processor, hist
class MyZPeak(processor.ProcessorABC):
    def __init__(self):
        # histogram of the dimuon invariant mass, categorized by dataset
        self._histo = hist.Hist(
            "Events",
            hist.Cat("dataset", "Dataset"),
            hist.Bin("mass", "Z mass", 60, 60, 120),
        )

    @property
    def accumulator(self):
        return self._histo

    # we will receive a NanoEvents instead of a coffea DataFrame
    def process(self, events):
        out = self.accumulator.identity()
        # keep events with exactly two oppositely charged muons
        mmevents = events[
            (ak.num(events.Muon) == 2)
            & (ak.sum(events.Muon.charge, axis=1) == 0)
        ]
        # build the dimuon candidate and fill the mass histogram
        zmm = mmevents.Muon[:, 0] + mmevents.Muon[:, 1]
        out.fill(
            dataset=events.metadata["dataset"],
            mass=zmm.mass,
        )
        return out

    def postprocess(self, accumulator):
        return accumulator
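Next, we convert the sample ROOT files that ship with the coffea repository into Parquet files using ak.to_parquet.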
ak.to_parquet(
    uproot.lazy("../tests/samples/nano_dy.root:Events"),
    "nano_dy.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)
ak.to_parquet(
    uproot.lazy("../tests/samples/nano_dimuon.root:Events"),
    "nano_dimuon.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)
Here we populate the CephFS-mounted directory with the Parquet files created in the previous step. In this version, we need to make sure that each individual file is under 4 MB, the default object size in Ceph, to ensure a one-to-one mapping of files to objects; this is a requirement of the multiple-file design that we have now.
import os

os.makedirs("/mnt/cephfs/nanoevents/ZJets", exist_ok=True)
os.makedirs("/mnt/cephfs/nanoevents/Data", exist_ok=True)

# fan each sample out into six files so the job has multiple objects to scan
for i in range(6):
    os.system(f"cp nano_dy.parquet /mnt/cephfs/nanoevents/ZJets/nano_dy.{i}.parquet")
    os.system(f"cp nano_dimuon.parquet /mnt/cephfs/nanoevents/Data/nano_dimuon.{i}.parquet")
!ls /mnt/cephfs/nanoevents/Data
nano_dimuon.0.parquet nano_dimuon.2.parquet nano_dimuon.4.parquet nano_dimuon.1.parquet nano_dimuon.3.parquet nano_dimuon.5.parquet
!ls /mnt/cephfs/nanoevents/ZJets
nano_dy.0.parquet nano_dy.2.parquet nano_dy.4.parquet nano_dy.1.parquet nano_dy.3.parquet nano_dy.5.parquet
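Now let's verify that NanoEvents can be read straight out of CephFS by passing skyhook_options to NanoEventsFactory.from_parquet.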
events_skyhook = NanoEventsFactory.from_parquet(
    "/mnt/cephfs/nanoevents/ZJets/nano_dy.0.parquet",
    skyhook_options={"ceph_config_path": "/etc/ceph/ceph.conf", "ceph_data_pool": "cephfs_data"},
).events()
events_skyhook.Muon
/workspace/binder/coffea/nanoevents/schemas/nanoaod.py:195: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8 RuntimeWarning,
<MuonArray [[], [], [], [], ... [], [], [], []] type='40 * var * muon'>
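As a quick sanity check, we can mirror the selection from MyZPeak directly on these events; a sketch using the same awkward operations:

# Select events with exactly two oppositely charged muons and build the
# dimuon candidate, as in MyZPeak above (sketch).
mm = events_skyhook[
    (ak.num(events_skyhook.Muon) == 2)
    & (ak.sum(events_skyhook.Muon.charge, axis=1) == 0)
]
zmm = mm.Muon[:, 0] + mm.Muon[:, 1]
zmm.mass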
The LocalCluster() used below creates a process pool with worker count equal to the number of cores available to the notebook, where each worker is single-threaded. The LocalCluster can be replaced by other cluster resource managers provided by Dask Distributed, such as KubernetesCluster, YarnCluster, etc. Here, we create a LocalCluster and get a client handle to it.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True, threads_per_worker=1)
client = Client(cluster)
/usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 44011 instead http_address["port"], self.http_server.port
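If more workers are needed than the default, the pool can be resized explicitly, and the client exposes a diagnostics dashboard. A small sketch (the worker count of 4 is arbitrary):

# Optional (sketch): resize the worker pool and print the dashboard URL.
cluster.scale(4)
print(client.dashboard_link)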
We have added a new function called run_parquet_job to the executor API in coffea to run jobs on Parquet files using the Arrow Dataset API under the hood. This function takes an optional ceph_config_path parameter, which is the path to the configuration file of the Ceph cluster; passing it instructs the function to read from RADOS using the SkyhookFileFormat (which allows pushdown) instead of the out-of-the-box ParquetFormat API. The function also accepts just a single directory path, in which case the Dataset API performs dataset discovery by itself. The calls to the Dataset API are launched in parallel, with one Dataset API call per file.
%%time
result = processor.run_parquet_job(
    {
        "ZJets": "/mnt/cephfs/nanoevents/ZJets",
        "Data": "/mnt/cephfs/nanoevents/Data",
    },
    "Events",
    processor_instance=MyZPeak(),
    executor=processor.dask_executor,
    executor_args={"schema": processor.NanoAODSchema, "client": client, "use_skyhook": True},
)
CPU times: user 2.66 s, sys: 518 ms, total: 3.18 s
Wall time: 7.21 s
iterative_executor

Run the same job again, but now iteratively; the calls to the Dataset API will now be sequential.
%%time
result = processor.run_parquet_job(
    {
        "ZJets": "/mnt/cephfs/nanoevents/ZJets",
        "Data": "/mnt/cephfs/nanoevents/Data",
    },
    "Events",
    processor_instance=MyZPeak(),
    executor=processor.iterative_executor,
    executor_args={"schema": processor.NanoAODSchema, "use_skyhook": True},
)
/workspace/binder/coffea/nanoevents/schemas/nanoaod.py:195: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8 RuntimeWarning,
CPU times: user 8.12 s, sys: 854 ms, total: 8.98 s
Wall time: 8.94 s
As expected, this is much slower than running with Dask. Finally, we plot the resulting Z mass histogram.
%matplotlib inline
hist.plot1d(result)
<AxesSubplot:xlabel='Z mass', ylabel='Events'>