from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used throughout this script.
spark = (
    SparkSession.builder
    .appName('treecode')
    .getOrCreate()
)

# Load the College dataset: the first CSV row supplies the column names and
# the column types are inferred from the values.
college_csv = "dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/College.csv"
data = spark.read.csv(college_csv, header=True, inferSchema=True)

# Print the inferred column names and types.
data.printSchema()
# Output of data.printSchema():
# root
#  |-- School: string (nullable = true)
#  |-- Private: string (nullable = true)
#  |-- Apps: integer (nullable = true)
#  |-- Accept: integer (nullable = true)
#  |-- Enroll: integer (nullable = true)
#  |-- Top10perc: integer (nullable = true)
#  |-- Top25perc: integer (nullable = true)
#  |-- F_Undergrad: integer (nullable = true)
#  |-- P_Undergrad: integer (nullable = true)
#  |-- Outstate: integer (nullable = true)
#  |-- Room_Board: integer (nullable = true)
#  |-- Books: integer (nullable = true)
#  |-- Personal: integer (nullable = true)
#  |-- PhD: integer (nullable = true)
#  |-- Terminal: integer (nullable = true)
#  |-- S_F_Ratio: double (nullable = true)
#  |-- perc_alumni: integer (nullable = true)
#  |-- Expend: integer (nullable = true)
#  |-- Grad_Rate: integer (nullable = true)
# Fetch the first row as a quick sanity check of the parsed values.
# The result is not assigned, so it is only visible when run interactively
# (e.g. as a notebook cell).
data.head()
# Out[4]: Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60)
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# List the column names; the result is not assigned, so it is only visible
# when run interactively (e.g. as a notebook cell).
data.columns
# Out[6]: ['School', 'Private', 'Apps', 'Accept', 'Enroll', 'Top10perc', 'Top25perc', 'F_Undergrad', 'P_Undergrad', 'Outstate', 'Room_Board', 'Books', 'Personal', 'PhD', 'Terminal', 'S_F_Ratio', 'perc_alumni', 'Expend', 'Grad_Rate']
# Columns packed into the single "features" vector column, in this order.
feature_columns = [
    'Apps', 'Accept', 'Enroll',
    'Top10perc', 'Top25perc',
    'F_Undergrad', 'P_Undergrad',
    'Outstate', 'Room_Board', 'Books', 'Personal',
    'PhD', 'Terminal',
    'S_F_Ratio', 'perc_alumni', 'Expend', 'Grad_Rate',
]

# Combine the listed columns into one vector column named "features".
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# `output` is `data` with the extra "features" column appended.
output = assembler.transform(data)
from pyspark.ml.feature import StringIndexer
# Encode the string column "Private" as the numeric column "PrivateIndex",
# which this script passes to the classifiers as their label column.
indexer = StringIndexer(inputCol="Private", outputCol="PrivateIndex")
output_fixed = indexer.fit(output).transform(output)

# Keep only the assembled feature vector and the numeric label.
final_data = output_fixed.select("features", 'PrivateIndex')

# 70/30 train/test split. Without a seed, randomSplit draws a new random
# split on every run, so the reported accuracies could not be reproduced.
# A fixed seed keeps the split repeatable for the same input data and
# partitioning.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline
# All three estimators read the same label and feature columns.
_estimator_cols = dict(labelCol='PrivateIndex', featuresCol='features')

# Single decision tree: configure, fit on the training split, score the test split.
dtc = DecisionTreeClassifier(**_estimator_cols)
dtc_model = dtc.fit(train_data)
dtc_predictions = dtc_model.transform(test_data)

# Random forest ensemble.
rfc = RandomForestClassifier(**_estimator_cols)
rfc_model = rfc.fit(train_data)
rfc_predictions = rfc_model.transform(test_data)

# Gradient-boosted trees ensemble.
gbt = GBTClassifier(**_estimator_cols)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy evaluator comparing the "prediction" column against "PrivateIndex".
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="PrivateIndex",
    predictionCol="prediction",
    metricName="accuracy",
)

# Score each model's test-set predictions, in the order tree, forest, GBT.
dtc_acc, rfc_acc, gbt_acc = (
    acc_evaluator.evaluate(predictions)
    for predictions in (dtc_predictions, rfc_predictions, gbt_predictions)
)

# Report each accuracy as a percentage with two decimals.
print("Results:")
for template, accuracy in (
    ('A single decision tree had an accuracy of: {0:2.2f}%', dtc_acc),
    ('A random forest ensemble had an accuracy of: {0:2.2f}%', rfc_acc),
    ('A ensemble using GBT had an accuracy of: {0:2.2f}%', gbt_acc),
):
    print(template.format(accuracy*100))
# Output from one recorded run:
# Results:
# A single decision tree had an accuracy of: 91.39%
# A random forest ensemble had an accuracy of: 95.69%
# A ensemble using GBT had an accuracy of: 94.74%