#!/usr/bin/env python
# coding: utf-8

# # MNIST Analysis with Distributed Keras
#
# **Joeri Hermans** (Technical Student, IT-DB-SAS, CERN)
# *Department of Knowledge Engineering*
# *Maastricht University, The Netherlands*

# `get_ipython` is only defined when this export runs under IPython/Jupyter.
# Running it with the plain interpreter named in the shebang would raise
# NameError, so the notebook-only shell command and magic below are skipped
# in that case instead of crashing the script.
try:
    _ipython = get_ipython()
except NameError:
    _ipython = None

# In[1]:


# Print today's date via the shell (IPython only).
if _ipython is not None:
    _ipython.system('(date +%d\\ %B\\ %G)')


# In this notebook we will show you how to process the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset using Distributed Keras. As in the [workflow](https://github.com/JoeriHermans/dist-keras/blob/master/examples/workflow.ipynb) notebook, we will guide you through the complete machine learning pipeline.
#
# ## Preparation
#
# To get started, we first load all the required imports. Please make sure you have installed `dist-keras` and `seaborn`. Furthermore, we assume that you have access to an installation which provides Apache Spark.
#
# Before you start this notebook, please make sure you ran the "MNIST preprocessing" notebook first, since we will be evaluating a manually "enlarged" dataset.

# In[2]:


# Render matplotlib figures inline (IPython only).
if _ipython is not None:
    _ipython.run_line_magic('matplotlib', 'inline')

import numpy as np

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import *
from keras.layers.convolutional import *

from pyspark import SparkContext
from pyspark import SparkConf

from matplotlib import pyplot as plt

from pyspark import StorageLevel
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *


# In the following cell, adapt the parameters to fit your personal requirements.

# In[3]:


# Modify these variables according to your needs.
application_name = "Distributed Keras MNIST Analysis"
using_spark_2 = False
local = False
# Datasets produced by the "MNIST preprocessing" notebook; these are the files
# actually read below.
training_path = "data/mnist_train_big.parquet"
test_path = "data/mnist_test_preprocessed.parquet"

if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    # NOTE(review): the "yarn-client" master string is deprecated since Spark 2.0
    # (master "yarn" + spark.submit.deployMode=client) - confirm for your cluster.
    master = "yarn-client"
    num_executors = 30
    num_processes = 1


# In[4]:


# This variable is derived from the number of cores and executors, and will be
# used to assign the number of model trainers.
num_workers = num_executors * num_processes

# str() instead of Python-2-only backticks; identical output for ints.
print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))


# In[5]:


conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.locality.wait", "0")
conf.set("spark.executor.memory", "5g")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Check if the user is running Spark 2.0+.
if using_spark_2:
    # SparkSession only exists in Spark 2.0+, so it is imported here rather
    # than at the top of the file (which would break Spark 1.x installs).
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
                             .appName(application_name) \
                             .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # SQLContext wraps a SparkContext; it is only needed (and only valid to
    # build from `sc`) on the Spark 1.x path, where it is the DataFrame reader.
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)


# In[6]:


# Both a SparkSession (Spark 2.0+) and a SQLContext (Spark 1.x) expose `.read`.
reader = sc if using_spark_2 else sqlContext

# Read the training and test set.
training_set = reader.read.parquet(training_path) \
                     .select("features_normalized_dense", "label_encoded", "label")
test_set = reader.read.parquet(test_path) \
                 .select("features_normalized_dense", "label_encoded", "label")


# In[7]:


# Print the schema of the dataset.
training_set.printSchema()


# ## Model Development

# ### Multilayer Perceptron

# In[8]:


# 784 inputs -> 1000 -> 200 -> 10-way softmax, with ReLU and 20% dropout on
# the hidden layers.
mlp = Sequential()
mlp.add(Dense(1000, input_shape=(784,)))
mlp.add(Activation('relu'))
mlp.add(Dropout(0.2))
mlp.add(Dense(200))
mlp.add(Activation('relu'))
mlp.add(Dropout(0.2))
mlp.add(Dense(10))
mlp.add(Activation('softmax'))


# In[9]:


mlp.summary()


# In[10]:


optimizer_mlp = 'adam'
loss_mlp = 'categorical_crossentropy'


# ## Training
#
# Prepare the training and test set for evaluation and training.

# In[11]:


# One partition per worker; cache so the counts below (and later training /
# evaluation passes) reuse the materialized data.
training_set = training_set.repartition(num_workers)
test_set = test_set.repartition(num_workers)
training_set.cache()
test_set.cache()
print("Number of training instances: " + str(training_set.count()))
print("Number of testing instances: " + str(test_set.count()))


# ## Evaluation
#
# We define a utility function which will compute the accuracy for us.

# In[12]:


def evaluate_accuracy(model, test_set, features="features_normalized_dense",
                      label="label", num_classes=10):
    """Score a Keras model on a Spark DataFrame with distkeras' AccuracyEvaluator.

    Args:
        model: Keras model passed to distkeras' ModelPredictor.
        test_set: DataFrame containing the `features` and `label` columns.
        features: name of the feature column fed to the model.
        label: name of the column holding the true class index.
        num_classes: output dimension given to LabelIndexTransformer
            (10 for MNIST).

    Returns:
        The value returned by ``AccuracyEvaluator.evaluate``.
    """
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col=label)
    predictor = ModelPredictor(keras_model=model, features_col=features)
    # Presumably turns the model's output vector into a class index written to
    # "prediction_index" (the column the evaluator reads) - TODO confirm.
    transformer = LabelIndexTransformer(output_dim=num_classes)
    test_set = test_set.select(features, label)
    test_set = predictor.predict(test_set)
    test_set = transformer.transform(test_set)
    score = evaluator.evaluate(test_set)
    return score


# ### ADAG

# In[ ]:


trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp,
               num_workers=num_workers, batch_size=4, communication_window=5,
               num_epoch=1, features_col="features_normalized_dense",
               label_col="label_encoded")
# Train on the repartitioned, cached training set.
trained_model = trainer.train(training_set)


# In[19]:


# View the weights of the trained model. In a notebook the returned list is
# displayed; when run as a plain script this value is discarded.
trained_model.get_weights()


# In[20]:


print("Training time: " + str(trainer.get_training_time()))
print("Accuracy: " + str(evaluate_accuracy(trained_model, test_set)))