from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
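The two-step SparkContext/SparkSession setup above works, but on current PySpark releases the session is normally created through the builder API. A minimal equivalent sketch (the app name is just an illustrative label):

from pyspark.sql import SparkSession

# equivalent session setup via the builder API (local run assumed)
spark = SparkSession.builder \
    .master('local') \
    .appName('wine-quality-regression') \
    .getOrCreate()
sc = spark.sparkContext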
wine_df = spark.read.csv('winequality-red.csv', header=True, inferSchema=True, sep=';')
wine_df.take(1)
[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]
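To double-check that inferSchema parsed the columns as numeric rather than string, you can also inspect the schema:

# every measurement column should come back as a numeric type, not string
wine_df.printSchema()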
wine_df.select("alcohol","quality").describe().show()
+-------+------------------+------------------+
|summary|           alcohol|           quality|
+-------+------------------+------------------+
|  count|              1599|              1599|
|   mean|10.422983114446502|5.6360225140712945|
| stddev|1.0656675818473935|0.8075694397347051|
|    min|               8.4|                 3|
|    max|              14.9|                 8|
+-------+------------------+------------------+
# loop over every column and print its Pearson correlation with quality
for i in wine_df.columns:
    if not isinstance(wine_df.select(i).take(1)[0][0], str):
        print("Correlation to quality for", i, wine_df.stat.corr('quality', i))
Correlation to quality for fixed acidity 0.12405164911322263
Correlation to quality for volatile acidity -0.3905577802640061
Correlation to quality for citric acid 0.22637251431804048
Correlation to quality for residual sugar 0.013731637340065798
Correlation to quality for chlorides -0.12890655993005293
Correlation to quality for free sulfur dioxide -0.05065605724427597
Correlation to quality for total sulfur dioxide -0.18510028892653774
Correlation to quality for density -0.17491922778336474
Correlation to quality for pH -0.0577313912053826
Correlation to quality for sulphates 0.25139707906925995
Correlation to quality for alcohol 0.4761663240011364
Correlation to quality for quality 1.0
wine_df = wine_df.drop("residual sugar").drop("free sulfur dioxide") \
.drop("pH").drop("density") \
.drop("chlorides").drop('fixed acidity')
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['volatile acidity', 'citric acid', 'total sulfur dioxide', 'sulphates', 'alcohol'], outputCol = 'features')
vwine_df = vectorAssembler.transform(wine_df)
vwine_df.take(1)
[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4]))]
splits = vwine_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
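Note that randomSplit draws randomly, so each run produces a slightly different 70/30 split and therefore slightly different coefficients and metrics. If you want reproducible results you can pass a seed (the value 42 below is arbitrary):

# reproducible 70/30 split; any fixed seed works
train_df, test_df = vwine_df.randomSplit([0.7, 0.3], seed=42)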
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='quality', maxIter=10)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
Coefficients: [-1.134803300789282,-0.011468089424519127,-0.0024666032938323525,0.8259746568202124,0.29643567763999223]
Intercept: 2.7325981169381355
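Before scoring the held-out data, the fitted model's training summary already reports in-sample fit statistics; a quick sketch using the summary attributes:

# in-sample fit statistics from the training summary
trainingSummary = lr_model.summary
print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Training r2: %f" % trainingSummary.r2)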
predictions = lr_model.transform(test_df)
predictions.select("prediction","quality","features").show()
+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
| 6.321948236211139|      6|[0.16,0.64,52.0,0...|
| 6.383392525710061|      6|[0.18,0.37,109.0,...|
| 6.260948682710943|      6|[0.18,0.51,23.0,0...|
| 6.260948682710943|      6|[0.18,0.51,23.0,0...|
| 5.361153532922475|      5|[0.19,0.21,135.0,...|
| 6.328172896188763|      6|[0.19,0.42,30.0,0...|
| 6.183009588200807|      6|[0.22,0.24,28.0,0...|
| 6.183009588200807|      6|[0.22,0.24,28.0,0...|
| 5.553702448141423|      6|[0.22,0.48,60.0,0...|
|5.5401318299853965|      4|[0.23,0.37,36.0,0...|
| 6.226681409372491|      7|[0.24,0.35,27.0,0...|
| 6.469942162899238|      7|[0.24,0.42,22.0,1...|
| 6.447170802911483|      7|[0.24,0.46,21.0,1...|
| 6.541619485323386|      6|[0.24,0.49,20.0,1...|
| 6.711646680769182|      7|[0.25,0.39,10.0,0...|
| 6.167801780409271|      6|[0.25,0.46,42.0,0...|
| 5.813409467513248|      6|[0.26,0.42,27.0,0...|
| 5.603333877120943|      5|[0.26,0.45,49.0,0...|
|  6.35712538058033|      6|[0.26,0.48,10.0,0...|
|  6.35712538058033|      6|[0.26,0.48,10.0,0...|
+------------------+-------+--------------------+
only showing top 20 rows
# Find R2 for the linear regression model on the test data
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="quality",metricName="r2")
evaluator.evaluate(predictions)
0.3359148546269666
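An R2 of roughly 0.34 means the five remaining features explain about a third of the variance in quality on the test split. The same evaluator class can also report an error metric such as RMSE on the test predictions, for example:

# test-set RMSE with the same evaluator class
test_evaluator = RegressionEvaluator(predictionCol="prediction",
                                     labelCol="quality", metricName="rmse")
print("Test RMSE: %f" % test_evaluator.evaluate(predictions))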
sc.stop()