As machine learning (ML) becomes more popular in HEP analysis, coffea also
provides tools to assist with using ML tools within the coffea framework. For
training and validation, you would likely need custom data mangling tools to
convert HEP data formats (NanoAOD, PFNano) to the format that
best interfaces with your ML tool of choice, since for training and validation you
typically want fine control over what computation is done. For more
advanced use cases of data mangling and data saving, refer to the awkward array
manual and the uproot/parquet write
operations for saving intermediate states. The helper tools provided in coffea
focus on ML inference, where ML tool outputs are used as just another variable
in the event/object selection chain.
The typical operation of using ML inference tools in an awkward/coffea analysis
involves converting and padding awkward arrays into the ML tool's containers
(usually something that is numpy-compatible), running the inference, then
converting-and-truncating the outputs back into the awkward array syntax required
for the analysis chain to continue. With awkward arrays' laziness now being
handled entirely by dask, the conversion of awkward arrays to
other array types needs to be wrapped in a way that is understandable to dask.
The ml_tools package attempts to wrap the common tools used by
the HEP community behind a common interface, to reduce the verbosity of the code
on the analysis side.
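The pad-and-truncate step mentioned above can be sketched with plain numpy. This is a made-up illustration of the concept only (the `ragged` data and `max_len` are invented for the example; the coffea wrappers handle this via awkward operations):

```python
import numpy as np

# Hypothetical sketch: regularize ragged per-jet constituent lists into a
# fixed-size numpy array by zero-padding short lists and truncating long ones.
ragged = [[1.0, 2.0, 3.0], [4.0], [5.0, 6.0]]  # variable-length constituents
max_len = 2
padded = np.zeros((len(ragged), max_len), dtype="float32")
for i, row in enumerate(ragged):
    clipped = row[:max_len]              # truncate to at most max_len entries
    padded[i, : len(clipped)] = clipped  # remaining slots stay zero-padded
# padded is now a regular (3, 2) array suitable for an ML framework
```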
The example given in this notebook will use pytorch to calculate a
jet-level discriminant from its constituent particles. An example of how to
construct such a pytorch network can be found in the docs file; note that
mltools in coffea only support models in the TorchScript format, to
ensure operability when scaling to clusters. Let us first start
by downloading the example ParticleNet model file and a small PFNano
compatible file, along with a simple function to open the PFNano file with and
without dask.
!wget --quiet -O model.pt https://github.com/CoffeaTeam/coffea/raw/ml_tools/tests/samples/triton_models_test/pn_test/1/model.pt
!wget --quiet -O pfnano.root https://github.com/CoffeaTeam/coffea/raw/ml_tools/tests/samples/pfnano.root
from coffea.nanoevents import NanoEventsFactory
from coffea.nanoevents.schemas import PFNanoAODSchema
def open_events(permit_dask=False):
    factory = NanoEventsFactory.from_root(
        "file:./pfnano.root",
        schemaclass=PFNanoAODSchema,
        permit_dask=permit_dask,
    )
    return factory.events()
Now we prepare a class to handle inference requests by extending the
mltools.torch_wrapper class. As the base class cannot know anything about the
data mangling required for the user's particular model, we will need to overload
at least the prepare_awkward method:
- The input can be an arbitrary number of awkward arrays or dask awkward arrays (but never a mix of dask/non-dask arrays). In this example, we will be passing in the event array.
- The output should be a single tuple a plus a single dictionary b; this is to ensure that arbitrarily complicated outputs can be passed to the underlying pytorch model instance like model(*a, **b). The contents of a and b should be numpy-compatible awkward-like arrays: if the inputs are non-dask awkward arrays, the return should also be non-dask awkward arrays that can be trivially converted to numpy arrays via an ak.to_numpy call; if the inputs are dask awkward arrays, the return should still be dask awkward arrays that can be trivially converted via a to_awkward().to_numpy() call. To minimize changes to the code, a simple dask_awkward/awkward switcher, get_awkward_lib, is provided, as there should be (near-)perfect feature parity between the dask and non-dask arrays.
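The tuple-plus-dictionary calling convention can be illustrated with a plain-Python sketch. Here `fake_model` and `prepare` are invented stand-ins, not coffea API; the point is only how the wrapper expands the two return values into the model call:

```python
# Stand-in for a torch model: takes keyword inputs, returns a value.
def fake_model(points, features, mask):
    return points + features * mask

# Stand-in for prepare_awkward: returns a positional tuple and a keyword dict.
def prepare(points, features, mask):
    a = ()  # positional arguments (none used in this sketch)
    b = {"points": points, "features": features, "mask": mask}
    return a, b

a, b = prepare(1.0, 2.0, 3.0)
out = fake_model(*a, **b)  # the wrapper performs exactly this expansion
```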
In this ParticleNet-like example, the model expects the following inputs:

- An N jets x 2 coordinates x 100 constituents "points" array, representing the constituent coordinates.
- An N jets x 5 features x 100 constituents "features" array, representing the constituent features of interest to be used for inference.
- An N jets x 1 mask x 100 constituents "mask" array, representing whether a constituent should be masked from the inference request.

In this case, we will need to flatten the E events x N jets structure,
then stack the constituent attributes of interest via ak.concatenate
into a single array.
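The stacking step can be sketched with plain numpy, using made-up zero/one data purely to show the shapes (the real implementation below uses ak.concatenate on awkward arrays):

```python
import numpy as np

# Two per-constituent attributes, each of shape (n_jets, 100), stacked into a
# single (n_jets, 2, 100) "points"-style input by adding a middle axis.
n_jets = 4
deta = np.zeros((n_jets, 100))
dphi = np.ones((n_jets, 100))
points = np.concatenate(
    [arr[:, np.newaxis, :] for arr in (deta, dphi)], axis=1
)
```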
After defining this minimal class, we can attempt to run inference using the
__call__ method defined in the base class. Notice that overloading this single
method automatically allows the inference to be called on both awkward
and dask-awkward arrays.
from coffea.ml_tools.torch_wrapper import torch_wrapper
import awkward
import dask_awkward
import numpy as np

class ParticleNetExample1(torch_wrapper):
    def prepare_awkward(self, events):
        ak = self.get_awkward_lib(events)
        jets = ak.flatten(events.Jet)

        def pad(arr):
            return ak.fill_none(
                ak.pad_none(arr, 100, axis=1, clip=True),
                0.0,
            )

        # Human readable version of what the inputs are
        # Each array is a N jets x 100 constituent array
        imap = {
            "points": {
                "deta": pad(jets.eta - jets.constituents.pf.eta),
                "dphi": pad(jets.delta_phi(jets.constituents.pf)),
            },
            "features": {
                "dr": pad(jets.delta_r(jets.constituents.pf)),
                "lpt": pad(np.log(jets.constituents.pf.pt)),
                "lptf": pad(np.log(jets.constituents.pf.pt / jets.pt)),
                "f1": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),
                "f2": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),
            },
            "mask": {
                "mask": pad(ak.ones_like(jets.constituents.pf.pt)),
            },
        }

        # Compacting the array elements into the desired dimension using
        # ak.concatenate
        retmap = {
            k: ak.concatenate([x[:, np.newaxis, :] for x in imap[k].values()], axis=1)
            for k in imap.keys()
        }

        # Returning everything using a dictionary. Also perform type conversion!
        return (), {
            "points": ak.values_astype(retmap["points"], "float32"),
            "features": ak.values_astype(retmap["features"], "float32"),
            "mask": ak.values_astype(retmap["mask"], "float16"),
        }
# Setting up the model container
pn_example1 = ParticleNetExample1("model.pt")
# Running on awkward arrays
ak_events = open_events(permit_dask=False)
ak_results = pn_example1(ak_events)
print("Awkward results:", ak_results) # Runs fine!
# Running on dask_awkward array
dask_events = open_events(permit_dask=True)
dask_results = pn_example1(dask_events)
print("Dask awkward results:", dask_results) # Also runs file!
# Checking that the results are identical
assert awkward.all(dask_results.compute() == ak_results)
/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/coffea/ml_tools/helper.py:163: UserWarning: No format checks were performed on input! warnings.warn("No format checks were performed on input!")
Awkward results: [[0.0693, -0.0448], [0.0678, -0.0451], ..., [0.0616, ...], [0.0587, -0.0172]] Dask awkward results: dask.awkward<numpy_call_ParticleNetExample1_143880bd51265ab5cb0ff3b61a1522c0, npartitions=1>
For each jet in the input to the torch model, the model returns a 2-tuple
probability value. Without additional specification, the torch_wrapper class
performs a trivial ak.from_numpy conversion of the torch model's output. We
can specify that we want to fold this back into a nested structure by overloading
the postprocess_awkward method of the class.
For the ParticleNet example, we are going to perform additional computation for the conversion back to awkward array formats:

- Calculate the softmax for the return of each jet (commonly used as the singular ML inference "score").
- Fold the softmax array back into a nested structure that is compatible with the original events.Jet array.

Notice that the inputs of the postprocess_awkward method differ from those of the
prepare_awkward method only in that the first argument is the return array
of the model inference after the trivial from_numpy conversion. Notice also that
the return array can be a dask array, so the awkward/dask-awkward switching
function should also be used in this method.
class ParticleNetExample2(ParticleNetExample1):
    def postprocess_awkward(self, return_array, events):
        ak = self.get_awkward_lib(return_array)
        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)
        njets = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(softmax, njets)
pn_example2 = ParticleNetExample2("model.pt")
# Running on awkward
ak_events = open_events(permit_dask=False)
ak_jets = ak_events.Jet
ak_jets["MLresults"] = pn_example2(ak_events)
ak_events["Jet"] = ak_jets
print(ak_events.Jet.MLresults)
# Running on dask awkward
dask_events = open_events(permit_dask=True)
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example2(dask_events)
dask_events["Jet"] = dask_jets
print(dask_events.Jet.MLresults)
assert awkward.all(ak_events.Jet.MLresults == dask_events.Jet.MLresults.compute())
[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]]
/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/dask_awkward/lib/structure.py:751: UserWarning: Please ensure that dask.awkward<count, npartitions=1> is partitionwise-compatible with dask.awkward<divide, npartitions=1> (e.g. counts comes from a dak.num(array, axis=1)), otherwise this unflatten operation will fail when computed! warnings.warn(
dask.awkward<MLresults, npartitions=1>
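The softmax and fold-back steps can also be sketched standalone with plain numpy, using made-up logits and jet counts (the real class above operates on awkward arrays and uses ak.unflatten):

```python
import numpy as np

# Two-class softmax per jet: score of class 0 out of the 2-tuple return.
logits = np.array([[2.0, 1.0], [0.5, 0.5], [1.0, 3.0]])  # 3 jets x 2 classes
exp = np.exp(logits)
softmax = exp[:, 0] / exp.sum(axis=-1)

# Fold the flat per-jet scores back into per-event lists, given the jet
# multiplicity of each event (here 2 events with 2 and 1 jets).
njets = [2, 1]
nested = np.split(softmax, np.cumsum(njets)[:-1])
```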
Of course, the implementations of the classes above can be combined into a single class. Here is a copy-and-paste implementation of the class with all the functionality described in the cells above:
class ParticleNetExample(torch_wrapper):
    def prepare_awkward(self, events):
        ak = self.get_awkward_lib(events)
        jets = ak.flatten(events.Jet)

        def pad(arr):
            return ak.fill_none(
                ak.pad_none(arr, 100, axis=1, clip=True),
                0.0,
            )

        # Human readable version of what the inputs are
        # Each array is a N jets x 100 constituent array
        imap = {
            "points": {
                "deta": pad(jets.eta - jets.constituents.pf.eta),
                "dphi": pad(jets.delta_phi(jets.constituents.pf)),
            },
            "features": {
                "dr": pad(jets.delta_r(jets.constituents.pf)),
                "lpt": pad(np.log(jets.constituents.pf.pt)),
                "lptf": pad(np.log(jets.constituents.pf.pt / jets.pt)),
                "f1": pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),
                "f2": pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),
            },
            "mask": {
                "mask": pad(ak.ones_like(jets.constituents.pf.pt)),
            },
        }

        # Compacting the array elements into the desired dimension using
        # ak.concatenate
        retmap = {
            k: ak.concatenate([x[:, np.newaxis, :] for x in imap[k].values()], axis=1)
            for k in imap.keys()
        }

        # Returning everything using a dictionary. Also take care of type
        # conversion here.
        return (), {
            "points": ak.values_astype(retmap["points"], "float32"),
            "features": ak.values_astype(retmap["features"], "float32"),
            "mask": ak.values_astype(retmap["mask"], "float16"),
        }

    def postprocess_awkward(self, return_array, events):
        ak = self.get_awkward_lib(return_array, events)
        softmax = np.exp(return_array)[:, 0] / ak.sum(np.exp(return_array), axis=-1)
        njets = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(softmax, njets)
pn_example = ParticleNetExample("model.pt")
# Running on awkward arrays
ak_events = open_events(permit_dask=False)
ak_jets = ak_events.Jet
ak_jets["MLresults"] = pn_example(ak_events)
ak_events["Jet"] = ak_jets
print(ak_events.Jet.MLresults)
# Running on dask awkward arrays
dask_events = open_events(permit_dask=True)
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example(dask_events)
dask_events["Jet"] = dask_jets
print(dask_events.Jet.MLresults)
# Checking that we get identical results
assert awkward.all(dask_events.Jet.MLresults.compute() == ak_events.Jet.MLresults)
print(dask_awkward.necessary_columns(dask_events.Jet.MLresults))
[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]] dask.awkward<MLresults, npartitions=1> {'from-uproot-9ff71c2b6c06eb15f091c3e41ead5497': ['Jet.pt', 'PFCands.d0', 'JetPFCands.pFCandsIdxG', 'PFCands.phi', 'Jet.phi', 'Jet.eta', 'PFCands.eta', 'PFCands.pt', 'Jet.pFCandsIdxG', 'PFCands.dz']}
In particular, analyzers should check that the last line contains only the branches required for ML inference; if there are many non-required branches, this may lead to significant performance penalties.
As with other dask tools, users can inspect how dask analyzes and processes the computation routines using the following snippet.
print(dask_results.dask)
dask_results.visualize(optimize_graph=False)
HighLevelGraph with 103 layers. <dask.highlevelgraph.HighLevelGraph object at 0x7f2fc7d9a550> 0. from-uproot-40ed9ba7c4cc45e94105e61283f124a3 1. JetPFCands-f14e9b07eb3c690873f7ea00af7ee8eb 2. PFCands-d4a8642b8c01c731c7d43e4bde63dba7 3. JetPFCands-6271438633eb0569493e2a65beb35cb1 4. PFCands-988dd86d5d366180aedd789b9fd97788 5. JetPFCands-b9487676f464447b15804d03bca317da 6. PFCands-915a7a9dd26d7fb2110a52bc38fae16a 7. JetPFCands-1535528183c6524bb8872d941e3a2320 8. PFCands-a4e509a69376ee57025fb7c7bfbf7903 9. JetPFCands-0e8588a67ca85b26e05c720442904a9d 10. PFCands-e95bd3adde3a14965f38e941f6b912c3 11. JetPFCands-f14ebf216a7d1a1761a234ad85c21a54 12. PFCands-18e5d62f1b50a1c0edafa1f39ebe765d 13. JetPFCands-6beb51836e39d136853699523596bf0e 14. PFCands-27deeb09b87b1ac27f7199f705a0b003 15. JetPFCands-8da6714502a3b61dd64f44221e6d0299 16. PFCands-7e6e94c83dcffeb122571204de8d1392 17. Jet-fa3ab6fd9c4726ec37bac08fbeed78c9 18. flatten-4b26fe19858d9257a26a602aef991da8 19. pFCandsIdxG-e6ffb60d1c3a5132916c6e67aa728476 20. _apply_global_index-deae4d7cd7d1e0d8d803f96bc280cfa6 21. pFCandsIdxG-2edff7ce8b2126a66b44af833d8b713b 22. _apply_global_index-0d462573e4213b3c2d08e82f54e93c0f 23. pt-506e1c8af61c15ef17f71c0f91448da4 24. ones-like-4cecc07513ea49b0b40dff2fd351a5b9 25. pad-none-62e19b5d7cbf908345333361b2c3676f 26. fill-none-d69152c91524f0008baeabefd0ecee5e 27. getitem-8d5c6a7b1c97dc8ed9e10c049cc6c2be 28. <dask_awkward.lib.operations._ConcatenateFnAxisGT0-9ad4ca8ff26531037bb0cc11a7f2ad7b 29. values-astype-2aa08a09a73ce9f519a33d832d4fb9e0 30. pFCandsIdxG-d05662f0c636a8ba6ea4d2c8098e6724 31. _apply_global_index-bca0b107ba20464e7274ff0057c9c5e4 32. pFCandsIdxG-7d03c6ba83d0292b837070935e4f179f 33. _apply_global_index-3fee5a0ad3d79adbd6a0b3548e419670 34. dz-d84c08a881cdb8357cf60965fa9f0514 35. absolute-621e2d660cd9416b3dcec859c51f492d 36. add-832b07089f33f40fb7400a0d0fc2f056 37. log-f8bc95274ee24348b4a9c761a2b3dd98 38. pad-none-826aa3b61cfe7e95e4b97664e4838ee6 39. 
fill-none-fc36a3bd147b79337086981e85e8b3e0 40. getitem-2fcaebe4cad7140dabc447c856a1554d 41. pFCandsIdxG-fe9303d1a0e50dbf3072a4acdb8e18da 42. _apply_global_index-2a5bcce5b02484770cd000a274ebbaf2 43. pFCandsIdxG-5ec68f469f4143bb528c7465f5acaed6 44. _apply_global_index-f4e66cd12c20d5263f386ff0522fa064 45. d0-d3afb63d9984e8866d7d4c9d12a78b68 46. absolute-e10b15a7112e058001c6f750b725ebcb 47. add-ddfff3a6ad28203867a6c95dbcbe8c92 48. log-9d8a3462a857fc09ba36a62019ae3d6f 49. pad-none-003013bd782852d5a8eb4eb65f2bfb38 50. fill-none-d6d6dbcb020bba081e2941328868316f 51. getitem-3d26ecbb9663c9447de86a7f22402303 52. pt-c1c08a6f87adb59bc21fa26af117e088 53. pFCandsIdxG-80130338d3935d06bbe9b5d92c0cd4a0 54. _apply_global_index-1915fc96cf43d456c1f1da1e4f303d6d 55. pFCandsIdxG-034ba70cc8a6e45005800a930daeb81d 56. _apply_global_index-a1cc5d5d20b5db9ceb57fcfd576a9191 57. pt-315a7084b6e4dab545ccadb5a0db1ef4 58. divide-23fbc75b17f947f0900380ad2400c68f 59. log-f0c4cf3c299d4e8ee17d7e77be599884 60. pad-none-07caa722d36e199167481c1f614ae95d 61. fill-none-d14724eabffff62abe0c1b6c09146055 62. getitem-32f06961eb19fb2dbf4c7f981f5e1813 63. pFCandsIdxG-72f4f07442540275be1f9035bafecd43 64. _apply_global_index-bb29a2f3b8ac6d857d2e8be71992f7df 65. pFCandsIdxG-355ce670f4252da6d0cf79a617b71ad7 66. _apply_global_index-521fe6eb7254dce4894fd038838d89d4 67. pt-67c848d59484c26a1fdd3e462d472e7b 68. log-66c6953623e150d562859512627cabca 69. pad-none-a50fc8156f9b07aaafd3cce56d43ae91 70. fill-none-80fd6ae936e0efaa36a59e07ec7939cd 71. getitem-8703b8d79d8ec80382d2b1776378497f 72. pFCandsIdxG-c7aa707e27ed4ebeec227e4164931a3a 73. _apply_global_index-864fdce0b5fb921805356148809a09a3 74. pFCandsIdxG-40ebe488b5db2d7c7465e7ce37055cbb 75. _apply_global_index-124147639902656cffbf1be792a05568 76. delta-r-e26e7e27fd16d3b095c87281f84ab15b 77. pad-none-0ee0161b4c9a8e7e6c93a80bd776d97c 78. fill-none-f4957da021529a0e512ae5a3a00ddc8f 79. getitem-da8190a602ae329b7519c504a4fb1f8e 80. 
<dask_awkward.lib.operations._ConcatenateFnAxisGT0-94da2cb85eb62883bb7ed60ccb3ec975 81. values-astype-ec0a2f827e0ba814ec816efefcb57570 82. pFCandsIdxG-473bd48d6c3864fea7e4b2cb000ada1d 83. _apply_global_index-40b5e8c860542bfc1881443dcabcd3e8 84. pFCandsIdxG-27f5b433f817655bd4ec25ccc0f63bec 85. _apply_global_index-89427cf0f03a65aff27c761846405067 86. delta-phi-e1ea313cebc1c707e8c2d7529afa15f6 87. pad-none-6a21220ace9c2a30d93fbc4994526fd3 88. fill-none-601b81a21345678251df6bc77fab9e0b 89. getitem-d4d55e610305243a00ca2de75c288353 90. pFCandsIdxG-3d0e8498e93670136a314a894c9b436d 91. _apply_global_index-5d6192da248e624fa7bc8f2194c33fb9 92. pFCandsIdxG-61299a8da18075b6fff8303f525acf77 93. _apply_global_index-36bc916646db1e8b807a14b053c1fd92 94. eta-47da89c5d868ac233b6251b7221b7a66 95. eta-c44f6e7fabca938462b4e2ea68fca37d 96. subtract-c3852dc5e9ffafcc05519f4978eddb54 97. pad-none-759cb714701908c124f788730b7320e9 98. fill-none-e3f8210381d08b7a71a11e857d066eab 99. getitem-eba73f6ed6b09ec959d457903b73d39d 100. <dask_awkward.lib.operations._ConcatenateFnAxisGT0-3567704001a71a5020b605f689c2eaa9 101. values-astype-381417681b50dd8d3274ab22a8a7f0d6 102. numpy_call_ParticleNetExample1_143880bd51265ab5cb0ff3b61a1522c0-8a7fd3678eea90e891eb5b11dd380281
Or a peek at the optimized results:
dask_results.visualize(optimize_graph=True)
/home/ensc/VirtualENV/coffea-test/lib/python3.8/site-packages/coffea/ml_tools/helper.py:163: UserWarning: No format checks were performed on input! warnings.warn("No format checks were performed on input!")
All ML wrappers provided in the coffea.mltools module (triton_wrapper for
triton server inference, torch_wrapper for pytorch, and
xgboost_wrapper for xgboost inference) follow the same design:
analyzers are responsible for providing the model of interest, along with
an inherited class that overloads the following methods for data
type conversion:

- prepare_awkward: converts awkward arrays to numpy-compatible awkward arrays; the output arrays should be in the format of a tuple a and a dictionary b, which can be expanded out to the input of the ML tool like model(*a, **b). Notice that some additional trivial conversions, such as moving to the available kernels for pytorch, converting to a matrix format for xgboost, and slicing of the array for triton, are handled automatically by the respective wrappers. To handle both dask/non-dask arrays, the user should use the provided get_awkward_lib library switcher.
- postprocess_awkward (optional): converts the trivially converted numpy array results back to the analysis-specific format. If this is not provided, then a simple ak.from_numpy conversion result is returned.

If the ML tool of choice for your analysis has not been implemented by the
coffea.mltools modules, consider constructing your own with the provided
numpy_call_wrapper base class in coffea.mltools. Aside from the functions
listed above, you will also need to provide the numpy_call method to perform
any additional data format conversions and call the ML tool of choice. If you
think your implementation is general, also consider submitting a PR to the
coffea repository!
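The overall call contract of such a custom wrapper can be sketched without any ML library at all. Note that FakeNumpyCallWrapper below is an invented stand-in, not the real coffea.mltools.numpy_call_wrapper (which additionally handles the awkward/numpy conversion and dask dispatch); the sketch only shows how prepare_awkward feeds numpy_call:

```python
import numpy as np

# Hypothetical stand-in for the numpy_call_wrapper base class, to illustrate
# the calling convention only: expand prepare_awkward's (tuple, dict) output
# into numpy_call.
class FakeNumpyCallWrapper:
    def __call__(self, *arrays):
        a, b = self.prepare_awkward(*arrays)
        return self.numpy_call(*a, **b)

class DoublerWrapper(FakeNumpyCallWrapper):
    def prepare_awkward(self, arr):
        # No real data mangling needed here; pass through as a keyword input.
        return (), {"x": np.asarray(arr, dtype="float32")}

    def numpy_call(self, x):
        # "ML tool" stand-in: just double the input.
        return 2.0 * x

result = DoublerWrapper()([1.0, 2.0])
```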