In [1]:
!apt-get install openjdk-8-jdk-headless > /dev/null
!wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install findspark
--2019-08-23 06:00:11--  http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1
Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 229988313 (219M) [application/octet-stream]
Saving to: ‘spark-2.4.3-bin-hadoop2.7.tgz’

spark-2.4.3-bin-had 100%[===================>] 219.33M  9.84MB/s    in 28s     

2019-08-23 06:00:39 (7.95 MB/s) - ‘spark-2.4.3-bin-hadoop2.7.tgz’ saved [229988313/229988313]

Requirement already satisfied: findspark in /usr/local/lib/python3.6/dist-packages (1.3.0)
In [2]:
"""
We are running these lines because we are operating on Google Colab
"""
from google.colab import drive
drive.mount('/content/gdrive')

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
os.chdir('/content/gdrive/My Drive/finch/spark/topic_modelling/book_titles')
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
In [0]:
import nltk
from nltk.corpus import stopwords

import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
In [0]:
N_TOPICS = 10
MAX_TERMS = 5
In [5]:
nltk.download('stopwords')
# English stop words plus domain-specific words that are frequent in book titles
stop_words = set(stopwords.words('english')).union({
    'introduction', 'edition', 'series', 'application',
    'approach', 'card', 'access', 'package', 'plus', 'etext',
    'brief', 'vol', 'fundamental', 'guide', 'essential', 'printed',
    'third', 'second', 'fourth'})

sc = SparkContext('local', 'nlp')

# Tokenize each title, keep alphabetic tokens longer than 3 characters,
# drop stop words, and attach a row index to every title
lines = sc.textFile('./all_book_titles.txt')
lines = lines \
    .map(lambda line: line.strip().lower()) \
    .map(lambda line: line.split()) \
    .map(lambda words: [w for w in words if w.isalpha()]) \
    .map(lambda words: [w for w in words if len(w) > 3]) \
    .map(lambda words: [w for w in words if w not in stop_words]) \
    .zipWithIndex()

sess = SparkSession.builder.appName('nlp').getOrCreate()
df = sess.createDataFrame(lines, ['words', 'idx'])

# Term-frequency counts followed by inverse-document-frequency weighting
cv = CountVectorizer(inputCol='words',
                     outputCol='tf')
cv = cv.fit(df)
df = cv.transform(df)
df = IDF(inputCol='tf',
         outputCol='tfidf').fit(df).transform(df)

# Fit LDA with the expectation-maximization optimizer on the TF-IDF features
lda = LDA(k=N_TOPICS,
          featuresCol='tfidf',
          optimizer='em').fit(df)

# Print the top MAX_TERMS vocabulary terms for each of the N_TOPICS topics
for i, indices in enumerate(lda.describeTopics(MAX_TERMS).toPandas().termIndices):
    print('Topic %d:' % (i+1), ' '.join([cv.vocabulary[idx] for idx in indices]))
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Topic 1: probability statistics finance management essentials
Topic 2: mechanics physical differential equations problems
Topic 3: computer design data physics engineers
Topic 4: world database asian human molecular
Topic 5: handbook evolution medical general medicine
Topic 6: political structures theater integrated programming
Topic 7: psychology pharmacology insurance thermodynamics biochemistry
Topic 8: philosophy actuarial reader solutions readings
Topic 9: language natural machine international systems
Topic 10: history science human american volume
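
An optional follow-up sketch, not part of the original run, assuming the df, cv, and lda objects from the cell above: describeTopics also exposes per-term weights, and lda.transform adds a per-title 'topicDistribution' vector that can be inspected directly.
In [0]:
# Sketch (assumes df, cv, lda from the previous cell): print term weights per topic
for row in lda.describeTopics(MAX_TERMS).collect():
    terms = [cv.vocabulary[idx] for idx in row.termIndices]
    print('Topic %d:' % (row.topic + 1),
          ', '.join('%s (%.3f)' % (t, w) for t, w in zip(terms, row.termWeights)))

# Per-title topic mixture: 'topicDistribution' holds one weight per topic
lda.transform(df).select('idx', 'topicDistribution').show(5, truncate=False)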