#!/usr/bin/env python
# coding: utf-8

# # Running inference tools
#
# As machine learning (ML) becomes more popular in HEP analysis, `coffea` also
# provides tools to assist with using ML tools within the coffea framework. For
# training and validation, you would likely need custom data mangling tools to
# convert HEP data formats ([NanoAOD][nanoaod], [PFNano][pfnano]) to a format that
# best interfaces with the ML tool of choice, since for training and validation
# you typically want fine control over what computation is done. For more
# advanced use cases of data mangling and data saving, refer to the [awkward array
# manual][datamangle] and the [uproot][uproot_write]/[parquet][ak_parquet] write
# operations for saving intermediate states. The helper tools provided in coffea
# focus on ML inference, where ML tool outputs are used as just another variable
# in the event/object selection chain.
#
# [nanoaod]: https://twiki.cern.ch/twiki/bin/view/CMSPublic/WorkBookNanoAOD
# [pfnano]: https://github.com/cms-jet/PFNano
# [datamangle]: https://awkward-array.org/doc/main/user-guide/how-to-restructure.html
# [uproot_write]: https://uproot.readthedocs.io/en/latest/basic.html#writing-ttrees-to-a-file
# [ak_parquet]: https://awkward-array.org/doc/main/reference/generated/ak.to_parquet.html
#
# ## Why these wrapper tools are needed
#
# The typical operation of using ML inference tools in the awkward/coffea analysis
# tools involves converting and padding awkward arrays into ML tool containers
# (usually something that is `numpy`-compatible), running the inference, then
# converting-and-truncating back into the awkward array syntax required for the
# analysis chain to continue. With awkward arrays' laziness now being handled
# entirely by [`dask`][dask_awkward], the conversion of awkward arrays to other
# array types needs to be wrapped in a way that is understandable to `dask`.
# The classes in the `ml_tools` package attempt to wrap the common tools used by
# the HEP community with a common interface to reduce the verbosity of the code on
# the analysis side.
#
# [dask_awkward]: https://dask-awkward.readthedocs.io/en/stable/gs-limitations.html
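#
# For illustration, the eager (non-dask) version of this round trip is sketched
# below. This is a minimal sketch, not a coffea API: `my_model` stands in for any
# hypothetical callable that consumes and returns plain `numpy` arrays. Once the
# arrays are dask-backed, `ak.to_numpy` can no longer be called eagerly, which is
# exactly the bookkeeping the `ml_tools` wrappers take care of.
#
# ```python
# import awkward as ak
#
# def run_inference_eagerly(jagged, my_model):
#     # Pad the jagged axis to a fixed length so it can become a numpy array
#     rectangular = ak.fill_none(ak.pad_none(jagged, 100, axis=1, clip=True), 0.0)
#     scores = my_model(ak.to_numpy(rectangular))  # inference on plain numpy
#     # Hand the result back to the awkward-based analysis chain
#     return ak.from_numpy(scores)
# ```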
#
# ## Example using ParticleNet-like jet variable calculation using PyTorch
#
# The example given in this notebook will be using [`pytorch`][pytorch] to
# calculate a jet-level discriminant using its constituent particles. An example
# of how to construct such a `pytorch` network can be found in the docs file, but
# for `ml_tools` in coffea, we only support loading models in the
# [TorchScript][pytorch_jit] format, to ensure operability when scaling to
# clusters. Let us first start by downloading the example ParticleNet model file
# and a small `PFNano`-compatible file, and defining a simple function to open
# the `PFNano` file as a dask-awkward collection.
#
# [pytorch]: https://pytorch.org/
# [pytorch_jit]: https://pytorch.org/tutorials/beginner/saving_loading_models.html#export-load-model-in-torchscript-format

# In[1]:

get_ipython().system('wget --quiet -O model.pt https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/triton_models_test/pn_test/1/model.pt')
get_ipython().system('wget --quiet -O pfnano.root https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/pfnano.root')

# In[2]:

from coffea.nanoevents import NanoEventsFactory
from coffea.nanoevents.schemas import PFNanoAODSchema

def open_events():
    factory = NanoEventsFactory.from_root(
        {"file:./pfnano.root": "Events"},
        schemaclass=PFNanoAODSchema,
    )
    return factory.events()

# Now we prepare a class to handle inference requests by extending the
# `ml_tools.torch_wrapper` class. As the base class cannot know anything about
# the data mangling required for the user's particular model, we will need to
# overload at least the method `prepare_awkward`:
#
# - The input can be an arbitrary number of awkward arrays or dask awkward
#   arrays (but never a mix of dask/non-dask arrays). In this example, we will
#   be passing in the events array.
# - The output should be a single tuple `a` plus a single dictionary `b`; this
#   ensures that arbitrarily complicated outputs can be passed to the underlying
#   `pytorch` model instance like `model(*a, **b)`. The contents of `a` and `b`
#   should be `numpy`-compatible awkward-like arrays: if the inputs are non-dask
#   awkward arrays, the return should also be non-dask awkward arrays that can
#   be trivially converted to `numpy` arrays via an `ak.to_numpy` call; if the
#   inputs are dask awkward arrays, the return should still be dask awkward
#   arrays that can be trivially converted via a `to_awkward().to_numpy()` call.
#
# In this ParticleNet-like example, the model expects the following inputs:
#
# - An `N` jets x `2` coordinates x `100` constituents "points" array,
#   representing the constituent coordinates.
# - An `N` jets x `5` features x `100` constituents "features" array,
#   representing the constituent features of interest to be used for inference.
# - An `N` jets x `1` mask x `100` constituents "mask" array, representing
#   whether a constituent should be masked from the inference request.
#
# In this case, we will need to flatten the `E` events x `N` jets structure,
# then stack the constituent attributes of interest via `ak.concatenate` into
# single arrays, as sketched in the toy example below.
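#
# As a toy, eager illustration of the stacking step (a hypothetical two-feature
# case padded to length 3 instead of 100, just to keep the arrays small):
#
# ```python
# import awkward as ak
# import numpy as np
#
# # Two already-padded "N jets x 3 constituents" feature arrays (N = 2)
# f1 = ak.Array([[1.0, 2.0, 0.0], [3.0, 0.0, 0.0]])
# f2 = ak.Array([[4.0, 5.0, 0.0], [6.0, 0.0, 0.0]])
#
# # Add a new axis to each and concatenate along it:
# # the result is an N x 2 features x 3 constituents array
# stacked = ak.concatenate([f1[:, np.newaxis], f2[:, np.newaxis]], axis=1)
# ```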
#
# After defining this minimal class, we can attempt to run inference using the
# `__call__` method defined in the base class.

# In[3]:

from coffea.ml_tools.torch_wrapper import torch_wrapper
import awkward as ak
import dask_awkward
import numpy as np

class ParticleNetExample1(torch_wrapper):
    def prepare_awkward(self, events):
        jets = ak.flatten(events.Jet)

        def pad(arr):
            return ak.fill_none(
                ak.pad_none(arr, 100, axis=1, clip=True),
                0.0,
            )

        # Human-readable version of what the inputs are
        # Each array is an N jets x 100 constituents array
        imap = {
            "points": {
                "deta": pad(jets.eta - jets.constituents.pf.eta),
                "dphi": pad(jets.delta_phi(jets.constituents.pf)),
            },
            "features": {
                "dr": pad(jets.delta_r(jets.constituents.pf)),
                "lpt": pad(np.log(jets.constituents.pf.pt)),
                "lptf": pad(np.log(jets.constituents.pf.pt / jets.pt)),
                "f1": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),
                "f2": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),
            },
            "mask": {
                "mask": pad(ak.ones_like(jets.constituents.pf.pt)),
            },
        }

        # Compacting the array elements into the desired dimension using
        # ak.concatenate
        retmap = {
            k: ak.concatenate([x[:, np.newaxis, :] for x in imap[k].values()], axis=1)
            for k in imap.keys()
        }

        # Returning everything using a dictionary. Also perform type conversion!
        return (), {
            "points": ak.values_astype(retmap["points"], "float32"),
            "features": ak.values_astype(retmap["features"], "float32"),
            "mask": ak.values_astype(retmap["mask"], "float16"),
        }

# Setting up the model container
pn_example1 = ParticleNetExample1("model.pt")

# Running on a dask_awkward array
dask_events = open_events()
dask_results = pn_example1(dask_events)
print("Dask awkward results:", dask_results.compute())  # Runs fine!

# For each jet in the input to the `torch` model, the model returns a 2-tuple of
# probability values. Without additional specification, the `torch_wrapper`
# class performs a trivial `ak.from_numpy` conversion of the torch model's
# output. We can specify that we want to fold this back into a nested structure
# by overloading the `postprocess_awkward` method of the class.
#
# For the ParticleNet example, we are going to perform additional computation
# during the conversion back to awkward array formats:
#
# - Calculate the `softmax` for the return values of each jet (commonly used as
#   the singular ML inference "score").
# - Fold the computed `softmax` array back into a nested structure that is
#   compatible with the original `events.Jet` array.
#
# Notice that the inputs of the `postprocess_awkward` method differ from those
# of the `prepare_awkward` method only in that the first argument is the return
# array of the model inference after the trivial `from_numpy` conversion.
# Notice also that `return_array` is a dask array.

# In[4]:

class ParticleNetExample2(ParticleNetExample1):
    def postprocess_awkward(self, return_array, events):
        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)
        njets = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(softmax, njets)

pn_example2 = ParticleNetExample2("model.pt")

# Running on dask awkward
dask_events = open_events()
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example2(dask_events)
dask_events["Jet"] = dask_jets
print(dask_events.Jet.MLresults.compute())

# Of course, the implementation of the classes above can be written in a single
# class.
# Here is a copy-and-paste implementation of the class with all the
# functionality described in the cells above:

# In[5]:

class ParticleNetExample(torch_wrapper):
    def prepare_awkward(self, events):
        jets = ak.flatten(events.Jet)

        def pad(arr):
            return ak.fill_none(
                ak.pad_none(arr, 100, axis=1, clip=True),
                0.0,
            )

        # Human-readable version of what the inputs are
        # Each array is an N jets x 100 constituents array
        imap = {
            "points": {
                "deta": pad(jets.eta - jets.constituents.pf.eta),
                "dphi": pad(jets.delta_phi(jets.constituents.pf)),
            },
            "features": {
                "dr": pad(jets.delta_r(jets.constituents.pf)),
                "lpt": pad(np.log(jets.constituents.pf.pt)),
                "lptf": pad(np.log(jets.constituents.pf.pt / jets.pt)),
                "f1": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),
                "f2": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),
            },
            "mask": {
                "mask": pad(ak.ones_like(jets.constituents.pf.pt)),
            },
        }

        # Compacting the array elements into the desired dimension using
        # ak.concatenate
        retmap = {
            k: ak.concatenate([x[:, np.newaxis, :] for x in imap[k].values()], axis=1)
            for k in imap.keys()
        }

        # Returning everything using a dictionary. Also take care of type
        # conversion here.
        return (), {
            "points": ak.values_astype(retmap["points"], "float32"),
            "features": ak.values_astype(retmap["features"], "float32"),
            "mask": ak.values_astype(retmap["mask"], "float16"),
        }

    def postprocess_awkward(self, return_array, events):
        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)
        njets = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(softmax, njets)

pn_example = ParticleNetExample("model.pt")

# Running on dask awkward arrays
dask_events = open_events()
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example(dask_events)
dask_events["Jet"] = dask_jets
print(dask_events.Jet.MLresults.compute())
print(dask_awkward.necessary_columns(dask_events.Jet.MLresults))

# In particular, analyzers should check that the last line contains only the
# branches required for ML inference; if there are many non-required branches,
# this may lead to significant performance penalties.
#
# As with other dask tools, users can inspect how dask is organizing and
# processing the computation using the following snippet.

# In[6]:

print(dask_results.dask)
dask_results.visualize(optimize_graph=False)

# Or take a peek at the optimized results:

# In[7]:

dask_results.visualize(optimize_graph=True)

# ## Comments about generalizing to other ML tools
#
# All ML wrappers provided in the `coffea.ml_tools` module (`triton_wrapper` for
# [triton][triton] server inference, `torch_wrapper` for pytorch,
# `xgboost_wrapper` for [xgboost][xgboost] inference, `tf_wrapper` for
# tensorflow) follow the same design: analyzers are responsible for providing
# the model of interest, along with an inherited class that overloads the
# following methods to handle data type conversion:
#
# - `prepare_awkward`: converts awkward arrays to `numpy`-compatible awkward
#   arrays; the output arrays should be in the format of a tuple `a` and a
#   dictionary `b`, which can be expanded out to the input of the ML tool like
#   `model(*a, **b)`. Notice that some additional trivial conversions, such as
#   the conversion to the available kernels for `pytorch`, the conversion to a
#   matrix format for `xgboost`, and the slicing of arrays for `triton`, are
#   handled automatically by the respective wrappers.
# - `postprocess_awkward` (optional): converts the trivially converted numpy
#   array results back to the analysis-specific format. If this is not
#   provided, the result of a simple `ak.from_numpy` conversion is returned.
#
# If the ML tool of choice for your analysis has not been implemented by the
# `coffea.ml_tools` modules, consider constructing your own with the provided
# `numpy_call_wrapper` base class in `coffea.ml_tools`. Aside from the methods
# listed above, you will also need to provide the `numpy_call` method to
# perform any additional data format conversions and call the ML tool of
# choice, as in the sketch below. If you think your implementation is general,
# also consider submitting a PR to the `coffea` repository!
#
# [triton]: https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tritonserver
# [xgboost]: https://xgboost.readthedocs.io/en/stable/
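#
# A minimal sketch of such a custom wrapper is shown below. Here `mytool` is a
# hypothetical ML library whose `load`/`predict` functions consume and return
# plain `numpy` arrays, and the constructor arguments are entirely up to you;
# only the `prepare_awkward`/`numpy_call` methods (and optionally
# `postprocess_awkward`) described above are assumed by this sketch.
#
# ```python
# import awkward as ak
# import numpy as np
# from coffea.ml_tools import numpy_call_wrapper
#
# import mytool  # hypothetical ML library
#
# class MyToolWrapper(numpy_call_wrapper):
#     def __init__(self, model_path):
#         self.model = mytool.load(model_path)  # hypothetical loader
#
#     def prepare_awkward(self, jets):
#         # Stack two jet-level fields into an N x 2 feature matrix
#         feats = ak.concatenate(
#             [jets.pt[:, np.newaxis], jets.eta[:, np.newaxis]], axis=1
#         )
#         return (ak.values_astype(feats, "float32"),), {}
#
#     def numpy_call(self, *args, **kwargs):
#         # The actual inference call, operating on plain numpy arrays
#         return self.model.predict(*args, **kwargs)
# ```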
# ## Additional comments on common `prepare_awkward` patterns
#
# The key requirement for all wrapper classes in the `ml_tools` package is to
# convert awkward arrays into `numpy`-compatible formats using just
# `awkward`-related tools, which ensures that no eager data conversion is
# performed on dask arrays. Below are some common patterns that are useful when
# defining a user-level class.
#
# ### Casting multiple fields of a collection to separate axes
#
# Given a collection of particles of length $N$, suppose just a subset of $M$
# fields is of interest to our tool and is to be represented as an $N \times M$
# array. You can achieve this using just `ak.concatenate` and dimension
# expansion with `np.newaxis`:
#
# ```python
# fields_of_interest = ["field1", "field2", "field3"]
# part_np_array = ak.concatenate(
#     [
#         part[field][:, np.newaxis]  # Expanding length-N array to Nx1
#         for field in fields_of_interest
#     ],
#     axis=1,
# )  # This should now be an Nx3 array
# ```
#
# ### Fixing collection dimensions
#
# Many ML inference tools work with fixed-dimension inputs, with missing
# entries set to a placeholder value. A common method for achieving this in
# awkward is with `pad_none`/`fill_none` calls; for example, to pad the number
# of particles passed to the inference tool in each event to a fixed length of
# 128:
#
# ```python
# part_padded = ak.fill_none(
#     ak.pad_none(part, 128, axis=1, clip=True),
#     -1000,  # Placeholder value
#     axis=1,
# )
# ```
#
# The dimension of the resulting `part_padded` array is still `N x var`,
# indicating that the number of entries in `axis=1` can potentially be
# variable. Depending on the ML tools being used, this axis dimension may need
# to be fixed. To strictly convert this to an `Nx128` array, one can call
# `flatten`/`unflatten` pairs:
#
# ```python
# part_padded = ak.flatten(part_padded)
# part_padded = ak.unflatten(part_padded, 128)  # Now this is an Nx128 array
# ```
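#
# As a small, eager end-to-end check of these two padding patterns (toy values,
# padded to length 4 instead of 128 for readability):
#
# ```python
# import awkward as ak
#
# # Toy jagged per-event values standing in for a particle attribute
# part = ak.Array([[10.0, 20.0], [5.0], [7.0, 8.0, 9.0, 1.0, 2.0]])
#
# padded = ak.fill_none(ak.pad_none(part, 4, axis=1, clip=True), -1000, axis=1)
# fixed = ak.unflatten(ak.flatten(padded), 4)  # strictly 3 x 4 now
#
# print(ak.to_numpy(fixed).shape)  # (3, 4)
# ```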