#!/usr/bin/env python
# coding: utf-8

# ## Pipeline for the topology classifier with Apache Spark
#
# **2. Event Filtering and Feature Engineering** In this stage we prepare the input files for the three classifier models, starting from the output of the previous stage (data ingestion) and producing the test and training datasets in Apache Parquet format.
#
# To run this notebook we used the following configuration:
# * *Software stack*: Spark 3.3.2
# * *Platform*: CentOS 7, Python 3.9
# * *Spark cluster*: Analytix

# In[1]:


# No need to run this when using the CERN SWAN service:
# just add the configuration parameters for Spark via the "star" button integration.
#
# pip install pyspark, or use your favorite way to set SPARK_HOME; here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-3.3.2-bin-hadoop3')  # set path to SPARK_HOME

# Create the Spark session and configure it according to your environment
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("2-Feature Preparation")
    .master("yarn")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "64g")
    .config("spark.executor.cores", "8")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)


# In[2]:


# Check that the Spark session has been created correctly
spark


# This loads into a Spark dataframe the Parquet files produced in the previous data ingestion step.

# In[3]:


# This is the input dataset.
# It is the output of Step 1 - Data Ingestion.
# It is made available for CERN users on the Hadoop Analytix cluster.
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV_20190522"

data = (
    spark.read
    .format("parquet")
    .load(dataset_path)
)

events = data.count()
print("There are {} events".format(events))


# We can also have a look at the distribution between classes after the filtering.

# In[4]:


labels = ['QCD', 'tt', 'W+jets']

counts = data.groupBy('label').count().collect()

qcd_events = 0
tt_events = 0
wjets_events = 0

print('There are:')
for i in range(3):
    print('\t* {} {} events (frac = {:.3f})'
          .format(
              counts[i][1],
              labels[counts[i].label],
              counts[i][1] * 1.0 / events
          ))
    if counts[i].label == 0:
        qcd_events = counts[i][1]
    elif counts[i].label == 1:
        tt_events = counts[i][1]
    elif counts[i].label == 2:
        wjets_events = counts[i][1]


# The dataset is imbalanced, so we may need to undersample it.
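# As a side note, the loop above indexes the rows returned by `collect()` positionally and assumes one row per label. A minimal, order-independent alternative (a sketch, not part of the original pipeline) builds a dictionary keyed by label first:

# In[ ]:


# Sketch: collect per-label counts into a dict so the printout does not depend on row order
label_counts = {row['label']: row['count'] for row in data.groupBy('label').count().collect()}
for lbl, name in enumerate(labels):
    n = label_counts.get(lbl, 0)
    print('\t* {} {} events (frac = {:.3f})'.format(n, name, n * 1.0 / events))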
# ## Feature preparation
#
# In the Parquet files produced in the previous step we have three columns:
# 1. `hfeatures` containing the 14 High Level Features
# 2. `lfeatures` containing the Low Level Features (a list of 801 particles, each with 19 features)
# 3. `label` identifying the sample

# In[5]:


data.printSchema()


# We can begin by preparing the input for the HLF classifier, which simply requires scaling the features and encoding the label. To use Spark's `MinMaxScaler` we need to convert the input into dense vectors.

# In[6]:


from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

vector_dense_udf = udf(lambda r: Vectors.dense(r), VectorUDT())
data = data.withColumn('hfeatures_dense', vector_dense_udf('hfeatures'))


# We can now build the pipeline to scale the HLF and encode the labels.

# In[7]:


from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler

## One-Hot-Encode the label
encoder = OneHotEncoder(inputCols=["label"],
                        outputCols=["encoded_label"],
                        dropLast=False)

## Scale the feature vector
scaler = MinMaxScaler(inputCol="hfeatures_dense",
                      outputCol="HLF_input")

pipeline = Pipeline(stages=[encoder, scaler])

get_ipython().run_line_magic('time', 'fitted_pipeline = pipeline.fit(data)')


# In[8]:


# Apply the pipeline to the data
data = fitted_pipeline.transform(data)


# New columns have been created; if we want to drop some of them we can use
# ```Python
# data = data.drop("col-name")
# ```

# In[9]:


data.printSchema()


# Moving on to the particle-sequence classifier, we need to sort the particles in each event by decreasing $\Delta R$ distance from the isolated lepton, where $$\Delta R = \sqrt{\Delta \eta^2 + \Delta \phi^2}$$
#
# From the production of the low level features we know that the isolated lepton is the first particle of the list and the 19 features are
# ```Python
# features = [
#     'Energy', 'Px', 'Py', 'Pz', 'Pt', 'Eta', 'Phi',
#     'vtxX', 'vtxY', 'vtxZ', 'ChPFIso', 'GammaPFIso', 'NeuPFIso',
#     'isChHad', 'isNeuHad', 'isGamma', 'isEle', 'isMu', 'Charge'
# ]
# ```
# therefore we need features 5 ($\eta$) and 6 ($\phi$) to compute $\Delta R$.
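# Note that $\Delta \phi$ has to be wrapped into $[-\pi, \pi]$ before applying the formula, as done in the `DeltaR` method below. A tiny standalone check of the wrapping (illustrative only, not part of the pipeline):

# In[ ]:


import math

# Two azimuthal angles close to +pi and -pi: the naive difference is ~6 rad,
# while the true angular separation is ~0.28 rad.
phi1, phi2 = 3.0, -3.0
dphi = phi1 - phi2
dphi = (dphi + math.pi) % (2 * math.pi) - math.pi  # wrap into [-pi, pi)
print(abs(dphi))  # ~0.283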
# In[10]:


import math

class lepAngularCoordinates():
    """
    This class is used to store the lepton coordinates
    and compute DeltaR with respect to the other particles
    """
    def __init__(self, eta, phi):
        self.Eta = eta
        self.Phi = phi

    def DeltaR(self, eta, phi):
        deta = self.Eta - eta

        dphi = self.Phi - phi
        pi = math.pi
        while dphi > pi:
            dphi -= 2*pi
        while dphi < -pi:
            dphi += 2*pi

        return math.sqrt(deta*deta + dphi*dphi)


# In[11]:


from pyspark.sql.types import ArrayType, DoubleType
from sklearn.preprocessing import StandardScaler

@udf(returnType=ArrayType(ArrayType(DoubleType())))
def transform(particles):
    ## The isolated lepton is the first particle in the list
    ISOlep = lepAngularCoordinates(particles[0][5], particles[0][6])

    ## Sort the particles based on the distance from the isolated lepton
    particles.sort(key=lambda part: ISOlep.DeltaR(part[5], part[6]),
                   reverse=True)

    ## Standardize the features
    particles = StandardScaler().fit_transform(particles).tolist()

    return particles


# In[12]:


data = data.withColumn('GRU_input', transform('lfeatures'))


# In[13]:


data.printSchema()


# ## Undersample the dataset

# In[14]:


qcd = data.filter('label=0')
tt = data.filter('label=1')
wjets = data.filter('label=2')


# In[15]:


# Create the undersampled dataframes
# False means to sample without replacement
tt = tt.sample(False, qcd_events*1.0/tt_events)
wjets = wjets.sample(False, qcd_events*1.0/wjets_events)

dataUndersampled = qcd.union(tt).union(wjets)


# In[16]:


dataUndersampled.groupBy('label').count().show()


# ## Shuffle the dataset
#
# Because of how it has been created, the dataset is made of three "blocks" obtained from the union of the three samples; therefore we need to shuffle it. We split the dataset into `train`/`test` and shuffle the train dataset.

# In[17]:


from pyspark.sql.functions import rand

trainUndersampled, testUndersampled = dataUndersampled.randomSplit([0.8, 0.2], seed=42)
trainUndersampled = trainUndersampled.orderBy(rand(seed=42))


# Notice that the whole pipeline will be triggered by the action of saving to the Parquet files.

# ## Save the datasets as Apache Parquet files

# In[18]:


PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/"

numTestPartitions = 800
get_ipython().run_line_magic('time', "testUndersampled.coalesce(numTestPartitions).write.parquet(PATH + 'testUndersampled.parquet')")


# In[19]:


numTrainPartitions = 800
get_ipython().run_line_magic('time', "trainUndersampled.coalesce(numTrainPartitions).write.parquet(PATH + 'trainUndersampled.parquet')")


# In[20]:


spark.stop()


# In[ ]:
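# For reference, the next stage (model training) can read these datasets back in a new Spark session, e.g. (a sketch, following the paths used above):
# ```Python
# trainUndersampled = spark.read.parquet(PATH + 'trainUndersampled.parquet')
# testUndersampled = spark.read.parquet(PATH + 'testUndersampled.parquet')
# ```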