Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Knowledge Engineering
Maastricht University, The Netherlands
!(date +%d\ %B\ %G)
07 December 2016
In this notebook we will be preprocessing a 4.6 GB CSV file containing simulated ATLAS events. Afterwards we will save the processed data to the Parquet format for further analysis. After the completion of this notebook, we will have a processed dataset ready for model development, training and evaluation.
import numpy as np
import time
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from distkeras.utils import shuffle
from distkeras.transformers import OneHotTransformer
Using TensorFlow backend.
Edit the variables in the cell below. If you are running Spark in local mode, set the local flag to True and adjust the resources you wish to use on your local machine. Likewise, set the using_spark_2 flag to True if you are running Spark 2.0 or higher.
# Modify these variables according to your needs.
application_name = "Distributed Deep Learning: Data Preprocessing"
using_spark_2 = False
local = False
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 8
    num_cores = 2
You do not need to change anything in the following cells. Adjusting the configuration in the cell above should be sufficient for running this notebook.
import os
# Use the Databricks CSV reader, which has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
# Spark configuration values must be strings; str() works in both Python 2 and Python 3.
conf.set("spark.executor.cores", str(num_cores))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.executor.memory", "2g")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Check if the user is running Spark 2.0 +
if using_spark_2:
    # SparkSession only exists in Spark 2.0 and higher, so we import it here.
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
        .appName(application_name) \
        .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the dataset.
raw_dataset = reader.read.format('com.databricks.spark.csv') \
.options(header='true', inferSchema='true').load("data/atlas_higgs.csv")
# Double-check the inferred schema to see what the dataset looks like.
raw_dataset.printSchema()
root
 |-- EventId: integer (nullable = true)
 |-- DER_mass_MMC: double (nullable = true)
 |-- DER_mass_transverse_met_lep: double (nullable = true)
 |-- DER_mass_vis: double (nullable = true)
 |-- DER_pt_h: double (nullable = true)
 |-- DER_deltaeta_jet_jet: double (nullable = true)
 |-- DER_mass_jet_jet: double (nullable = true)
 |-- DER_prodeta_jet_jet: double (nullable = true)
 |-- DER_deltar_tau_lep: double (nullable = true)
 |-- DER_pt_tot: double (nullable = true)
 |-- DER_sum_pt: double (nullable = true)
 |-- DER_pt_ratio_lep_tau: double (nullable = true)
 |-- DER_met_phi_centrality: double (nullable = true)
 |-- DER_lep_eta_centrality: double (nullable = true)
 |-- PRI_tau_pt: double (nullable = true)
 |-- PRI_tau_eta: double (nullable = true)
 |-- PRI_tau_phi: double (nullable = true)
 |-- PRI_lep_pt: double (nullable = true)
 |-- PRI_lep_eta: double (nullable = true)
 |-- PRI_lep_phi: double (nullable = true)
 |-- PRI_met: double (nullable = true)
 |-- PRI_met_phi: double (nullable = true)
 |-- PRI_met_sumet: double (nullable = true)
 |-- PRI_jet_num: integer (nullable = true)
 |-- PRI_jet_leading_pt: double (nullable = true)
 |-- PRI_jet_leading_eta: double (nullable = true)
 |-- PRI_jet_leading_phi: double (nullable = true)
 |-- PRI_jet_subleading_pt: double (nullable = true)
 |-- PRI_jet_subleading_eta: double (nullable = true)
 |-- PRI_jet_subleading_phi: double (nullable = true)
 |-- PRI_jet_all_pt: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Label: string (nullable = true)
Next, we select all columns in the CSV except EventId, Weight, and Label, since these are not input features: EventId is only an identifier, and Label is the target we want to predict.
# Record the starting time of the data preprocessing.
time_start = time.time()
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset.columns
features.remove('EventId')
features.remove('Weight')
features.remove('Label')
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features" which will contain all the desired features aggregated into a single vector.
dataset = vector_assembler.transform(raw_dataset)
# Show what happened after applying the vector assembler.
# Note: "features" column got appended to the end.
dataset.select("features").take(1)
[Row(features=DenseVector([138.47, 51.655, 97.827, 27.98, 0.91, 124.711, 2.666, 3.064, 41.928, 197.76, 1.582, 1.396, 0.2, 32.638, 1.017, 0.381, 51.626, 2.273, -2.414, 16.824, -0.277, 258.733, 2.0, 67.435, 2.15, 0.444, 46.062, 1.24, -2.475, 113.497]))]
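To make the effect of the assembler concrete, here is a minimal plain-Python sketch of the same idea applied to a single row. The assemble helper and the shortened row are hypothetical and only illustrate the behaviour; Spark's VectorAssembler works on DataFrame columns and produces a vector column, not a Python list. The three feature values are taken from the output above, while the EventId and Weight values are placeholders.

```python
# Illustrative sketch only: mimics, for one row, what VectorAssembler does for a DataFrame.
# `assemble` and the shortened row are hypothetical, not part of Spark or dist-keras.

def assemble(row, input_cols):
    """Collect the values of input_cols, in order, into a single list of floats."""
    return [float(row[col]) for col in input_cols]

columns = ["EventId", "DER_mass_MMC", "DER_mass_transverse_met_lep",
           "PRI_jet_num", "Weight", "Label"]

# Same selection as in the notebook: drop the columns that are not input features.
features = [c for c in columns if c not in ("EventId", "Weight", "Label")]

row = {
    "EventId": 0,                          # placeholder value
    "DER_mass_MMC": 138.47,
    "DER_mass_transverse_met_lep": 51.655,
    "PRI_jet_num": 2,                      # integer column, becomes 2.0 in the vector
    "Weight": 0.0,                         # placeholder value
    "Label": "s",
}

vector = assemble(row, features)
print(vector)  # [138.47, 51.655, 2.0]
```

Note that the integer column PRI_jet_num ends up as a floating point number, which matches the 2.0 that appears in the DenseVector above.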
Apply feature normalization with standard scaling using Spark's StandardScaler. This transforms every feature to have zero mean and unit standard deviation.
standard_scaler = StandardScaler(inputCol="features", outputCol="features_normalized", withStd=True, withMean=True)
standard_scaler_model = standard_scaler.fit(dataset)
dataset = standard_scaler_model.transform(dataset)
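The scaling applied above can be sketched for a single feature column in plain Python. This is a minimal illustration, not Spark's implementation: the standard_scale helper and the toy column are hypothetical. The sketch divides by the corrected sample standard deviation (n - 1 in the denominator), which is how the Spark documentation describes the unit standard deviation; treat that detail as an assumption if you rely on a different Spark version.

```python
import math

# Illustrative sketch only: standard scaling of one feature column.
# `standard_scale` is a hypothetical helper, not part of Spark.

def standard_scale(values):
    """Return (value - mean) / std for every value in the column."""
    n = len(values)
    mean = sum(values) / float(n)
    # Corrected sample standard deviation (divides by n - 1).
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))
    return [(v - mean) / std for v in values]

column = [1.0, 2.0, 3.0, 4.0, 5.0]  # hypothetical toy feature column
scaled = standard_scale(column)
print(scaled)
```

After scaling, the column has mean 0 and sample standard deviation 1, so features that were measured on very different scales become comparable inputs for the model.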
The dataset is divided into two classes, i.e., signal (s) and background (b). To make our lives easier later on, we map these labels to a one-hot encoded vector (of course, this is a design decision). As a first step, we apply a StringIndexer to convert the label strings into numeric indices. Like the StandardScaler, the StringIndexer is a feature transformer provided by Apache Spark.
label_indexer = StringIndexer(inputCol="Label", outputCol="label_index").fit(dataset)
dataset = label_indexer.transform(dataset)
# Show the result of the label transformation.
dataset.select("Label", "label_index").take(5)
[Row(Label=u's', label_index=1.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0)]
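The mapping shown in the output above (b to 0.0, s to 1.0) follows from the StringIndexer's default behaviour of ordering labels by frequency, with the most frequent label receiving index 0. The following plain-Python sketch reproduces that idea; fit_string_indexer is a hypothetical helper, and the alphabetical tie-breaking is an assumption of the sketch rather than documented Spark behaviour.

```python
from collections import Counter

# Illustrative sketch only: frequency-based label indexing.
# `fit_string_indexer` is a hypothetical helper, not part of Spark.

def fit_string_indexer(labels):
    """Map every distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Ties are broken alphabetically (an assumption made for this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

labels = ["s", "b", "b", "b", "b"]  # the five labels shown in the output above
mapping = fit_string_indexer(labels)
indexed = [mapping[label] for label in labels]
print(indexed)  # [1.0, 0.0, 0.0, 0.0, 0.0]
```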
# Keras is not able to work with these indices directly.
# What it actually expects is a vector with the same size as the output layer.
# Our framework provides functionality to do this with ease. Given an expected
# vector dimension, it prepares a zero vector with the specified dimensionality
# and sets the element at the label index to one.
# For example:
# 1. Assume we have a label index: 3
# 2. Output dimensionality: 5
# With these parameters, we obtain the following vector in the DataFrame column: [0,0,0,1,0]
# First, we fetch the columns of interest.
dataset = dataset.select("features_normalized", "label_index")
# Number of classes (signal and background).
nb_classes = 2
# Construct a one-hot encoded vector using the provided index.
transformer = OneHotTransformer(output_dim=nb_classes, input_col="label_index", output_col="label")
dataset = transformer.transform(dataset)
# Only select the columns we need while training (less data shuffling).
dataset = dataset.select("features_normalized", "label_index", "label")
dataset.cache()
DataFrame[features_normalized: vector, label_index: double, label: vector]
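The one-hot encoding described in the comments of the cell above can be sketched in a few lines of plain Python. The one_hot function below is a hypothetical stand-in that only illustrates the idea; it is not the OneHotTransformer implementation, which operates on a DataFrame column.

```python
# Illustrative sketch only: one-hot encoding of a single label index.
# `one_hot` is a hypothetical helper, not the dist-keras OneHotTransformer.

def one_hot(label_index, output_dim):
    """Return a zero vector of size output_dim with a one at label_index."""
    vector = [0.0] * output_dim
    # The label index is stored as a double in the DataFrame, hence the int() cast.
    vector[int(label_index)] = 1.0
    return vector

# The example from the comments: label index 3, output dimensionality 5.
print(one_hot(3, 5))    # [0.0, 0.0, 0.0, 1.0, 0.0]

# The two classes in this notebook: background (0.0) and signal (1.0).
print(one_hot(0.0, 2))  # [1.0, 0.0]
print(one_hot(1.0, 2))  # [0.0, 1.0]
```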
We shuffle the complete dataset in order to be able to draw stochastic samples from the dataframe.
# Randomize the dataset.
dataset = shuffle(dataset)
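One common way to shuffle a distributed dataset is to attach a random key to every row and sort by that key. The plain-Python sketch below illustrates this technique on a small list; shuffle_rows is a hypothetical helper, and whether the shuffle utility used above works exactly this way is an assumption.

```python
import random

# Illustrative sketch only: shuffling by sorting on a random key.
# `shuffle_rows` is a hypothetical helper, not the dist-keras shuffle function.

def shuffle_rows(rows, seed=None):
    """Return the rows in random order by sorting on a per-row random key."""
    rng = random.Random(seed)
    # Attach a random key to every row, comparable to adding a random column.
    keyed = [(rng.random(), row) for row in rows]
    # Sort by the random key only, comparable to ordering by that column.
    keyed.sort(key=lambda pair: pair[0])
    return [row for _, row in keyed]

rows = list(range(10))  # hypothetical stand-in for the rows of the DataFrame
shuffled = shuffle_rows(rows, seed=42)
print(shuffled)
```

The shuffled result contains exactly the same rows as the input, only in a different order, so consecutive mini-batches drawn from it behave like random samples of the full dataset.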
Finally, we save the shuffled and processed dataset to disk for later use.
# Store the preprocessed dataset as a Parquet file.
dataset.write.save("data/processed.parquet", format="parquet")
time_end = time.time()
dt = time_end - time_start
print("Total time: " + str(dt) + " seconds.")
print("Total time: " + str(dt / 60) + " minutes.")
Total time: 694.612912178 seconds.
Total time: 11.5768818696 minutes.