# Colab setup: install Java 8, download and unpack Spark 2.4.3, and install findspark.
!apt-get install openjdk-8-jdk-headless > /dev/null
!wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install findspark
"""
We are running these lines because we are operating on Google Colab
"""
from google.colab import drive
drive.mount('/content/gdrive')
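# Point PySpark at the Java and Spark installations from the setup cell above,
# then move into the Drive folder that holds the book-title data.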
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
os.chdir('/content/gdrive/My Drive/finch/spark/topic_modelling/book_titles')
import nltk
from nltk.corpus import stopwords
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
N_TOPICS = 10   # number of LDA topics to learn
MAX_TERMS = 5   # number of top terms to print per topic
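# Extend NLTK's English stopword list with words that are frequent in textbook
# titles (e.g. 'edition', 'series') but carry no topical information.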
nltk.download('stopwords')
stopwords = set(stopwords.words('english')).union({
    'introduction', 'edition', 'series', 'application',
    'approach', 'card', 'access', 'package', 'plus', 'etext',
    'brief', 'vol', 'fundamental', 'guide', 'essential', 'printed',
    'third', 'second', 'fourth'})
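# Create a local SparkContext and preprocess the raw titles as an RDD:
# lowercase, split on whitespace, keep alphabetic tokens longer than three
# characters, drop stopwords, and attach a stable row index with zipWithIndex.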
sc = SparkContext('local', 'nlp')
lines = sc.textFile('./all_book_titles.txt')
lines = lines \
    .map(lambda line: line.strip().lower()) \
    .map(lambda line: line.split()) \
    .map(lambda words: [w for w in words if w.isalpha()]) \
    .map(lambda words: [w for w in words if len(w) > 3]) \
    .map(lambda words: [w for w in words if w not in stopwords]) \
    .zipWithIndex()
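# Wrap the (token list, index) pairs in a DataFrame so the spark.ml feature
# transformers below can consume them.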
sess = SparkSession.builder.appName('nlp').getOrCreate()
df = sess.createDataFrame(lines, ['words', 'idx'])
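# Term counts followed by IDF weighting: CountVectorizer learns the vocabulary
# and produces term-frequency vectors, then IDF rescales them into TF-IDF features.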
cv = CountVectorizer(inputCol='words',
                     outputCol='tf')
cv = cv.fit(df)
df = cv.transform(df)
df = IDF(inputCol='tf',
         outputCol='tfidf').fit(df).transform(df)
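# Fit LDA on the TF-IDF vectors with the expectation-maximization optimizer;
# fit() returns the trained model. Note that LDA is usually trained on raw term
# counts, but spark.ml accepts any numeric feature vector, so this pipeline
# feeds it the TF-IDF weights instead.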
lda = LDA(k=N_TOPICS,
          featuresCol='tfidf',
          optimizer='em').fit(df)
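# describeTopics returns a DataFrame with one row per topic ('topic',
# 'termIndices', 'termWeights'); map each term index back to a word through the
# CountVectorizer vocabulary and print the top MAX_TERMS terms of every topic.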
for i, indices in enumerate(lda.describeTopics(MAX_TERMS).toPandas().termIndices):
    print('Topic %d:' % (i + 1), ' '.join([cv.vocabulary[idx] for idx in indices]))
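# Optional check (not part of the original pipeline): the fitted model can also
# score each title. A minimal sketch using the default topicDistributionCol,
# which appends a 'topicDistribution' probability vector to every row.
lda.transform(df).select('idx', 'topicDistribution').show(3, truncate=False)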