!apt-get install openjdk-8-jdk-headless > /dev/null
!wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install findspark
"""
We are running these lines because we are operating on Google Colab
"""
from google.colab import drive
drive.mount('/content/gdrive')
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"
os.chdir('/content/gdrive/My Drive/finch/spark/text_classification/imdb')
import findspark
findspark.init()  # make the Spark installation above importable as pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import tensorflow as tf
def get_idx2word(_index_from=3):
    # Keras's IMDB loader shifts every word index by 3 (index_from=3) to make
    # room for the special tokens below, so we apply the same shift here
    # before inverting the mapping.
    word2idx = tf.keras.datasets.imdb.get_word_index()
    word2idx = {k: (v + _index_from) for k, v in word2idx.items()}
    word2idx["<pad>"] = 0
    word2idx["<start>"] = 1
    word2idx["<unk>"] = 2
    idx2word = {idx: w for w, idx in word2idx.items()}
    return idx2word
def make_df(x, y):
    # Decode each review from integer ids back to word tokens and pair it
    # with its label, yielding a DataFrame with 'label' and 'words' columns.
    return spark.createDataFrame(
        [(int(y_), [idx2word[idx] for idx in x_]) for x_, y_ in zip(x, y)],
        ['label', 'words'])
idx2word = get_idx2word()
# Keep the 20,000 most frequent words; rarer ids are mapped to <unk> (index 2).
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.imdb.load_data(num_words=20000)
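# Quick sanity check (illustrative): the integer ids of the first review
# should decode back to readable English via idx2word.
print(' '.join(idx2word[idx] for idx in X_train[0][:12]))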
spark = SparkSession.builder.appName('imdb').getOrCreate()
# TF-IDF features (term counts reweighted by inverse document frequency)
# fed into an L2-regularized logistic regression.
pipeline = Pipeline(stages=[
    CountVectorizer(inputCol='words', outputCol='tf'),
    IDF(inputCol='tf', outputCol='tf_idf'),
    LogisticRegression(featuresCol='tf_idf', regParam=1.),
])
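# Optional sketch: tune regParam instead of fixing it at 1.0. The grid values
# and fold count are illustrative assumptions, not tuned choices; calling
# cv.fit(df_train) later (skipped here to keep the run cheap) would select
# the best model by accuracy.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
lr = pipeline.getStages()[-1]
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 1.0, 10.0]).build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=MulticlassClassificationEvaluator(metricName='accuracy'),
                    numFolds=3)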
df_train = make_df(X_train, y_train)
df_test = make_df(X_test, y_test)
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
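# A quick look at a few test predictions next to the gold labels; the
# 'probability' column is produced by the LogisticRegression stage.
prediction.select('label', 'prediction', 'probability').show(5)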
print("Testing Accuracy: {:.3f}".format(
MulticlassClassificationEvaluator(metricName='accuracy').evaluate(prediction)))
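# Optional sketch: persist the fitted pipeline (vocabulary, IDF weights, LR
# coefficients) so it can be reloaded later without retraining. The output
# path is an illustrative assumption; after the os.chdir above, a relative
# path lands in the mounted Drive folder.
model.write().overwrite().save('imdb_tfidf_lr_model')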