%matplotlib inline
import numpy as np
import coffea.processor as processor
import awkward as ak
import hist
from coffea.nanoevents import schemas
from coffea.nanoevents.schemas import BaseSchema, NanoAODSchema
class Q1Processor(processor.ProcessorABC):
    """Coffea processor that histograms the ``met_pt`` field of the events.

    The histogram has a single regular axis named ``"met"`` with 100 bins
    spanning 0 to 200 and uses double-precision storage.
    """

    def process(self, events):
        """Build the MET histogram for one chunk of events and return it.

        Args:
            events: Chunk of events; only its ``met_pt`` attribute is read.

        Returns:
            The filled ``hist.Hist`` object.
        """
        met_hist = (
            hist.Hist.new
            .Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
            .Double()
        )
        # ``fill`` mutates the histogram in place (and returns the same
        # object), so filling first and returning the name is equivalent to
        # returning the result of the chained call.
        met_hist.fill(events.met_pt)
        return met_hist

    def postprocess(self, accumulator):
        """Return the accumulated result unchanged."""
        return accumulator
# Connect to an already-running Dask scheduler at a fixed local address.
# NOTE(review): the port is hard-coded to one particular session; it will
# presumably differ when the scheduler is restarted — confirm before re-running.
from dask.distributed import Client
client = Client("tcp://127.0.0.1:41775")
# Bare expression as the last statement of the notebook cell: displays the
# client summary (the text that follows in this dump is that output).
client
Client-687fed24-c250-11ec-9352-7ed8580c18e7
Connection method: Direct | |
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status |
Scheduler-29f1bd3d-9f2b-4282-af2d-ce17c9cd4c36
Comm: tcp://127.0.0.1:41775 | Workers: 1 |
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status | Total threads: 4 |
Started: 5 hours ago | Total memory: 15.70 GiB |
Comm: tcp://127.0.0.1:33089 | Total threads: 4 |
Dashboard: /user/oksana.shadura@cern.ch/proxy/37031/status | Memory: 15.70 GiB |
Nanny: tcp://127.0.0.1:33895 | |
Local directory: /home/cms-jovyan/dask-worker-space/worker-098e4b0c | |
Tasks executing: 0 | Tasks in memory: 0 |
Tasks ready: 0 | Tasks in flight: 0 |
CPU usage: 2.0% | Last seen: Just now |
Memory usage: 465.52 MiB | Spilled bytes: 0 B |
Read bytes: 1.87 kiB | Write bytes: 1.87 kiB |
# Build a coffea Runner that dispatches work through the Dask client created
# earlier in the notebook.
run = processor.Runner(
executor=processor.DaskExecutor(client=client),
# NOTE(review): presumably enables the Skyhook-backed read path for
# parquet input — confirm against the coffea Runner documentation.
use_skyhook=True,
# Input files are parquet (see the ".parquet" path passed to run() below).
format="parquet",
# BaseSchema is the schema imported from coffea.nanoevents.schemas above;
# the processor reads the flat field `events.met_pt`.
schema=BaseSchema,
)
# Run Q1Processor over the single "SingleMu" dataset entry. The result is
# stored in `out`, which is later plotted with `out.plot1d()`.
out = run(
# Fileset mapping: dataset name -> parquet file path.
{'SingleMu' : "/mnt/cephfs/AGC/modified_agc.parquet"},
# NOTE(review): second positional argument; presumably the tree/collection
# name expected by Runner.__call__ — confirm its meaning for parquet input.
"events",
processor_instance=Q1Processor()
)
[########################################] | 100% Completed | 0.1s
# Draw the 1D histogram held in `out`; in the notebook the returned list of
# plot artists is echoed as the cell output.
out.plot1d()
[StairsArtists(stairs=<matplotlib.patches.StepPatch object at 0x7fbd24c4d5e0>, errorbar=<ErrorbarContainer object of 3 artists>, legend_artist=<ErrorbarContainer object of 3 artists>)]