Jupyter Notebooks—one of the industry’s favorite open-source web applications for creating and sharing documents housing live code, equations, visualizations, and more—are great for iterative data science work.
However, working with large datasets can be challenging since data doesn’t typically fit on the local disk or into local memory. Instead, data is stored in a cluster and a distributed computing framework such as Apache Spark is required to process it within a reasonable amount of time.
Now, this is where BeakerX comes in.
BeakerX is a collection of Jupyter kernels and notebook extensions that allows you to work efficiently with large Spark datasets directly from a notebook. In addition, you can still use the data science libraries you're familiar with for local development.
In this tutorial, you’ll learn how to use BeakerX to build an interactive notebook that reads a dataset from HDFS and classifies the data using a linear model built with Spark in Scala. We’ll also produce a report using popular Python libraries.
Before we dive in, it’s important to note that this tutorial assumes you're using Jupyter Notebooks on DC/OS, which offers notebooks preconfigured with many popular data science tools such as Spark, BeakerX, pandas, and scikit-learn.
To run this tutorial you need the HDFS and Jupyter packages installed on your DC/OS cluster. Directions for installing HDFS and configuring Jupyter can be found in this tutorial video.
Once you have these packages installed, navigate to your Jupyter instance and open a terminal tab. Run the following commands from the Jupyter terminal to clone the repository that contains this notebook.
git clone https://github.com/dcos-labs/data-science.git
Using the Jupyter file browser, navigate to the folder called data-science, then beakerx, and open the notebook called BeakerX.ipynb.
The code below renders a slider that we can use to change the value of sample_size, which will be used further down. UI components are a great way to produce interactive notebooks that make data exploration more engaging, and BeakerX offers a number of forms, widgets, and interactive components.
Notice the %%groovy magic on the first line. This tells BeakerX to interpret the cell content using the Groovy programming language. BeakerX supports multiple languages per notebook this way, allowing users to take advantage of the best tools for the job, even if they're in different programming languages.
%%groovy
import com.twosigma.beakerx.widget.IntSlider
sample_size = new IntSlider()
sample_size.min = 100
sample_size.max = 10000
sample_size.step = 100
sample_size.value = 5000
sample_size
The next cell demonstrates how to use the BeakerX Autotranslation feature. This feature allows you to synchronize data between cells that are written in different programming languages. Assigning the value to the beakerx object allows us to read it back in other languages later.
%%groovy
beakerx.sample_size = sample_size.value
Now, we can easily read the value of sample_size from the default language of this notebook, which is Scala.
beakerx.sample_size
It is also available in other languages, for example Python. For more examples in additional languages, see the docs for Autotranslation.
%%python
from beakerx.object import beakerx
beakerx.sample_size
Now, we’ll use the sample size given above to generate a set of records and load them into HDFS. This cell outputs the exit code of the script, which should be 0 on success.
import sys.process._
val runme = "python generate_example.py somedata " + beakerx.sample_size
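// Run the command and capture its exit code (method-call form avoids the postfix-operator warning)
val exitCode = runme.!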
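The exit-code convention used in this cell is not specific to Scala. As a minimal sketch, the same check could be written in Python with the standard subprocess module. Here run_and_report is a hypothetical helper, and the child commands are stand-ins chosen for illustration, not the tutorial's generate_example.py script.

```python
import subprocess
import sys

def run_and_report(command):
    """Run a command (given as an argument list) and return its exit code."""
    completed = subprocess.run(command)
    return completed.returncode

# Stand-in commands (assumptions for illustration): a child Python process
# that exits cleanly, and one that exits with a non-zero code.
ok_code = run_and_report([sys.executable, "-c", "import sys; sys.exit(0)"])
bad_code = run_and_report([sys.executable, "-c", "import sys; sys.exit(3)"])

print("success exit code:", ok_code)
print("failure exit code:", bad_code)
```

A non-zero value signals that the command failed, so it is worth checking before trying to read the generated file from HDFS.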
The Jupyter package already includes the Spark and Hadoop JARs that we need to run our Spark job, so you don't need to download them from the internet. Now, let’s add them to the classpath.
%classpath add jar /mnt/mesos/sandbox
%classpath add jar /opt/spark/jars/*
%classpath add jar /opt/hadoop/share/hadoop/common/*
%classpath add jar /opt/hadoop/share/hadoop/common/lib/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/lib/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/lib/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/lib/*
%classpath add jar /opt/hadoop/share/hadoop/tools/lib/*
To run distributed Spark jobs on DC/OS using the BeakerX Spark UI, we need to change some of the default settings. Run the cell below and once the UI loads make the following changes:
Once this is done, hit Start to launch a Spark cluster directly from this notebook. Depending on the specs of your cluster, this will take anywhere from a few seconds to a few minutes. Once it's ready, you'll see a star-shaped Spark logo on a green label button, along with additional buttons.
%%spark
The GUI provides useful metrics for working with Spark and allows you to track the progress of your jobs. It also allows you to easily create, save, and select multiple configurations. You can jump directly to the Spark UI by clicking the green Spark logo.
Now, let's write the Spark job that builds a linear model for our dataset. First, we need to import its dependencies.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.ml.classification.LogisticRegression
import scala.math.{ pow, sqrt }
Next, we need to load the data from HDFS that was generated previously.
// Create a schema
val schemaStruct = StructType(
StructField("x1", DoubleType) ::
StructField("x2", DoubleType) ::
StructField("y", IntegerType) :: Nil
)
// Use schema to read data from HDFS using default url
val df = spark.read
.option("header", true)
.schema(schemaStruct)
.csv("hdfs://name-0-node.hdfs.autoip.dcos.thisdcos.directory:9001/somedata.csv")
.na.drop()
Once this is complete, display the results with a table view using df.display(). This will trigger the Spark job and a progress bar will appear. You can click on the ellipses in the column headers for filtering, sorting, and other options.
df.display()
Next, we’ll run a logistic regression and get results. The output of this cell is the model accuracy.
// Break data into training and test
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"
// Use the VectorAssembler to combine the x1, x2 columns into a single vector column
val assembler = new VectorAssembler()
.setInputCols(Array("x1", "x2"))
.setOutputCol("features")
// logistic regression model
val lr = new LogisticRegression()
.setLabelCol(labelColumn)
.setFeaturesCol("features")
.setPredictionCol(labelColumn + "_pred")
.setMaxIter(100)
.setRegParam(0.1)
.setElasticNetParam(0.8)
// Run simple pipeline
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)
// We'll make predictions using the model and the test data and get accuracy
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble
The accuracy is only about 0.5, which is equivalent to flipping a coin, so our model isn't very good.
Let's find out why our model performed so poorly by plotting the data.
// Create function to remove dimensionality of a list
def flatten(l: List[Any]): List[Any] = {
def _flatten(res: List[Any], rem: List[Any]):List[Any] = rem match {
case Nil => res
case (h:List[_])::Nil => _flatten(res, h)
case (h:List[_])::tail => _flatten(res:::h, tail)
case h::tail => _flatten(res:::List(h), tail)
}
_flatten(List(), l)
}
// Split positive and negative cases
val dfPOS = df.filter("y == 1")
val dfNEG = df.filter("y == 0")
// Plot the first 300 records of positive and negative cases
val plotPOSx1 = flatten(dfPOS.select("x1").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotPOSx2 = flatten(dfPOS.select("x2").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotNEGx1 = flatten(dfNEG.select("x1").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotNEGx2 = flatten(dfNEG.select("x2").collect().map(_ (0)).toList).map(_.toString.toDouble)
println(" ")
val plot = new Plot { title = "Plot of raw data" }
val list = List(
new Points {
x = plotPOSx1.take(300)
y = plotPOSx2.take(300)
size = 5.0
color = Color.red
},
new Points {
x = plotNEGx1.take(300)
y = plotNEGx2.take(300)
size = 5.0
color = Color.blue
}
)
plot.add(list)
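The data is arranged in a circle, so it’s easy to see why a linear partition can't split the positive and negative cases. Points on a circle centered on the origin satisfy $ x_1^2 + x_2^2 = r^2 $, which has the same form as the Pythagorean Theorem $ a^2 + b^2 = c^2 $. Squaring each feature therefore turns a circular boundary into a straight line, which maps the data into something we can linearly split.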
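One way to see why squaring helps is a small standalone Python sketch. It does not use the tutorial's dataset: the grid of points, the value of RADIUS_SQ, and the two hand-picked straight-line rules are all assumptions made for illustration, and neither rule is a fitted model.

```python
def make_points(half_steps=10, radius_sq=0.5):
    """Grid over [-1, 1] x [-1, 1]; label 1 inside the circle, 0 outside."""
    points = []
    for i in range(-half_steps, half_steps + 1):
        for j in range(-half_steps, half_steps + 1):
            x1, x2 = i / half_steps, j / half_steps
            label = 1 if x1 * x1 + x2 * x2 < radius_sq else 0
            points.append((x1, x2, label))
    return points

def accuracy(rows, rule):
    """Fraction of (feature_a, feature_b, label) rows that the rule gets right."""
    hits = sum(1 for a, b, label in rows if rule(a, b) == label)
    return hits / len(rows)

RADIUS_SQ = 0.5  # assumed squared radius of the class boundary
raw = make_points(radius_sq=RADIUS_SQ)
# Same transformation as the tutorial: w1 = x1^2, w2 = x2^2
squared = [(x1 * x1, x2 * x2, label) for x1, x2, label in raw]

# A straight-line rule on the raw features: one side of the line x1 + x2 = 0
raw_accuracy = accuracy(raw, lambda x1, x2: 1 if x1 + x2 < 0 else 0)
# A straight-line rule on the squared features: one side of w1 + w2 = r^2
squared_accuracy = accuracy(squared, lambda w1, w2: 1 if w1 + w2 < RADIUS_SQ else 0)

print("raw features:", round(raw_accuracy, 2))
print("squared features:", round(squared_accuracy, 2))
```

On this synthetic grid the line through the raw features is right about half the time, while the line w1 + w2 = r^2 separates the squared features perfectly. The notebook cells that follow apply the same squaring to the real data.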
// Transform data and replot
val ax1 = plotPOSx1.take(300).map(pow(_,2))
val ax2 = plotPOSx2.take(300).map(pow(_,2))
val bx1 = plotNEGx1.take(300).map(pow(_,2))
val bx2 = plotNEGx2.take(300).map(pow(_,2))
val plot = new Plot { title = "Plot of data with features squared" }
val list = List(
new Points {
x = ax1
y = ax2
size = 5.0
color = Color.red
},
new Points {
x = bx1
y = bx2
size = 5.0
color = Color.blue
}
)
plot.add(list)
This should be easy for a logistic regression to classify. Let's redo the model with the transformed data and output the accuracy.
// w1 = x1^2, w2 = x2^2
val dfT = df.withColumn("w1", ((df.col("x1")*df.col("x1")))).withColumn("w2", ((df.col("x2")*df.col("x2"))))
val Array(trainingData, testData) = dfT.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"
// Use the VectorAssembler to combine the w1, w2 columns into a single vector column
val assembler = new VectorAssembler()
.setInputCols(Array("w1", "w2"))
.setOutputCol("features")
// logistic regression model used above
val lr = new LogisticRegression()
.setLabelCol(labelColumn)
.setFeaturesCol("features")
.setPredictionCol(labelColumn + "_pred")
.setMaxIter(100)
.setRegParam(0.1)
.setElasticNetParam(0.8)
// Same pipeline as above
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)
// We'll make predictions using the model and the test data
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble
As expected, the model works much better, with accuracy being close to 1.0.
But accuracy is only one useful metric. A confusion matrix will give us more details on how our model performed. You can easily create one using scikit-learn, which is a Python library included in the Jupyter package. We can use the BeakerX Autotranslation feature again to pass the data to Python.
val evalData = predictions.select("y", "y_pred").head(predictions.count().toInt)
beakerx.evalData = evalData
println("Data moved to BeakerX")
Now the data is accessible from Python and we can build and plot our confusion matrix using scikit-learn.
%%python
from beakerx.object import beakerx
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
dfEval = pd.DataFrame(' '.join(beakerx.evalData).replace('[','').replace(']','').split())[0].str.split(',', expand = True)
y_true = pd.to_numeric(dfEval[0], downcast='integer')
y_pred = pd.to_numeric(dfEval[1], downcast='integer')
# Print a text report of precision, recall, f1-score, support
print(classification_report(y_true, y_pred))
# Plot the confusion matrix
cm = confusion_matrix(y_true, y_pred)
ax = plt.subplot()
sns.heatmap(cm, annot=True, ax=ax, fmt='g')
ax.set_xlabel("Predicted labels")
ax.set_ylabel("True labels")
ax.set_title("Confusion Matrix")
ax
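To make the report above less of a black box, here is a minimal sketch of the same bookkeeping in plain Python. It assumes each transferred row arrives as a string shaped like "[1,1.0]", which is what the string handling in the cell above suggests. The sample_rows values are made up for illustration, and parse_rows and confusion_counts are hypothetical helpers, not part of BeakerX or scikit-learn.

```python
from collections import Counter

# Hypothetical sample rows shaped like "[true_label,predicted_label]"
sample_rows = ["[1,1.0]", "[0,0.0]", "[1,0.0]", "[0,0.0]", "[1,1.0]"]

def parse_rows(rows):
    """Turn strings such as "[1,1.0]" into (true, predicted) integer pairs."""
    pairs = []
    for row in rows:
        true_text, pred_text = row.strip("[]").split(",")
        pairs.append((int(float(true_text)), int(float(pred_text))))
    return pairs

def confusion_counts(pairs):
    """2x2 matrix with true labels as rows and predicted labels as columns."""
    counts = Counter(pairs)
    return [[counts[(0, 0)], counts[(0, 1)]],
            [counts[(1, 0)], counts[(1, 1)]]]

pairs = parse_rows(sample_rows)
matrix = confusion_counts(pairs)

true_neg, false_pos = matrix[0]
false_neg, true_pos = matrix[1]
accuracy = (true_pos + true_neg) / len(pairs)
precision = true_pos / (true_pos + false_pos)  # of predicted positives, how many were right
recall = true_pos / (true_pos + false_neg)     # of actual positives, how many were found

print(matrix)
print(accuracy, precision, recall)
```

The row and column order follows the layout of the heatmap above, with true labels on the vertical axis and predicted labels on the horizontal axis.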
In this tutorial, we learned how to create an interactive UI within a notebook to read input parameters from a user. We then launched Spark jobs and tracked their progress using the BeakerX Spark UI without leaving the notebook environment. Lastly, we used multiple programming languages in a single notebook and easily synchronized data between them using BeakerX Autotranslation.