from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myproj').getOrCreate()
data = spark.read.csv("dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/titanic.csv",inferSchema=True,header=True)
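The dbfs:/ path above is specific to the author's Databricks workspace. Outside Databricks the same read works against a local copy of the file; the path below is only a placeholder, not the original one.
# hypothetical local path; point this at wherever titanic.csv lives
# data = spark.read.csv('titanic.csv', inferSchema=True, header=True)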
data.printSchema()
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
data.columns
Out[5]: ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
my_cols = data.select(['Survived',
'Pclass',
'Sex',
'Age',
'SibSp',
'Parch',
'Fare',
'Embarked'])
my_final_data = my_cols.na.drop()
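A quick sanity check, not in the original notebook: comparing row counts before and after na.drop() shows how many records the missing Age/Embarked values cost.
print('rows before drop:', my_cols.count())        # full selection
print('rows after drop: ', my_final_data.count())  # rows with no nulls in the selected columns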
Handle categorical features: index the string columns (Sex, Embarked) with StringIndexer, then one-hot encode the resulting indices.
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
OneHotEncoder,StringIndexer)
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')
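Note that Spark's OneHotEncoder drops the last category by default (dropLast=True), so a two-level column like Sex becomes a single-element vector. If you want one slot per category, the flag can be set explicitly; a sketch, not needed for the pipeline below.
# hypothetical variant keeping every category as its own vector slot
gender_encoder_full = OneHotEncoder(inputCol='SexIndex', outputCol='SexVecFull', dropLast=False)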
assembler = VectorAssembler(inputCols=['Pclass',
'SexVec',
'Age',
'SibSp',
'Parch',
'Fare',
'EmbarkVec'],outputCol='features')
Pipeline: chain the indexers, encoders, assembler, and the logistic regression model into a single Pipeline so the same steps run on any split of the data.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
gender_encoder,embark_encoder,
assembler,log_reg_titanic])
train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7, 0.3])
fit_model = pipeline.fit(train_titanic_data)
results = fit_model.transform(test_titanic_data)
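The logistic regression stage appends its standard output columns (rawPrediction, probability, prediction) to the transformed test set; a quick look at them:
results.select('features', 'rawPrediction', 'probability', 'prediction').show(5, truncate=False)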
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# note: 'prediction' here is the thresholded 0/1 column; the standard ROC AUC is computed from 'rawPrediction'
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                        labelCol='Survived')
results.select('Survived','prediction').show()
+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows
AUC = my_eval.evaluate(results)
AUC
Out[24]: 0.7758413461538461
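Because the evaluator above reads the hard 0/1 prediction column, it collapses the ROC curve to a single threshold. A sketch of the more conventional setup, scoring the model's rawPrediction column instead (the resulting number will differ from the one above):
auc_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                         labelCol='Survived',
                                         metricName='areaUnderROC')
auc_eval.evaluate(results)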