Prerequisite
Run the notebook week-3.0-data-prep-for-training before this one.
In this exercise, you will use Spark MLlib to train your first models.
This exercise is part of the Scaling Machine Learning with Spark book, available on the O'Reilly platform or on Amazon.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master('local[*]') \
.appName("week-3-train-with-mllib") \
.getOrCreate()
df_train = spark.read.parquet("../datasets/classified_train_data")
df_test = spark.read.parquet("../datasets/classified_test_data")
df_train.printSchema()
✅ Task:
Your data is clean and organized. It's your turn to create your first ML model with Spark.
Run the next code; it runs the FP-Growth (Frequent Pattern Growth) algorithm to extract frequent patterns, if any exist.
Tweak minSupport and minConfidence. For example, change both to 0.1 and see what happens.
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="description", minSupport=0.01, minConfidence=0.09)
fpGrowth_model = fpGrowth.fit(df_train)
# Display frequent itemsets.
fpGrowth_model.freqItemsets.show()
# Display generated association rules.
fpGrowth_model.associationRules.show()
# transform examines the input items against all the association rules and
# summarizes the consequents as the prediction.
fpGrowth_model.transform(df_train).toPandas().transpose()
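To build intuition for what transform does, here is a plain-Python sketch (not Spark's implementation) using hypothetical rules: every rule whose antecedent is contained in a row's items contributes its consequent to that row's prediction.

```python
# Hypothetical association rules: (antecedent items, consequent item).
rules = [
    (frozenset(["free"]), "followers"),
    (frozenset(["free", "followers"]), "now"),
]

def predict(items, rules):
    """Apply every rule whose antecedent is a subset of `items`;
    collect consequents not already present as the prediction."""
    items = set(items)
    prediction = []
    for antecedent, consequent in rules:
        if antecedent <= items and consequent not in items and consequent not in prediction:
            prediction.append(consequent)
    return prediction

print(predict(["free"], rules))               # ['followers']
print(predict(["free", "followers"], rules))  # ['now']
```

This mirrors the behavior described in the comment above: consequents of matching rules become the prediction column.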
What did you get?
When tweaking minSupport=0.21 and minConfidence=0.1, you get:

| items | freq |
|---|---|
| [and] | 389 |
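To see how minSupport filters patterns, here is a plain-Python sketch of support counting on toy transactions. It captures the idea of "support" only; Spark's FP-Growth uses a far more efficient FP-tree algorithm.

```python
from itertools import combinations
from collections import Counter

# Toy transactions standing in for tokenized `description` rows.
transactions = [
    ["free", "followers", "now"],
    ["free", "followers"],
    ["free", "now"],
    ["verified", "account"],
]

def frequent_itemsets(transactions, min_support):
    """Count every itemset and keep those whose support
    (fraction of transactions containing it) >= min_support."""
    n = len(transactions)
    counts = Counter()
    for t in transactions:
        items = sorted(set(t))
        for size in range(1, len(items) + 1):
            for combo in combinations(items, size):
                counts[combo] += 1
    return {items: c for items, c in counts.items() if c / n >= min_support}

# With min_support=0.5, only itemsets appearing in at least 2 of 4 rows survive.
print(frequent_itemsets(transactions, 0.5))
```

Raising minSupport shrinks the result: with min_support=0.8, only itemsets present in nearly every transaction would remain, which is why a high setting can leave you with almost no patterns.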
✅ Task:
It's your turn to create your second ML model with Spark: Linear Regression.
Linear regression is a common Statistical Data Analysis technique. It is used to determine the extent to which there is a linear relationship between a dependent variable and one or more independent variables.
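As a minimal sketch of the idea, here is ordinary least squares for a single independent variable on toy numbers (plain Python, not Spark; the data is hypothetical):

```python
# Fit y = slope * x + intercept by ordinary least squares on toy data.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.1, 4.0, 6.1, 7.9]  # roughly y = 2x

n = len(xs)
mean_x = sum(xs) / n
mean_y = sum(ys) / n

# Closed-form solution: slope = cov(x, y) / var(x).
slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) \
        / sum((x - mean_x) ** 2 for x in xs)
intercept = mean_y - slope * mean_x

print(slope, intercept)  # roughly 1.95 and 0.15
```

Spark's LinearRegression solves the same problem, but for many features at once and distributed across the cluster, which is why it needs the features column described next.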
But, before jumping right into it, you should know what Spark ML Linear Regression expects as input:
1. label, of type Double: our classification target.
2. features, of type Vector[Double]: a vector of Doubles.

Hence, you will transform all numeric columns into one column named features.
Leave description out, as it is not relevant for your next task.
To create the features column, use VectorAssembler:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features", handleInvalid="skip")
new_df = vecAssembler.transform(dataFrame)
new_df.show()
Vector has two types in Spark:
1. Dense Vector
2. Sparse Vector
A sparse vector stores only the non-zero entries (as indices and values) and is the right choice when many values in the vector are zero or null.
A dense vector stores every entry and is the right choice when most of the values are non-zero or non-null.
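A plain-Python sketch of the two layouts, mirroring the (size, indices, values) form used by pyspark.ml.linalg.Vectors.sparse:

```python
# Dense layout: every slot is stored, including the zeros.
dense = [0.0, 3.0, 0.0, 0.0, 7.0]

# Sparse layout: only non-zero positions are stored, as (size, indices, values).
sparse = (5, [1, 4], [3.0, 7.0])

def sparse_to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple back into a dense list."""
    out = [0.0] * size
    for i, v in zip(indices, values):
        out[i] = v
    return out

# Both layouts describe the same vector.
print(sparse_to_dense(*sparse) == dense)  # True
```

With mostly-zero data, the sparse form stores 2 values instead of 5; at real feature-vector sizes that difference is what makes sparse vectors worth it.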
🤔 Question
Have you noticed the handleInvalid param?
handleInvalid = "skip"
What happens if you remove it? (The default is "error", which throws an exception when a row contains invalid or null values.)
Validate yourself with new_df.show(). Note that vecAssembler is a transformer, not a DataFrame, so calling .show() on it would fail.
Notice! You have a new DataFrame now.
Remember to check yourself and work with the new DataFrame
df_train.printSchema()
from pyspark.ml.feature import VectorAssembler
train = df_train.drop('description')
vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid="skip")
df_train = vecAssembler.transform(train)
df_train.toPandas().transpose()
✅ Task:
To run the ML training phase on the feature vector, you need a DataFrame with only the relevant columns.
bot and features are the only columns that we care about.
Drop the rest with the drop function:
output = df_train.drop("val1","val2")
output_train = df_train.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')
output_train.show()
After turning the numeric columns into one features column and dropping description, you are left with creating the label column.
Create a new DataFrame with a label column:
Code sample:
df_for_lr = output_train.selectExpr("features", "bot as label")
df_for_lr.show()
df_for_lr.toPandas().transpose()
Notice that df_for_lr is now your new DataFrame for creating the LinearRegression model.
df_for_lr = output_train.selectExpr("features", "bot as label")
df_for_lr = df_for_lr.fillna({'label': 0})
df_for_lr.show()
Run the next code.
Check out where you use the new DataFrame, df_for_lr.
It creates a machine learning model using linear regression.
Tweak the maxIter, regParam, and elasticNetParam parameters!
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(df_for_lr)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
What did you get?
What does it look like?
What is r2? r2 is short for R-squared:
R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression.
What is RMSE?
Root Mean Square Error (RMSE) is the standard deviation of the residuals (prediction errors). Residuals are a measure of how far from the regression line data points are; RMSE is a measure of how spread out these residuals are. In other words, it tells you how concentrated the data is around the line of best fit.
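As a concrete sketch, here is how RMSE and R-squared fall out of the residuals, computed in plain Python on hypothetical labels and predictions:

```python
import math

# Toy labels and model predictions (hypothetical numbers).
labels = [1.0, 0.0, 1.0, 0.0]
preds  = [0.8, 0.2, 0.6, 0.1]

# Residuals: how far each prediction is from the true label.
residuals = [y - p for y, p in zip(labels, preds)]

# RMSE: square root of the mean squared residual.
rmse = math.sqrt(sum(r * r for r in residuals) / len(residuals))

# R-squared: 1 minus (residual sum of squares / total sum of squares).
mean_y = sum(labels) / len(labels)
ss_res = sum(r * r for r in residuals)
ss_tot = sum((y - mean_y) ** 2 for y in labels)
r2 = 1 - ss_res / ss_tot

print(rmse, r2)  # roughly 0.25 and 0.75
```

An RMSE of 0.25 only means something relative to the scale of the labels; here the labels range from 0 to 1, so 0.25 is a quarter of that range, which is exactly the kind of comparison the next step asks you to make.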
Try to play with the parameters and watch how they change.
RMSE alone is meaningless until you compare it with statistics of the actual label values, such as the mean, min, and max.
Compare the RMSE with the mean output; after such a comparison, our RMSE looks pretty good.
df_for_lr.describe().show()
You built 2 machine learning models!
However, you didn't get the best results.
It's OK! And absolutely normal.
In Chapter 4, you will learn how to improve them.
# Use overwrite so the notebook can be rerun without "path already exists" errors.
lrModel.write().overwrite().save("../models/linearRegression_model")
fpGrowth_model.write().overwrite().save("../models/fpGrowth_model")