import json
import hist
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
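# Skyhook-enabled Parquet format: the arguments name the on-disk format, the Ceph
# configuration file, and the CephFS data pool, so the projection and filter below
# are pushed down into the storage layer rather than evaluated on the client.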
skfmt = ds.SkyhookFileFormat("parquet", "/opt/ceph/ceph.conf", "cephfs-data0")
dataset = ds.dataset("file:///mnt/cephfs/nyc", format=skfmt)
dataset.to_table(
    columns=["total_amount", "fare_amount"],
    filter=(ds.field("trip_distance") > 20.0),
).to_pandas()
| | total_amount | fare_amount |
|---|---|---|
| 0 | 75.84 | 52.00 |
| 1 | 69.99 | 52.00 |
| 2 | 59.84 | 53.00 |
| 3 | 68.50 | 53.50 |
| 4 | 70.01 | 52.00 |
| ... | ... | ... |
| 376 | 78.88 | 67.00 |
| 377 | 64.84 | 58.50 |
| 378 | 0.31 | 0.01 |
| 379 | 58.80 | 57.50 |
| 380 | 229.80 | 228.50 |
381 rows × 2 columns
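For comparison, the same projection and filter can be run through the stock Parquet format, in which case they are applied on the client after the data leaves storage. A minimal sketch, assuming the files under /mnt/cephfs/nyc are also readable as ordinary Parquet:

# Stock Arrow path (no Skyhook pushdown): selection happens client-side.
plain = ds.dataset("file:///mnt/cephfs/nyc", format="parquet")
plain.to_table(
    columns=["total_amount", "fare_amount"],
    filter=(ds.field("trip_distance") > 20.0),
).to_pandas()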
with open('ntuples.json', 'r') as f:
    data = json.load(f)

uris = list()
for file in data['data']['nominal']['files']:
    uris.append(file['path'])

uris[0:5]
['https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/Run2015D/SingleMuon/MINIAOD/16Dec2015-v1/10000/00006301-CAA8-E511-AD39-549F35AD8BC9.root', 'https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/Run2015D/SingleMuon/MINIAOD/16Dec2015-v1/10000/0034202D-A3A8-E511-BA9C-00259073E3DA.root', 'https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/Run2015D/SingleMuon/MINIAOD/16Dec2015-v1/10000/0043758E-ECA8-E511-B849-002618FDA287.root', 'https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/Run2015D/SingleMuon/MINIAOD/16Dec2015-v1/10000/004C08BC-C8A8-E511-943C-00266CFAE6E0.root', 'https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/Run2015D/SingleMuon/MINIAOD/16Dec2015-v1/10000/005416D9-E0A8-E511-8AA1-0CC47A4C8E46.root']
import servicex as sx
from func_adl_servicex import ServiceXSourceUpROOT
dataset_name = uris[0:5]
sx_dataset = sx.ServiceXDataset(dataset_name, "uproot", result_destination="volume")
# Use a fresh name here so the pyarrow.dataset alias `ds` imported above is not shadowed.
src = ServiceXSourceUpROOT(sx_dataset, "Events")
missing_ET = src.Select(lambda event: {'met_pt': event.met_pt}).AsAwkwardArray().value()
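The query returns an awkward record array with a single met_pt field, one value per event. As a quick sanity check it can be filled straight into a hist histogram; a minimal sketch, assuming the delivery above completed:

import awkward as ak

# met_pt is flat (one value per event), so it converts to NumPy directly.
h = hist.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]").Double()
h.fill(ak.to_numpy(missing_ET["met_pt"]))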
from dask.distributed import Client
client = Client("tcp://127.0.0.1:34423")
client
Client: Client-2881f5c2-c56a-11ec-8100-0a69900a1b41 (direct connection)
Scheduler: tcp://127.0.0.1:34423, 1 worker, 4 threads, 1.00 GiB total memory
Dashboard: /user/jayjeetc@ucsc.edu/proxy/8787/status
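The address above belongs to a pre-provisioned scheduler. For local experimentation, a roughly equivalent single-worker cluster can be started in-process; this is a sketch and not part of the original deployment:

from dask.distributed import LocalCluster

# One worker with 4 threads and 1 GiB of memory, mirroring the cluster above.
cluster = LocalCluster(n_workers=1, threads_per_worker=4, memory_limit="1GiB")
local_client = Client(cluster)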
import dask.dataframe as dd
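# With format="skyhook", the filters argument is handed to the storage layer, so the
# met_pt > 450 predicate is evaluated before partitions ever reach the Dask workers.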
df = dd.read_parquet(
    "/mnt/cephfs/AGC/AGC_copied",
    filters=[('met_pt', '>', 450)],
    format="skyhook",
)
df.compute()
| | met_pt |
|---|---|
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
| ... | ... |
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
| 0 | 460.105164 |
9000 rows × 1 columns
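Downstream operations then see only the rows that survived the pushdown; a minimal sketch of a follow-up aggregation on the filtered column:

# Count and average the selected values; all rows already satisfy met_pt > 450.
print(df["met_pt"].count().compute())
print(df["met_pt"].mean().compute())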
from coffea import processor
from coffea.nanoevents import schemas
class Q1Processor(processor.ProcessorABC):
    def process(self, events):
        return (
            hist.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
            .Double()
            .fill(events.met_pt)
        )

    def postprocess(self, accumulator):
        return accumulator
fileset = {'SingleMu' : "/mnt/cephfs/AGC/AGC_100"}
run = processor.Runner(
    executor=processor.DaskExecutor(client=client),
    schema=schemas.BaseSchema,
    use_skyhook=True,
    format="parquet",
)
output = run(fileset, "Events", processor_instance=Q1Processor())
output
[########################################] | 100% Completed | 4.4s
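The returned accumulator is the filled hist.Hist from Q1Processor, so it can be drawn directly; a minimal sketch, assuming matplotlib is installed:

import matplotlib.pyplot as plt

# Draw the MET distribution accumulated over all chunks.
fig, ax = plt.subplots()
output.plot1d(ax=ax)
ax.set_ylabel("Events")
fig.savefig("met_hist.png")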