2. Event Filtering and Feature Engineering

In this stage we prepare the input files for the three classifier models: starting from the output of the previous stage (data ingestion), we produce the training and test datasets in Apache Parquet format.
To run this notebook we used the following configuration:
# No need to run this when using CERN SWAN service
# Just add the configuration parameters for Spark on the "star" button integration
# pip install pyspark or use your favorite way to set Spark Home, here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-3.3.2-bin-hadoop3') #set path to SPARK_HOME
# Create Spark session and configure according to your environment
from pyspark.sql import SparkSession
spark = ( SparkSession.builder
.appName("2-Feature Preparation")
.master("yarn")
.config("spark.driver.memory","2g")
.config("spark.executor.memory","64g")
.config("spark.executor.cores","8")
.config("spark.dynamicAllocation.enabled","true")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/03 20:48:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/03/03 20:48:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
# Check if Spark Session has been created correctly
spark
SparkSession - in-memory
This loads into a Spark DataFrame the Parquet files produced in the previous data ingestion step.
# This is the input dataset.
# It is the output of Step 1 - Data Ingestion
# It is made available for CERN users on the Hadoop Analytix cluster
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV_20190522"
data = ( spark.read
.format("parquet")
.load(dataset_path)
)
events = data.count()
print("There are {} events".format(events))
There are 25468470 events
We can also have a look at how the events are distributed across the classes after the filtering.
labels = ['QCD', 'tt', 'W+jets']
counts = data.groupBy('label').count().collect()
qcd_events = 0
tt_events = 0
wjets_events = 0
print('There are:')
for i in range(3):
print('\t* {} {} events (frac = {:.3f})'
.format(
counts[i][1],
labels[counts[i].label],
counts[i][1]*1.0/events
))
if counts[i].label==0:
qcd_events = counts[i][1]
elif counts[i].label==1:
tt_events = counts[i][1]
elif counts[i].label==2:
wjets_events = counts[i][1]
There are:
	* 14265397 tt events (frac = 0.560)
	* 9776447 W+jets events (frac = 0.384)
	* 1426626 QCD events (frac = 0.056)
The dataset is imbalanced, so we will need to undersample it.
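As an aside, a one-step way to undersample in Spark is DataFrame.sampleBy, which takes a per-class sampling fraction. The following is only a sketch using the counts computed above (this notebook instead undersamples with per-class sample() further below):
# Sketch only: undersample with sampleBy, keeping all QCD events and
# downsampling tt and W+jets to roughly the QCD count
fractions = {
    0: 1.0,                              # QCD: keep everything
    1: qcd_events * 1.0 / tt_events,     # tt: keep ~1.4M of 14.3M events
    2: qcd_events * 1.0 / wjets_events,  # W+jets: keep ~1.4M of 9.8M events
}
balanced = data.sampleBy('label', fractions=fractions, seed=42)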
In the Parquet files produced in the previous step we have three columns: hfeatures, containing the 14 High Level Features; lfeatures, containing the Low Level Features (a list of 801 particles, each with 19 features); and label, identifying the sample.
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)
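As a quick optional check (purely illustrative), we can pull one event to the driver and verify that the sizes match this description:
# Optional peek at one event: hfeatures should have 14 components,
# lfeatures should be a list of 801 particles with 19 values each
row = data.select('hfeatures', 'lfeatures', 'label').first()
print(len(row['hfeatures']), len(row['lfeatures']), len(row['lfeatures'][0]), row['label'])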
We can begin by preparing the input for the HLF classifier, which simply requires scaling the features and encoding the label. To use Spark's MinMaxScaler we need to convert the input into dense vectors.
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
vector_dense_udf = udf(lambda r : Vectors.dense(r),VectorUDT())
data = data.withColumn('hfeatures_dense',vector_dense_udf('hfeatures'))
We can now build the pipeline to scale the HLF features and encode the labels.
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
## One-Hot-Encode
encoder = OneHotEncoder(inputCols=["label"],
outputCols=["encoded_label"],
dropLast=False)
## Scale feature vector
scaler = MinMaxScaler(inputCol="hfeatures_dense",
outputCol="HLF_input")
pipeline = Pipeline(stages=[encoder, scaler])
%time fitted_pipeline = pipeline.fit(data)
CPU times: user 333 ms, sys: 264 ms, total: 597 ms Wall time: 31.3 s
# Apply the pipeline to data
data = fitted_pipeline.transform(data)
New columns have been created; if we want to drop some of them we can use
data = data.drop("col-name")
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)
 |-- hfeatures_dense: vector (nullable = true)
 |-- encoded_label: vector (nullable = true)
 |-- HLF_input: vector (nullable = true)
Moving on to the particle-sequence classifier, we need to sort the particles in each event by decreasing $\Delta R$ distance from the isolated lepton, where $$\Delta R = \sqrt{\Delta \eta^2 + \Delta \phi^2}$$
From the production of the low level features we know that the isolated lepton is the first particle in the list and that the 19 features are:
features = [
'Energy', 'Px', 'Py', 'Pz', 'Pt', 'Eta', 'Phi',
'vtxX', 'vtxY', 'vtxZ', 'ChPFIso', 'GammaPFIso', 'NeuPFIso',
'isChHad', 'isNeuHad', 'isGamma', 'isEle', 'isMu', 'Charge'
]
Therefore we need features 5 ($\eta$) and 6 ($\phi$) to compute $\Delta R$.
import math
class lepAngularCoordinates():
"""
This class is used to store the lepton and compute DeltaR
from the other particles
"""
def __init__(self, eta, phi):
self.Eta = eta
self.Phi = phi
def DeltaR(self, eta, phi):
deta = self.Eta - eta
dphi = self.Phi - phi
pi = math.pi
while dphi > pi: dphi -= 2*pi
while dphi < -pi: dphi += 2*pi
return math.sqrt(deta*deta + dphi*dphi)
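A quick, purely illustrative check (hand-picked values) that the wrap-around keeps $\Delta \phi$ within $[-\pi, \pi]$:
# Two particles on opposite sides of the phi = +/- pi boundary should be close:
# dphi = 6.0 wraps to about -0.28, giving DeltaR ~ 0.75 rather than ~6
lep = lepAngularCoordinates(0.5, 3.0)   # eta, phi of the isolated lepton
print(lep.DeltaR(-0.2, -3.0))           # eta, phi of another particle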
from pyspark.sql.types import ArrayType, DoubleType
from sklearn.preprocessing import StandardScaler
@udf(returnType=ArrayType(ArrayType(DoubleType())))
def transform(particles):
## The isolated lepton is the first particle in the list
ISOlep = lepAngularCoordinates(particles[0][5], particles[0][6])
## Sort the particles based on the distance from the isolated lepton
particles.sort(key = lambda part: ISOlep.DeltaR(part[5], part[6]),
reverse=True)
## Standardize
particles = StandardScaler().fit_transform(particles).tolist()
return particles
data = data.withColumn('GRU_input', transform('lfeatures'))
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)
 |-- hfeatures_dense: vector (nullable = true)
 |-- encoded_label: vector (nullable = true)
 |-- HLF_input: vector (nullable = true)
 |-- GRU_input: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
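As an optional sanity check (illustrative), we can inspect one transformed event and verify that GRU_input keeps the 801 x 19 layout:
# Pull a single event back to the driver; this triggers the UDF on one partition
first_event = data.select('GRU_input').first()['GRU_input']
print(len(first_event), len(first_event[0]))   # expected: 801 and 19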
qcd = data.filter('label=0')
tt = data.filter('label=1')
wjets = data.filter('label=2')
# Create the undersampled dataframes
# False means to sample without repetition
tt = tt.sample(False, qcd_events*1.0/tt_events)
wjets = wjets.sample(False, qcd_events*1.0/wjets_events)
dataUndersampled = qcd.union(tt).union(wjets)
dataUndersampled.groupBy('label').count().show()
+-----+-------+
|label|  count|
+-----+-------+
|    0|1426626|
|    1|1427006|
|    2|1426639|
+-----+-------+
Because of how it has been created, the dataset is made of "three blocks" obtained from the union of the three samples; therefore we need to shuffle it. We split this dataset into train/test and shuffle the train dataset.
from pyspark.sql.functions import rand
trainUndersampled, testUndersampled = dataUndersampled.randomSplit([0.8, 0.2], seed=42)
trainUndersampled = trainUndersampled.orderBy(rand(seed=42))
Notice that the whole pipeline will be triggered by the action of saving to the Parquet files.
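Before writing, one can optionally check how many partitions the DataFrame currently has, to pick a sensible value for coalesce (the 800 used below is a choice, not a requirement):
# Optional: number of partitions before coalescing; coalesce() only reduces
# this number and does not trigger a full shuffle
print(trainUndersampled.rdd.getNumPartitions())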
PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/"
numTestPartitions = 800
%time testUndersampled.coalesce(numTestPartitions).write.parquet(PATH + 'testUndersampled.parquet')
CPU times: user 408 ms, sys: 341 ms, total: 749 ms Wall time: 1h 24s
numTrainPartitions = 800
%time trainUndersampled.coalesce(numTrainPartitions).write.parquet(PATH + 'trainUndersampled.parquet')
CPU times: user 1.93 s, sys: 1.59 s, total: 3.52 s Wall time: 2h 9min 18s
spark.stop()