%pip install wordcloud
from wordcloud import WordCloud
from wordcloud import ImageColorGenerator
from wordcloud import STOPWORDS
import matplotlib.pyplot as plt
import pandas as pd
# Note: the PyPI package is spark-nlp (the "sparknlp" package is a deprecated placeholder)
%pip install spark-nlp
import sparknlp
from sparknlp.base import *
from pyspark.ml import Pipeline
# spark-nlp-1.3.0.jar is attached to the cluster. This library was downloaded from the
# spark-packages repository https://spark-packages.org/package/JohnSnowLabs/spark-nlp
from sparknlp.annotator import *
from sparknlp.common import *
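# On Databricks the SparkSession already exists and the Spark NLP jar is attached to
# the cluster. Outside Databricks, a minimal sketch (assuming a plain local install)
# would start the session with sparknlp.start(), which also pulls the matching jar:
# spark = sparknlp.start()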
# Create pre-processing stages
# Stage 1: DocumentAssembler as entry point
documentAssembler = DocumentAssembler() \
.setInputCol("value") \
.setOutputCol("document")
# Stage 2: Tokenizer
tokenizer = Tokenizer() \
.setInputCols(["document"]) \
.setOutputCol("token")
# Stage 3: Normalizer to lowercase text and to remove html tags, hyperlinks, Twitter handles,
# numeric tokens (integers, floats), timestamps in the format hh:mm (e.g. 10:30) and punctuation
cleanUpPatterns = [r"RT", r"<[^>]*>", r"www\S+", r"http\S+", r"@[^\s]+", r"[\d-]", r"\d*\.\d+", r"\d*:\d+", r"[^\w\d\s]"]
normalizer = Normalizer() \
.setInputCols("token") \
.setOutputCol("normalized") \
.setCleanupPatterns(cleanUpPatterns) \
.setLowercase(True)
# Stage 4: Remove stopwords
stopwords_cleaner = StopWordsCleaner()\
.setInputCols("normalized")\
.setOutputCol("cleanTokens")\
.setCaseSensitive(False)
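# Optional: StopWordsCleaner ships with a default English stopword list; to extend it
# with Twitter-specific noise terms (the extra terms below are illustrative, not from
# the original), something like this should work:
# from pyspark.ml.feature import StopWordsRemover
# extra = StopWordsRemover.loadDefaultStopWords("english") + ["rt", "amp"]
# stopwords_cleaner.setStopWords(extra)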
# Stage 5: Lemmatizer
lemma = LemmatizerModel.pretrained() \
.setInputCols(["cleanTokens"]) \
.setOutputCol("lemma")
# Stage 6: Stemmer reduces tokens to their root form; it consumes the lemmatizer
# output so the two stages chain together
stemmer = Stemmer() \
.setInputCols(["lemma"]) \
.setOutputCol("stem")
# Stage 7: Finisher to convert custom document structure to array of tokens
finisher = Finisher() \
.setInputCols(["stem"]) \
.setOutputCols(["token_features"]) \
.setOutputAsArray(True) \
.setCleanAnnotations(False)
# Alternative: document-level cleanup with DocumentNormalizer, reusing the
# documentAssembler defined above as Stage 1
# Stage 2: DocumentNormalizer to lowercase text and to remove html tags, hyperlinks,
# Twitter handles, numeric tokens (integers, floats) and timestamps in the format hh:mm (e.g. 10:30)
cleanUpPatterns = [r"RT", r"\n\n", r"<[^>]*>", r"www\S+", r"http\S+", r"@[^\s]+", r"[\d-]", r"\d*\.\d+", r"\d*:\d+"]
documentNormalizer = DocumentNormalizer() \
.setInputCols("document") \
.setOutputCol("normalizedDocument") \
.setAction("clean") \
.setPatterns(cleanUpPatterns) \
.setReplacement("") \
.setPolicy("pretty_all") \
.setLowercase(True)
# Load the tweets from the Delta table and register a temporary view for SQL access
spark.read \
.format("delta") \
.load("/tmp/delta-tweet-table") \
.createOrReplaceTempView("tweet_data")
tweet_df = spark.sql("SELECT * FROM tweet_data")
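# The temp view is only needed for the SQL access pattern; the same DataFrame could
# be loaded directly with
# tweet_df = spark.read.format("delta").load("/tmp/delta-tweet-table")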
# Smoke-test the document-level cleanup pipeline: fit on an empty DataFrame (none of
# its stages are trainable) and transform the tweets
cleanup_pipeline = Pipeline(stages=[documentAssembler, documentNormalizer])
empty_df = spark.createDataFrame([['']]).toDF("value")
cleanup_pipeline_model = cleanup_pipeline.fit(empty_df)
result = cleanup_pipeline_model.transform(tweet_df)
result.select('normalizedDocument.result').show(truncate=False)
# Run the full token-level pre-processing pipeline
prep_pipeline = Pipeline(stages=[documentAssembler, tokenizer, normalizer, stopwords_cleaner, lemma, stemmer, finisher])
empty_df = spark.createDataFrame([['']]).toDF("value")
prep_pipeline_model = prep_pipeline.fit(empty_df)
result = prep_pipeline_model.transform(tweet_df)
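# Peek at the finished tokens; token_features is an array<string> column that can be
# fed straight into pyspark.ml.feature stages such as CountVectorizer:
result.select("token_features").show(truncate=False)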
# Word cloud for negative tweets
neg = tweet_df.filter(tweet_df.sentiment == "Negative").toPandas()
text = " ".join(str(i) for i in neg.processed_text).lower()
wc_stopwords = set(STOPWORDS)  # wordcloud's stopword list, distinct from the StopWordsCleaner stage
wordcloud = WordCloud(stopwords=wc_stopwords, background_color="white").generate(text)
plt.figure(figsize=(15, 10))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()
# Word cloud for positive tweets
pos = tweet_df.filter(tweet_df.sentiment == "Positive").toPandas()
text = " ".join(str(i) for i in pos.processed_text).lower()
wc_stopwords = set(STOPWORDS)
wordcloud = WordCloud(stopwords=wc_stopwords, background_color="white").generate(text)
plt.figure(figsize=(15, 10))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()
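# The two word-cloud cells differ only in the sentiment label; a small helper
# (hypothetical, not in the original) removes the duplication and can optionally save
# the image with WordCloud.to_file:
def plot_sentiment_wordcloud(df, sentiment, out_path=None):
    """Render a word cloud for tweets carrying the given sentiment label."""
    pdf = df.filter(df.sentiment == sentiment).toPandas()
    text = " ".join(str(t) for t in pdf.processed_text).lower()
    wc = WordCloud(stopwords=set(STOPWORDS), background_color="white").generate(text)
    if out_path is not None:
        wc.to_file(out_path)
    plt.figure(figsize=(15, 10))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    plt.show()

plot_sentiment_wordcloud(tweet_df, "Negative")
plot_sentiment_wordcloud(tweet_df, "Positive")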