#!/usr/bin/env python
# coding: utf-8

# In this demo, we first show how to use the `SkyhookFileFormat` with the Arrow `Dataset` API to scan Parquet files by pushing scan operations down into Ceph, and then we show how to use the `Dataset` API to process Parquet files containing NanoEvents stored in Ceph in parallel through Coffea using Dask.

# ## Exploring SkyhookFileFormat with PyArrow
#
# We import the Dataset API and the Parquet API from PyArrow.

# In[19]:


import pyarrow
import pyarrow.dataset as ds
import pyarrow.parquet as pq


# Now we instantiate the `SkyhookFileFormat`. Upon instantiation, the connection to the Ceph cluster is made under the hood, and it is closed automatically when the object is destroyed. The `SkyhookFileFormat` API currently takes the Ceph configuration file as input. It inherits from the `FileFormat` API and uses the `DirectObjectAccess` API under the hood to interact with the underlying objects that make up a file in CephFS. Since we mount CephFS, we can instantiate our dataset with the `FileSystemDataset` that comes out of the box with Apache Arrow: the mounted CephFS path is just another directory of Parquet files. With the `FileSystemDataset` in place, we can push scan operations down to our Parquet files simply by plugging `SkyhookFileFormat` into the `format` parameter.

# In[20]:


dataset = ds.dataset("file:///mnt/cephfs/nyc", format=ds.SkyhookFileFormat("parquet", "/etc/ceph/ceph.conf"))


# Now we apply some projections and filters on the dataset.

# In[21]:


dataset.to_table(columns=["total_amount", "fare_amount"], filter=(ds.field("trip_distance") > 20.0)).to_pandas()


# ## Install Dask
#
# We will be using Dask workers for parallel execution, so let's install it.

# In[22]:


get_ipython().system('pip3 install dask[distributed]')
get_ipython().system("pip3 install 'fsspec>=0.3.3'")


# ## Import the required modules
#
# Import `uproot`, `awkward`, and `coffea`.

# In[23]:


import uproot
import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea import processor, hist


# ## Define a Processor instance
#
# The processor implementation given below has been taken from [here](https://github.com/CoffeaTeam/coffea/blob/master/binder/nanoevents.ipynb).

# In[24]:


class MyZPeak(processor.ProcessorABC):
    def __init__(self):
        self._histo = hist.Hist(
            "Events",
            hist.Cat("dataset", "Dataset"),
            hist.Bin("mass", "Z mass", 60, 60, 120),
        )

    @property
    def accumulator(self):
        return self._histo

    # we will receive a NanoEvents instead of a coffea DataFrame
    def process(self, events):
        out = self.accumulator.identity()
        # select events with exactly two opposite-charge muons
        mmevents = events[
            (ak.num(events.Muon) == 2)
            & (ak.sum(events.Muon.charge, axis=1) == 0)
        ]
        # build the dimuon candidate and fill the Z mass histogram
        zmm = mmevents.Muon[:, 0] + mmevents.Muon[:, 1]
        out.fill(
            dataset=events.metadata["dataset"],
            mass=zmm.mass,
        )
        return out

    def postprocess(self, accumulator):
        return accumulator


# ## Convert ROOT files containing NanoEvents to Parquet files

# In[25]:


ak.to_parquet(
    uproot.lazy("../tests/samples/nano_dy.root:Events"),
    "nano_dy.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)

ak.to_parquet(
    uproot.lazy("../tests/samples/nano_dimuon.root:Events"),
    "nano_dimuon.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)


# ## Write some NanoEvents Parquet files to CephFS
#
# Here we populate the CephFS-mounted directory with the Parquet files created in the previous step. In this version, each individual file must stay under 4 MB, the default object size of Ceph, to ensure a one-to-one mapping of files to objects, which is a requirement of the multiple-file design that we have now. A quick size check is sketched below before the copy.
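# The cell below is a small sanity-check sketch and not part of the original workflow; it assumes the two Parquet files created above are in the current working directory and that the Ceph object size is the default 4 MB.

# In[ ]:


import os

CEPH_OBJECT_SIZE = 4 * 1024 * 1024  # assumed default Ceph object size (4 MB)

for path in ("nano_dy.parquet", "nano_dimuon.parquet"):
    size = os.path.getsize(path)
    print(f"{path}: {size / 1024:.1f} KiB")
    # fail early if a file would span more than one Ceph object
    assert size < CEPH_OBJECT_SIZE, f"{path} exceeds the 4 MB object size limit"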
# In[26]:


import os

os.makedirs("/mnt/cephfs/nanoevents/ZJets", exist_ok=True)
os.makedirs("/mnt/cephfs/nanoevents/Data", exist_ok=True)

# copy each converted file six times to build up small per-dataset directories
for i in range(6):
    os.system(f"cp nano_dy.parquet /mnt/cephfs/nanoevents/ZJets/nano_dy.{i}.parquet")
    os.system(f"cp nano_dimuon.parquet /mnt/cephfs/nanoevents/Data/nano_dimuon.{i}.parquet")


# In[27]:


get_ipython().system('ls /mnt/cephfs/nanoevents/Data')


# In[28]:


get_ipython().system('ls /mnt/cephfs/nanoevents/ZJets')


# ## Reading NanoEvents using SkyhookFileFormat

# In[29]:


events_skyhook = NanoEventsFactory.from_parquet(
    "/mnt/cephfs/nanoevents/ZJets/nano_dy.0.parquet",
    skyhook_options={"ceph_config_path": "/etc/ceph/ceph.conf", "ceph_data_pool": "cephfs_data"},
).events()
events_skyhook.Muon


# ## Running a job in parallel using Dask
#
# The `LocalCluster()` used below creates a process pool with one single-threaded worker per core available to the notebook. The `LocalCluster` can be replaced by other cluster managers from the Dask ecosystem, such as `KubeCluster` or `YarnCluster`. Here, we create a `LocalCluster` and get a client handle to it.

# In[30]:


from dask.distributed import Client, LocalCluster

cluster = LocalCluster(processes=True, threads_per_worker=1)
client = Client(cluster)


# We have added a new function called `run_parquet_job` to the executor API in coffea to run jobs on Parquet files using the Arrow Dataset API under the hood.
# This API takes an optional `ceph_config_path` parameter, the path to the configuration file of the Ceph cluster, which instructs the function to read from RADOS using `SkyhookFileFormat` (which allows pushdown) instead of the out-of-the-box `ParquetFormat` API. It also accepts a single directory path per dataset and leaves the dataset discovery to the Datasets API. The calls to the Dataset API are launched in parallel, with one Dataset API call per file.

# In[31]:


get_ipython().run_cell_magic('time', '', 'result = processor.run_parquet_job({\n "ZJets": "/mnt/cephfs/nanoevents/ZJets",\n "Data": "/mnt/cephfs/nanoevents/Data"\n },\n "Events",\n processor_instance=MyZPeak(),\n executor=processor.dask_executor,\n executor_args={"schema": processor.NanoAODSchema, "client": client, "use_skyhook": True}\n)\n')


# ## Running iteratively using the `iterative_executor`
#
# Run the same job again, but now iteratively. The calls to the Dataset API will now be sequential.

# In[32]:


get_ipython().run_cell_magic('time', '', 'result = processor.run_parquet_job({\n "ZJets": "/mnt/cephfs/nanoevents/ZJets",\n "Data": "/mnt/cephfs/nanoevents/Data"\n },\n "Events",\n processor_instance=MyZPeak(),\n executor=processor.iterative_executor,\n executor_args={"schema": processor.NanoAODSchema, "use_skyhook": True}\n)\n')


# As expected, this is much slower than running with Dask.

# ## Plotting the results

# In[33]:


get_ipython().run_line_magic('matplotlib', 'inline')
hist.plot1d(result)
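# As an optional epilogue (a sketch, not part of the original notebook), we can redraw the histogram onto an explicit figure so it can be saved to disk, and then release the local Dask workers. The output filename "zpeak.png" is an arbitrary choice.

# In[ ]:


import matplotlib.pyplot as plt

# draw the result histogram on a dedicated figure, overlaying the "dataset" category axis
fig, ax = plt.subplots()
hist.plot1d(result, ax=ax, overlay="dataset")
fig.savefig("zpeak.png")

# shut down the Dask client and the local cluster now that the jobs are done
client.close()
cluster.close()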