Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Data Science & Knowledge Engineering
Maastricht University, The Netherlands
In this notebook we download the CIFAR-10 dataset and prepare it in such a way that it can be processed by Apache Spark.
import cPickle as pickle
import csv
import numpy as np
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *
Using TensorFlow backend.
!rm cifar-10-python.tar.gz
!rm -r cifar-10-batches-py
!wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
--2017-01-26 15:42:04--  https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
Resolving www.cs.toronto.edu... 128.100.3.30
Connecting to www.cs.toronto.edu|128.100.3.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 170498071 (163M) [application/x-gzip]
Saving to: “cifar-10-python.tar.gz”

100%[======================================>] 170,498,071  4.88M/s   in 33s

2017-01-26 15:42:40 (4.89 MB/s) - “cifar-10-python.tar.gz” saved [170498071/170498071]
!tar -xvzf cifar-10-python.tar.gz
cifar-10-batches-py/
cifar-10-batches-py/data_batch_4
cifar-10-batches-py/readme.html
cifar-10-batches-py/test_batch
cifar-10-batches-py/data_batch_3
cifar-10-batches-py/batches.meta
cifar-10-batches-py/data_batch_2
cifar-10-batches-py/data_batch_5
cifar-10-batches-py/data_batch_1
# Define the required datastructures.
training_instances = []
training_labels = []
# Iterate through all training batches, and load them in memory.
for i in range(1, 6):
    path = "cifar-10-batches-py/data_batch_" + str(i)
    fd = open(path, "rb")
    d = pickle.load(fd)
    fd.close()
    # Add the training data to our datastructures.
    num_instances = len(d['data'])
    for j in range(0, num_instances):
        training_instances.append(d['data'][j])
        training_labels.append(d['labels'][j])
print("Number of training instances: " + str(len(training_instances)))
Number of training instances: 50000
# Define the required datastructures.
test_instances = []
test_labels = []
# Load the test batch.
path = "cifar-10-batches-py/test_batch"
fd = open(path, "rb")
d = pickle.load(fd)
fd.close()
# Add the testset to our datastructures.
num_instances = len(d['data'])
for j in range(0, num_instances):
    test_instances.append(d['data'][j])
    test_labels.append(d['labels'][j])
print("Number of test instances: " + str(len(test_instances)))
Number of test instances: 10000
At this point we have the training and test set in memory, and we basically have two options to prepare the data for Apache Spark. First, we could simply "parallelize" the data and continue from there; however, this requires some additional logic. The second approach is to write the data to a file format which Spark is able to read (CSV, Parquet, Avro, ...). Because of its simplicity, we choose the second approach and write the contents of our datastructures to a CSV file.
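For reference only, a rough sketch of the first option is shown below. It assumes a SparkContext sc and an SQLContext sqlContext are already available (in this notebook they are only created further down) and uses a simplified schema, so it is not part of the actual preprocessing flow.

# Sketch of option 1 (not executed here): parallelize the in-memory data directly.
# Assumes an existing SparkContext `sc` and SQLContext `sqlContext`.
from pyspark.sql import Row

rdd = sc.parallelize(zip(training_instances, training_labels))
rows = rdd.map(lambda p: Row(features=[int(v) for v in p[0]], label=int(p[1])))
df_train_alternative = sqlContext.createDataFrame(rows)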
# First, prepare the column names.
columns = ['label']
# Now, add the pixel column names. Note, first 1024 pixels are red, then green and finally blue.
for c in ['r', 'g', 'b']:
    for i in range(0, 1024):
        column_name = "p_" + str(i) + "_" + c
        columns.append(column_name)
# Now, we should have 3072 (data) + 1 (label) column names.
print("Number of columns: " + str(len(columns)))
Number of columns: 3073
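As a side note, every instance is a flat vector of 3072 values which can be mapped back to a 32x32 RGB image. The snippet below is only an illustration of the channel ordering assumed above and is not needed for the preprocessing itself.

# Illustration only: reshape a flat CIFAR-10 row back into a 32x32 RGB image.
# The first 1024 values are the red channel, then green, and finally blue.
example = np.asarray(training_instances[0], dtype=np.uint8)
image = example.reshape(3, 32, 32).transpose(1, 2, 0)
print("Image shape: " + str(image.shape))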
training_set = []
test_set = []
# Prepare the training set.
for i in range(0, len(training_instances)):
    row = np.insert(training_instances[i], 0, training_labels[i])
    training_set.append(row)
# Prepare the test set.
for i in range(0, len(test_instances)):
    row = np.insert(test_instances[i], 0, test_labels[i])
    test_set.append(row)
print("Size training set: " + str(len(training_set)))
print("Size test set: " + str(len(test_set)))
Size training set: 50000
Size test set: 10000
def save(path, columns, dataset):
    with open(path, 'wb') as f:
        w = csv.writer(f)
        # Write the column names.
        w.writerow(columns)
        # Iterate through all instances in the dataset.
        n = len(dataset)
        for i in range(0, n):
            w.writerow(dataset[i].tolist())
# Save the datasets to disk.
save("cifar-10-training.csv", columns, training_set)
save("cifar-10-test.csv", columns, test_set)
# Confirming that produced CSV's are present
!ls | grep cifar | grep csv
cifar-10-test.csv
cifar-10-training.csv
# Remove the old training and test set from HDFS.
!hdfs dfs -rm data/cifar-10-training.csv
!hdfs dfs -rm data/cifar-10-test.csv
# Copy the training and test set to HDFS.
!hdfs dfs -copyFromLocal cifar-10-training.csv data/cifar-10-training.csv
!hdfs dfs -copyFromLocal cifar-10-test.csv data/cifar-10-test.csv
Deleted data/cifar-10-training.csv
Deleted data/cifar-10-test.csv
# Modify these variables according to your needs.
application_name = "CIFAR-10 Preprocessing Notebook"
using_spark_2 = False
local = False
path_train = "data/cifar-10-training.csv"
path_test = "data/cifar-10-test.csv"
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1
num_workers = num_executors * num_processes
import os
# Use the DataBricks CSV reader, this has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "4g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
# Check if the user is running Spark 2.0+.
if using_spark_2:
    # SparkSession is only available in Spark 2.0+, so import it here.
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
                             .appName(application_name) \
                             .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports.
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
# Check if we are using Spark 2.0.
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the training set.
raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
.options(header='true', inferSchema='true') \
.load(path_train)
# Read the testing set.
raw_dataset_test = reader.read.format('com.databricks.spark.csv') \
.options(header='true', inferSchema='true') \
.load(path_test)
# Count the number of instances in the training and test set (to check).
print("Training set size: " + str(raw_dataset_train.count()))
print("Test set size: " + str(raw_dataset_test.count()))
Training set size: 50000
Test set size: 10000
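As an additional sanity check (not strictly required), we can verify that the CSV reader inferred one label column plus 3072 pixel columns.

# Sanity check: we expect 3072 pixel columns + 1 label column.
print("Number of columns in the training DataFrame: " + str(len(raw_dataset_train.columns)))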
In order to ensure compatibility with Apache Spark, we vectorize the columns and add the resulting vectors as a separate column. However, in order to achieve this, we first need a list of the required columns. This is shown in the cell below.
features = raw_dataset_train.columns
features.remove('label')
Once we have a list of column names, we can pass it to Spark's VectorAssembler. The VectorAssembler takes the list of feature columns, vectorizes them, and places the resulting vector in the column defined by outputCol.
# Assemble the columns.
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset_train = vector_assembler.transform(raw_dataset_train)
dataset_test = vector_assembler.transform(raw_dataset_test)
# Repartition the dataset.
dataset_train = dataset_train.repartition(num_workers)
dataset_test = dataset_test.repartition(num_workers)
Once we have the inputs for our neural network (the features column) after applying the VectorAssembler, we also need to define the outputs. Since we are dealing with a classification task, the output of our neural network should be a one-hot encoded vector with 10 elements. For this, we provide a OneHotTransformer which accomplishes exactly this task.
nb_classes = 10
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(dataset_train)
dataset_test = encoder.transform(dataset_test)
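To get an idea of what the encoder produces, we can inspect a single transformed instance; for a label l, label_encoded should be a 10-element vector with a 1 at index l. The check below is only illustrative.

# Illustration: inspect the one-hot encoding of a single instance.
row = dataset_train.select("label", "label_encoded").first()
print("Label: " + str(row['label']))
print("Encoded label: " + str(row['label_encoded']))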
Finally, we normalize the pixel intensities to the range [0, 1].
# Allocate a MinMaxTransformer.
# Pixel intensities lie in [0, 255].
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=0.0, o_max=255.0, \
                                input_col="features", \
                                output_col="features_normalized")
# Transform the datasets.
dataset_train = transformer.transform(dataset_train)
dataset_test = transformer.transform(dataset_test)
# Delete the old preprocessed Parquet files.
!hdfs dfs -rm -r data/cifar-10-train-preprocessed.parquet
!hdfs dfs -rm -r data/cifar-10-test-preprocessed.parquet
Deleted data/cifar-10-train-preprocessed.parquet
Deleted data/cifar-10-test-preprocessed.parquet
dataset_train.write.parquet("data/cifar-10-train-preprocessed.parquet")
dataset_test.write.parquet("data/cifar-10-test-preprocessed.parquet")
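As a final, optional verification, we can read the preprocessed Parquet files back and check that the counts match the original datasets.

# Quick verification: read the preprocessed Parquet files back and check the counts.
check_train = reader.read.parquet("data/cifar-10-train-preprocessed.parquet")
check_test = reader.read.parquet("data/cifar-10-test-preprocessed.parquet")
print("Preprocessed training set size: " + str(check_train.count()))
print("Preprocessed test set size: " + str(check_test.count()))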