In [1]:
!apt-get install openjdk-8-jdk-headless > /dev/null
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install findspark
--2019-08-23 06:00:11--
Resolving (, 2402:f000:1:408:8100::1
Connecting to (||:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 229988313 (219M) [application/octet-stream]
Saving to: ‘spark-2.4.3-bin-hadoop2.7.tgz’

spark-2.4.3-bin-had 100%[===================>] 219.33M  9.84MB/s    in 28s     

2019-08-23 06:00:39 (7.95 MB/s) - ‘spark-2.4.3-bin-hadoop2.7.tgz’ saved [229988313/229988313]

Requirement already satisfied: findspark in /usr/local/lib/python3.6/dist-packages (1.3.0)
In [2]:
# We are running these lines because we are operating on Google Colab.
from google.colab import drive
# Restored: the "Drive already mounted" message in this cell's output proves
# drive.mount() was called here, but the line was lost in extraction.
drive.mount('/content/gdrive')

import os
# Point the JVM and Spark at the locations installed in the previous cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
# Work inside the project folder on Google Drive so relative paths
# (e.g. ./all_book_titles.txt) resolve.
os.chdir('/content/gdrive/My Drive/finch/spark/topic_modelling/book_titles')
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
In [0]:
import nltk
from nltk.corpus import stopwords

import findspark

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, ArrayType
from import CountVectorizer, IDF
from import LDA
In [0]:
In [5]:'stopwords')
stopwords = set(stopwords.words('english')).union({
    'introduction', 'edition', 'series', 'application',
    'approach', 'card', 'access', 'package', 'plus', 'etext',
    'brief', 'vol', 'fundamental', 'guide', 'essential', 'printed',
    'third', 'second', 'fourth'})

sc = SparkContext('local', 'nlp')
lines = sc.textFile('./all_book_titles.txt')
lines = lines \
    .map(lambda line: line.strip().lower()) \
    .map(lambda line: line.split()) \
    .map(lambda words: [w for w in words if w.isalpha()]) \
    .map(lambda words: [w for w in words if len(w) > 3]) \
    .map(lambda words: [w for w in words if w not in stopwords]) \

sess = SparkSession.builder.appName('nlp').getOrCreate()
df = sess.createDataFrame(lines, ['words', 'idx'])

cv = CountVectorizer(inputCol='words',
cv =
df = cv.transform(df)
df = IDF(inputCol='tf',

lda = LDA(k=N_TOPICS,

for i, indices in enumerate(lda.describeTopics(MAX_TERMS).toPandas().termIndices):
    print('Topic %d:'%(i+1), ' '.join([cv.vocabulary[idx] for idx in indices]))
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Topic 1: probability statistics finance management essentials
Topic 2: mechanics physical differential equations problems
Topic 3: computer design data physics engineers
Topic 4: world database asian human molecular
Topic 5: handbook evolution medical general medicine
Topic 6: political structures theater integrated programming
Topic 7: psychology pharmacology insurance thermodynamics biochemistry
Topic 8: philosophy actuarial reader solutions readings
Topic 9: language natural machine international systems
Topic 10: history science human american volume