Basics of the PySpark MLlib Module
!git clone https://github.com/PacktPublishing/Mastering-Big-Data-Analytics-with-PySpark
Cloning into 'Mastering-Big-Data-Analytics-with-PySpark'...
remote: Enumerating objects: 145, done.
remote: Counting objects: 100% (145/145), done.
remote: Compressing objects: 100% (96/96), done.
remote: Total 145 (delta 42), reused 136 (delta 38), pack-reused 0
Receiving objects: 100% (145/145), 333.95 KiB | 9.03 MiB/s, done.
Resolving deltas: 100% (42/42), done.
%cd Mastering-Big-Data-Analytics-with-PySpark/
/content/Mastering-Big-Data-Analytics-with-PySpark
!python download_data.py
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
# !tar -xvf spark-3.0.2-bin-hadoop3.2.tgz
# !pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/Mastering-Big-Data-Analytics-with-PySpark/spark-3.0.2-bin-hadoop3.2"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HelloWorldApp").getOrCreate()
# Using Spark SQL, we create a dataframe which holds our `hello world` data
df = spark.sql('SELECT "hello world" as c1')
# We can then use the `show()` method to see what the DataFrame we just created looks like
df.show()
+-----------+
|         c1|
+-----------+
|hello world|
+-----------+
spark.stop()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("31LoadingDataFromCSV").getOrCreate()
RATINGS_CSV_LOCATION = "/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-small/ratings.csv"
df = spark.read.csv(RATINGS_CSV_LOCATION)
df.show()
df.printSchema()
+------+-------+------+---------+
|   _c0|    _c1|   _c2|      _c3|
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
+------+-------+------+---------+
only showing top 20 rows

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
As you can see, the data is loaded, but it does not quite look right: the header row has been read in as data, and every column has been cast to StringType, which is not ideal. We can fix both of these issues by giving the read.csv() method the correct settings.
To parse the CSV correctly, we need to set the following options on our read.csv() method:
# Loading CSV file with proper parsing and inferSchema
df = spark.read.csv(
path=RATINGS_CSV_LOCATION,
sep=",",
header=True,
quote='"',
encoding="UTF-8",
inferSchema=True,
)
# Displaying results of the load
df.show()
df.printSchema()
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
# Type-safe loading of the ratings.csv file
df = spark.read.csv(
path=RATINGS_CSV_LOCATION,
sep=",",
header=True,
quote='"',
encoding="UTF-8",
schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
)
# Displaying results of the load
df.show()
df.printSchema()
df.describe().show()
df.explain()
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+

== Physical Plan ==
FileScan csv [userId#97,movieId#98,rating#99,timestamp#100] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-sma..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<userId:int,movieId:int,rating:double,timestamp:int>
from pyspark.sql import functions as f
ratings = (
spark.read.csv(
path=RATINGS_CSV_LOCATION,
sep=",",
header=True,
quote='"',
schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
)
.withColumnRenamed("timestamp", "timestamp_unix")
.withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp_unix")))
)
ratings.show(5)
ratings.printSchema()
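# Note: drop() silently ignores columns that do not exist, so dropping "foobar" here is a no-op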
ratings.drop("timestamp_unix", "foobar").show(5)
+------+-------+------+--------------+-------------------+
|userId|movieId|rating|timestamp_unix|          timestamp|
+------+-------+------+--------------+-------------------+
|     1|      1|   4.0|     964982703|2000-07-30 18:45:03|
|     1|      3|   4.0|     964981247|2000-07-30 18:20:47|
|     1|      6|   4.0|     964982224|2000-07-30 18:37:04|
|     1|     47|   5.0|     964983815|2000-07-30 19:03:35|
|     1|     50|   5.0|     964982931|2000-07-30 18:48:51|
+------+-------+------+--------------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp_unix: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows
MOVIES_CSV_LOCATION = "/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-small/movies.csv"
movies = (
spark.read.csv(
path=MOVIES_CSV_LOCATION,
sep=",",
header=True,
quote='"',
schema="movieId INT, title STRING, genres STRING",
)
)
movies.show(15, truncate=False)
movies.printSchema()
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death (1995)               |Action                                     |
|10     |GoldenEye (1995)                  |Action|Adventure|Thriller                  |
|11     |American President, The (1995)    |Comedy|Drama|Romance                       |
|12     |Dracula: Dead and Loving It (1995)|Comedy|Horror                              |
|13     |Balto (1995)                      |Adventure|Animation|Children               |
|14     |Nixon (1995)                      |Drama                                      |
|15     |Cutthroat Island (1995)           |Action|Adventure|Romance                   |
+-------+----------------------------------+-------------------------------------------+
only showing top 15 rows

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
movies.where(f.col("genres") == "Action").show(5, False)
movies.where("genres == 'Action'").show(5, False)
+-------+-----------------------------------------------------------+------+
|movieId|title                                                      |genres|
+-------+-----------------------------------------------------------+------+
|9      |Sudden Death (1995)                                        |Action|
|71     |Fair Game (1995)                                           |Action|
|204    |Under Siege 2: Dark Territory (1995)                       |Action|
|251    |Hunted, The (1995)                                         |Action|
|667    |Bloodsport 2 (a.k.a. Bloodsport II: The Next Kumite) (1996)|Action|
+-------+-----------------------------------------------------------+------+
only showing top 5 rows

+-------+-----------------------------------------------------------+------+
|movieId|title                                                      |genres|
+-------+-----------------------------------------------------------+------+
|9      |Sudden Death (1995)                                        |Action|
|71     |Fair Game (1995)                                           |Action|
|204    |Under Siege 2: Dark Territory (1995)                       |Action|
|251    |Hunted, The (1995)                                         |Action|
|667    |Bloodsport 2 (a.k.a. Bloodsport II: The Next Kumite) (1996)|Action|
+-------+-----------------------------------------------------------+------+
only showing top 5 rows
movie_genre = (
movies
.withColumn("genres_array", f.split("genres", "\|"))
.withColumn("genre", f.explode("genres_array"))
.select("movieId", "title", "genre")
)
movie_genre.show(10, False)
+-------+-----------------------+---------+
|movieId|title                  |genre    |
+-------+-----------------------+---------+
|1      |Toy Story (1995)       |Adventure|
|1      |Toy Story (1995)       |Animation|
|1      |Toy Story (1995)       |Children |
|1      |Toy Story (1995)       |Comedy   |
|1      |Toy Story (1995)       |Fantasy  |
|2      |Jumanji (1995)         |Adventure|
|2      |Jumanji (1995)         |Children |
|2      |Jumanji (1995)         |Fantasy  |
|3      |Grumpier Old Men (1995)|Comedy   |
|3      |Grumpier Old Men (1995)|Romance  |
+-------+-----------------------+---------+
only showing top 10 rows
available_genres = movie_genre.select("genre").distinct()
available_genres.show()
movies_without_genre = movies.where(f.col("genres") == "(no genres listed)")
print(movies_without_genre.count())
movies_without_genre.show()
+------------------+
|             genre|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+

34
+-------+--------------------+------------------+
|movieId|               title|            genres|
+-------+--------------------+------------------+
| 114335|   La cravate (1957)|(no genres listed)|
| 122888|      Ben-hur (2016)|(no genres listed)|
| 122896|Pirates of the Ca...|(no genres listed)|
| 129250|   Superfast! (2015)|(no genres listed)|
| 132084| Let It Be Me (1995)|(no genres listed)|
| 134861|Trevor Noah: Afri...|(no genres listed)|
| 141131|    Guardians (2016)|(no genres listed)|
| 141866|   Green Room (2015)|(no genres listed)|
| 142456|The Brand New Tes...|(no genres listed)|
| 143410|          Hyena Road|(no genres listed)|
| 147250|The Adventures of...|(no genres listed)|
| 149330|A Cosmic Christma...|(no genres listed)|
| 152037|  Grease Live (2016)|(no genres listed)|
| 155589|Noin 7 veljestä (...|(no genres listed)|
| 156605|            Paterson|(no genres listed)|
| 159161|Ali Wong: Baby Co...|(no genres listed)|
| 159779|A Midsummer Night...|(no genres listed)|
| 161008|The Forbidden Dan...|(no genres listed)|
| 165489|Ethel & Ernest (2...|(no genres listed)|
| 166024|     Whiplash (2013)|(no genres listed)|
+-------+--------------------+------------------+
only showing top 20 rows
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
spark = SparkSession.builder.appName("join_tests").getOrCreate()
schema = StructType(
[StructField("id", IntegerType()), StructField("value", StringType())]
)
A = spark.createDataFrame(
schema=schema, data=[
(1, "A"),
(2, "B"),
(3, "C"),
(4, "D"),
(5, "E"),
(None, "Z")
]
)
B = spark.createDataFrame(
schema=schema, data=[
(3, "C"),
(4, "D"),
(5, "E"),
(6, "F"),
(7, "G")
]
)
A.show()
B.show()
+----+-----+
|  id|value|
+----+-----+
|   1|    A|
|   2|    B|
|   3|    C|
|   4|    D|
|   5|    E|
|null|    Z|
+----+-----+

+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
|  5|    E|
|  6|    F|
|  7|    G|
+---+-----+
# INNER JOINS
# A.join(B, ["id"], "inner").show()
# CROSS JOINS (CARTESIAN PRODUCT)
# A.crossJoin(B).show()
# FULL JOINS
# A.join(B, ["id"], "outer").show()
# A.join(B, ["id"], "full").show()
# A.join(B, ["id"], "full_outer").show()
# LEFT OUTER
# A.join(B, ["id"], "left").show()
# A.join(B, ["id"], "left_outer").show()
# RIGHT OUTER
# A.join(B, ["id"], "right").show()
# A.join(B, ["id"], "right_outer").show()
# LEFT SPECIAL
# A.join(B, ["id"], "left_semi").show()
# A.join(B, ["id"], "left_anti").show()
links = spark.read.csv(
path="/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-small/links.csv",
sep=",",
header=True,
quote='"',
schema="movieId INT, imdbId STRING, tmdbId INT",
)
tags = spark.read.csv(
path="/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-small/tags.csv",
sep=",",
header=True,
quote='"',
schema="userId INT, movieId INT, tag STRING, timestamp INT",
).withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
links.show(5)
tags.show(5)
+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows

+------+-------+---------------+-------------------+
|userId|movieId|            tag|          timestamp|
+------+-------+---------------+-------------------+
|     2|  60756|          funny|2015-10-24 19:29:54|
|     2|  60756|Highly quotable|2015-10-24 19:29:56|
|     2|  60756|   will ferrell|2015-10-24 19:29:52|
|     2|  89774|   Boxing story|2015-10-24 19:33:27|
|     2|  89774|            MMA|2015-10-24 19:33:20|
+------+-------+---------------+-------------------+
only showing top 5 rows
movie_per_genre = movie_genre.groupBy("genre").count()
movie_per_genre.show()
+------------------+-----+
|             genre|count|
+------------------+-----+
|             Crime| 1199|
|           Romance| 1596|
|          Thriller| 1894|
|         Adventure| 1263|
|             Drama| 4361|
|               War|  382|
|       Documentary|  440|
|           Fantasy|  779|
|           Mystery|  573|
|           Musical|  334|
|         Animation|  611|
|         Film-Noir|   87|
|(no genres listed)|   34|
|              IMAX|  158|
|            Horror|  978|
|           Western|  167|
|            Comedy| 3756|
|          Children|  664|
|            Action| 1828|
|            Sci-Fi|  980|
+------------------+-----+
# opinions = movies.join(tags, movies['movieId'] == tags['movieId'])
# opinions = movies.join(tags, ["movieId"])
opinions = movies.join(tags, ["movieId"], "inner")
opinions.show()
+-------+--------------------+--------------------+------+----------------+-------------------+
|movieId|               title|              genres|userId|             tag|          timestamp|
+-------+--------------------+--------------------+------+----------------+-------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|   567|             fun|2018-05-02 18:33:33|
|      1|    Toy Story (1995)|Adventure|Animati...|   474|           pixar|2006-01-14 02:47:05|
|      1|    Toy Story (1995)|Adventure|Animati...|   336|           pixar|2006-02-04 09:36:04|
|      2|      Jumanji (1995)|Adventure|Childre...|   474|            game|2006-01-16 01:39:12|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|  Robin Williams|2018-06-12 22:51:47|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|magic board game|2018-06-12 22:52:12|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|         fantasy|2018-06-12 22:52:09|
|      3|Grumpier Old Men ...|      Comedy|Romance|   289|             old|2006-03-27 02:01:00|
|      3|Grumpier Old Men ...|      Comedy|Romance|   289|           moldy|2006-03-27 02:01:00|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|  null|            null|               null|
|      5|Father of the Bri...|              Comedy|   474|          remake|2006-01-16 01:11:43|
|      5|Father of the Bri...|              Comedy|   474|       pregnancy|2006-01-16 01:11:43|
|      6|         Heat (1995)|Action|Crime|Thri...|  null|            null|               null|
|      7|      Sabrina (1995)|      Comedy|Romance|   474|          remake|2006-01-16 01:40:42|
|      8| Tom and Huck (1995)|  Adventure|Children|  null|            null|               null|
|      9| Sudden Death (1995)|              Action|  null|            null|               null|
|     10|    GoldenEye (1995)|Action|Adventure|...|  null|            null|               null|
|     11|American Presiden...|Comedy|Drama|Romance|   474|       president|2006-01-16 01:28:24|
|     11|American Presiden...|Comedy|Drama|Romance|   474|        politics|2006-01-16 01:28:24|
|     12|Dracula: Dead and...|       Comedy|Horror|  null|            null|               null|
+-------+--------------------+--------------------+------+----------------+-------------------+
only showing top 20 rows
opinions = (
movies
.join(tags, ["movieId"], "inner")
.select("userId", "movieId", "title", "tag", "timestamp")
)
opinions.show(5, False)
+------+-------+----------------+--------------+-------------------+
|userId|movieId|title           |tag           |timestamp          |
+------+-------+----------------+--------------+-------------------+
|567   |1      |Toy Story (1995)|fun           |2018-05-02 18:33:33|
|474   |1      |Toy Story (1995)|pixar         |2006-01-14 02:47:05|
|336   |1      |Toy Story (1995)|pixar         |2006-02-04 09:36:04|
|474   |2      |Jumanji (1995)  |game          |2006-01-16 01:39:12|
|62    |2      |Jumanji (1995)  |Robin Williams|2018-06-12 22:51:47|
+------+-------+----------------+--------------+-------------------+
only showing top 5 rows
opinions_ext = (
    opinions
    .withColumnRenamed("timestamp", "tag_time")
    .join(ratings, ["movieId", "userId"])
)
opinions_ext.show(5)
+-------+------+----------------+--------------+-------------------+------+--------------+-------------------+
|movieId|userId|           title|           tag|           tag_time|rating|timestamp_unix|          timestamp|
+-------+------+----------------+--------------+-------------------+------+--------------+-------------------+
|      1|   567|Toy Story (1995)|           fun|2018-05-02 18:33:33|   3.5|    1525286001|2018-05-02 18:33:21|
|      1|   474|Toy Story (1995)|         pixar|2006-01-14 02:47:05|   4.0|     978575760|2001-01-04 02:36:00|
|      1|   336|Toy Story (1995)|         pixar|2006-02-04 09:36:04|   4.0|    1122227329|2005-07-24 17:48:49|
|      2|   474|  Jumanji (1995)|          game|2006-01-16 01:39:12|   3.0|    1046886814|2003-03-05 17:53:34|
|      2|    62|  Jumanji (1995)|Robin Williams|2018-06-12 22:51:47|   4.0|    1528843890|2018-06-12 22:51:30|
+------+-------+----------------+--------------+-------------------+------+--------------+-------------------+
only showing top 5 rows
ratings.groupBy("movieId").agg(
f.count("*"),
f.min("rating"),
f.min("rating"),
f.avg("rating"),
f.min("timestamp"),
f.max("timestamp"),
).show(5)
+-------+--------+-----------+-----------+-----------------+-------------------+-------------------+
|movieId|count(1)|min(rating)|min(rating)|      avg(rating)|     min(timestamp)|     max(timestamp)|
+-------+--------+-----------+-----------+-----------------+-------------------+-------------------+
|   1580|     165|        0.5|        0.5|3.487878787878788|1997-07-07 12:07:18|2018-07-22 13:30:52|
|   2366|      25|        1.5|        1.5|             3.64|1999-11-04 15:23:49|2018-02-20 10:20:35|
|   3175|      75|        1.0|        1.0|             3.58|1999-12-26 14:01:31|2018-06-25 05:07:19|
|   1088|      42|        1.0|        1.0|3.369047619047619|1997-04-07 07:36:08|2018-01-17 01:52:47|
|  32460|       4|        3.5|        3.5|             4.25|2011-12-18 19:21:21|2017-04-21 20:12:30|
+-------+--------+-----------+-----------+-----------------+-------------------+-------------------+
only showing top 5 rows
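The default aggregate column names such as count(1) and avg(rating) are awkward to work with downstream. A sketch of the same aggregation with alias() applied:
# Same aggregation, but with readable column names via alias()
ratings.groupBy("movieId").agg(
    f.count("*").alias("rating_count"),
    f.min("rating").alias("min_rating"),
    f.max("rating").alias("max_rating"),
    f.avg("rating").alias("avg_rating"),
).show(5)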
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.appName("als-recommender").getOrCreate()
ratings = (
spark.read.csv(
path="/content/Mastering-Big-Data-Analytics-with-PySpark/data-sets/ml-latest-small/ratings.csv",
sep=",",
header=True,
quote='"',
schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
)
# .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
.drop("timestamp")
.cache()
)
from pyspark.ml.recommendation import ALS
model = (
ALS(
userCol="userId",
itemCol="movieId",
ratingCol="rating",
).fit(ratings)
)
predictions = model.transform(ratings)
predictions.show(10, False)
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|191   |148    |5.0   |4.921467  |
|133   |471    |4.0   |3.1737905 |
|597   |471    |2.0   |3.9179363 |
|385   |471    |4.0   |2.9745612 |
|436   |471    |3.0   |3.6886308 |
|602   |471    |4.0   |3.6024406 |
|91    |471    |1.0   |2.3480353 |
|409   |471    |3.0   |3.8320298 |
|372   |471    |3.0   |2.9738774 |
|599   |471    |2.5   |2.7476397 |
+------+-------+------+----------+
only showing top 10 rows
model.userFactors.show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.028317755, 0....|
| 20|[-1.2629544, 0.57...|
| 30|[-0.64685774, -0....|
| 40|[-1.2631711, 0.46...|
| 50|[-0.96600693, -0....|
+---+--------------------+
only showing top 5 rows
model.itemFactors.show(5)
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.96596885, 0.0...|
| 20|[-0.43237323, -0....|
| 30|[-0.8408578, -0.2...|
| 40|[-0.20028046, -0....|
| 50|[-1.3041071, 0.09...|
+---+--------------------+
only showing top 5 rows
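Beyond reconstructing known ratings, the fitted model can generate top-N recommendations directly; a minimal sketch using recommendForAllUsers:
# Top 5 movies per user; the recommendations column holds an array of
# (movieId, rating) structs sorted by predicted rating
model.recommendForAllUsers(5).show(5, truncate=False)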
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
als = ALS(
userCol="userId",
itemCol="movieId",
ratingCol="rating",
)
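# randomSplit() normalizes the weights, so [8.0, 2.0] gives an ~80/20 split; pass seed= for reproducibility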
(training_data, validation_data) = ratings.randomSplit([8.0, 2.0])
evaluator = RegressionEvaluator(
metricName="rmse", labelCol="rating", predictionCol="prediction"
)
model = als.fit(training_data)
predictions = model.transform(validation_data)
predictions.show(10, False)
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|218   |471    |4.0   |3.1557236 |
|217   |471    |2.0   |2.904364  |
|136   |471    |4.0   |3.0653472 |
|273   |471    |5.0   |4.178053  |
|287   |471    |4.5   |2.9642751 |
|469   |471    |5.0   |3.2853742 |
|191   |496    |5.0   |NaN       |
|159   |1088   |4.0   |2.7479868 |
|599   |1088   |2.5   |2.4010575 |
|169   |1088   |4.5   |4.196451  |
+------+-------+------+----------+
only showing top 10 rows
rmse = evaluator.evaluate(predictions.na.drop())
print(rmse)
0.8793350703923346
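The NaN prediction seen above is the cold-start problem: that user or movie never appeared in the training split. Rather than dropping NaNs after the fact with na.drop(), ALS can be told to drop them itself; a sketch using the coldStartStrategy parameter:
# coldStartStrategy="drop" removes rows with NaN predictions, so the
# evaluator always returns a defined metric
als_cold = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)
cold_predictions = als_cold.fit(training_data).transform(validation_data)
print(evaluator.evaluate(cold_predictions))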
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
parameter_grid = (
ParamGridBuilder()
.addGrid(als.rank, [1, 5, 10])
.addGrid(als.maxIter, [20])
.addGrid(als.regParam, [0.05, 0.1])
.build()
)
from pprint import pprint
pprint(parameter_grid)
[{Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 1},
 {Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 1},
 {Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 5},
 {Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 5},
 {Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 10},
 {Param(parent='ALS_3a101ba5e05e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_3a101ba5e05e', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent='ALS_3a101ba5e05e', name='rank', doc='rank of the factorization'): 10}]
crossvalidator = CrossValidator(
estimator=als,
estimatorParamMaps=parameter_grid,
evaluator=evaluator,
numFolds=2,
)
crossval_model = crossvalidator.fit(training_data)
predictions = crossval_model.transform(validation_data)
rmse = evaluator.evaluate(predictions.na.drop())
print(rmse)
1.014127339763445
model = crossval_model.bestModel
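To see which parameter combination won, the cross-validated metrics and the best model can be inspected; a short sketch:
# Average RMSE per parameter combination, in the same order as the grid
print(crossval_model.avgMetrics)
# Rank of the winning ALS model
print(model.rank)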
Vectors and Matrices
import numpy as np
import scipy.sparse as sps
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import Matrix, Matrices
spark = SparkSession.builder.getOrCreate()
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
print("Dense vector 1:", dv1)
print("Dense vector 2:", dv2)
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
print("Sparse vector 1:", sv1)
print("Sparse vector 2:", sv2)
# Create a dense matrix
dm = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
# Create a sparse matrix
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print("Dense matrix:", dm)
print("Sparse matrix:", sm)
Dense vector 1: [1. 0. 3.]
Dense vector 2: [1.0, 0.0, 3.0]
Sparse vector 1: (3,[0,2],[1.0,3.0])
Sparse vector 2:   (0, 0)	1.0
  (2, 0)	3.0
Dense matrix: DenseMatrix([[1., 2.],
             [3., 4.],
             [5., 6.]])
Sparse matrix: 3 X 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
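These linalg types matter because most spark.ml estimators expect a vector-typed features column; a minimal sketch of putting vectors into a DataFrame:
# A tiny DataFrame with one dense and one sparse feature vector
df_vec = spark.createDataFrame(
    [(0, Vectors.dense([1.0, 0.0, 3.0])), (1, Vectors.sparse(3, [0, 2], [1.0, 3.0]))],
    ["id", "features"],
)
df_vec.show(truncate=False)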
Images
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
PATH = "./spark-3.0.2-bin-hadoop3.2/data/mllib/images/origin/kittens"
df = (
spark.read.format("image")
.option("dropInvalid", True)
.load(PATH)
.select("image.origin", "image.height", "image.width", "image.nChannels", "image.mode", "image.data")
)
df.toPandas()
| | origin | height | width | nChannels | mode | data |
|---|---|---|---|---|---|---|
0 | file:///content/Mastering-Big-Data-Analytics-w... | 311 | 300 | 3 | 16 | [193, 193, 193, 194, 194, 194, 194, 194, 194, ... |
1 | file:///content/Mastering-Big-Data-Analytics-w... | 313 | 199 | 3 | 16 | [208, 229, 237, 202, 223, 231, 210, 231, 239, ... |
2 | file:///content/Mastering-Big-Data-Analytics-w... | 200 | 300 | 3 | 16 | [88, 93, 96, 88, 93, 96, 88, 93, 96, 89, 94, 9... |
3 | file:///content/Mastering-Big-Data-Analytics-w... | 296 | 300 | 3 | 16 | [203, 230, 244, 202, 229, 243, 201, 228, 242, ... |
libSVM
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
PATH = "./spark-3.0.2-bin-hadoop3.2/data/mllib/sample_libsvm_data.txt"
df = spark.read.format("libsvm").option("numFeatures", "780").load(PATH)
df.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
|  1.0|(780,[154,155,156...|
|  0.0|(780,[153,154,155...|
|  0.0|(780,[151,152,153...|
|  1.0|(780,[129,130,131...|
|  0.0|(780,[154,155,156...|
|  1.0|(780,[150,151,152...|
|  0.0|(780,[124,125,126...|
|  0.0|(780,[152,153,154...|
|  1.0|(780,[97,98,99,12...|
|  1.0|(780,[124,125,126...|
+-----+--------------------+
only showing top 20 rows
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Prepare training documents, which are labeled.
training_data = [
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0),
]
training = spark.createDataFrame(training_data, ["id", "text", "label"])
print("Dataset used for training (labeled):")
training.show()
# Prepare test documents, which are unlabeled.
test_data = [
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop"),
]
test = spark.createDataFrame(test_data, ["id", "text"])
print("Dataset used for testing (unlabeled):")
test.show()
Dataset used for training (labeled):
+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  4|     b spark who|  1.0|
|  5|         g d a y|  0.0|
|  6|       spark fly|  1.0|
|  7|   was mapreduce|  0.0|
|  8| e spark program|  1.0|
|  9|         a e c l|  0.0|
| 10|   spark compile|  1.0|
| 11| hadoop software|  0.0|
+---+----------------+-----+

Dataset used for testing (unlabeled):
+---+---------------+
| id|           text|
+---+---------------+
|  4|    spark i j k|
|  5|          l m n|
|  6|mapreduce spark|
|  7|  apache hadoop|
+---+---------------+
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = (
ParamGridBuilder()
.addGrid(hashingTF.numFeatures, [10, 100, 1000])
.addGrid(lr.regParam, [0.1, 0.01])
.build()
)
crossval = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2,
) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
selected.show(100, False)
+---+---------------+----------------------------------------+----------+
|id |text           |probability                             |prediction|
+---+---------------+----------------------------------------+----------+
|4  |spark i j k    |[0.2661287892091301,0.7338712107908699] |1.0       |
|5  |l m n          |[0.9209302389399868,0.07906976106001318]|0.0       |
|6  |mapreduce spark|[0.4429343598469927,0.5570656401530073] |1.0       |
|7  |apache hadoop  |[0.8583692828862762,0.14163071711372377]|0.0       |
+---+---------------+----------------------------------------+----------+
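The winning pipeline can be taken apart stage by stage to see what the grid search settled on; a sketch:
# The best model is a PipelineModel: [Tokenizer, HashingTF, LogisticRegressionModel]
best_pipeline = cvModel.bestModel
print(best_pipeline.stages)
# numFeatures that the grid search chose for HashingTF
print(best_pipeline.stages[1].getNumFeatures())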
import pandas as pd
from IPython.core.display import display
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
# General settings for display purposes
pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.max_colwidth = 144
sns.set(color_codes=True)
spark = SparkSession.builder.getOrCreate()
# Source sentiment140: http://help.sentiment140.com/for-students/
schema = "polarity FLOAT, id LONG, date_time STRING, query STRING, user STRING, text STRING"
spark_reader = spark.read.schema(schema)
# file 1: testdata.manual.2009.06.14.csv
TESTDATA_PATH = "./data-sets/sentiment-140-training-data/testdata.manual.2009.06.14.csv"
raw_test_data = spark_reader.csv(
TESTDATA_PATH,
quote='"',
header=False,
columnNameOfCorruptRecord="corrupt_data",
).cache()
# file 2: training.1600000.processed.noemoticon.csv
TRAININGDATA_PATH = "./data-sets/sentiment-140-training-data/training.1600000.processed.noemoticon.csv"
raw_training_data = spark_reader.csv(
TRAININGDATA_PATH,
quote='"',
header=False,
columnNameOfCorruptRecord="corrupt_data",
).cache()
# path that we will write our raw data to
OUTPUT_PATH = "./data-sets/sentiment-140-training-data/RAW"
# Count of data
print(f"Overall data count: {raw_test_data.count()}")
# Data summary
display(raw_test_data.summary().toPandas())
print("Data schema")
raw_test_data.printSchema()
# Let's look at 50 rows of data
display(raw_test_data.limit(50).toPandas())
Overall data count: 498
| | summary | polarity | id | date_time | query | user | text |
|---|---|---|---|---|---|---|---|
0 | count | 498 | 498 | 498 | 498 | 498 | 498 |
1 | mean | 2.0200803212851404 | 1867.2269076305222 | None | 46.0 | None | None |
2 | stddev | 1.6996858490577658 | 2834.891681137318 | None | 5.163977794943222 | None | None |
3 | min | 0.0 | 3 | Fri May 15 06:45:54 UTC 2009 | """booz allen""" | 5x1llz | """The Republican party is a bunch of anti-abo... |
4 | 25% | 0.0 | 388 | None | 40.0 | None | None |
5 | 50% | 2.0 | 1013 | None | 50.0 | None | None |
6 | 75% | 4.0 | 2367 | None | 50.0 | None | None |
7 | max | 4.0 | 14076 | Wed May 27 23:59:18 UTC 2009 | yankees | zedomax | zomg!!! I have a G2!!!!!!! |
Data schema
root
 |-- polarity: float (nullable = true)
 |-- id: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
| | polarity | id | date_time | query | user | text |
|---|---|---|---|---|---|---|
0 | 4.0 | 3 | Mon May 11 03:17:40 UTC 2009 | kindle2 | tpryan | @stellargirl I loooooooovvvvvveee my Kindle2. ... |
1 | 4.0 | 4 | Mon May 11 03:18:03 UTC 2009 | kindle2 | vcu451 | Reading my kindle2... Love it... Lee childs i... |
2 | 4.0 | 5 | Mon May 11 03:18:54 UTC 2009 | kindle2 | chadfu | Ok, first assesment of the #kindle2 ...it fuck... |
3 | 4.0 | 6 | Mon May 11 03:19:04 UTC 2009 | kindle2 | SIX15 | @kenburbary You'll love your Kindle2. I've had... |
4 | 4.0 | 7 | Mon May 11 03:21:41 UTC 2009 | kindle2 | yamarama | @mikefish Fair enough. But i have the Kindle2... |
5 | 4.0 | 8 | Mon May 11 03:22:00 UTC 2009 | kindle2 | GeorgeVHulme | @richardebaker no. it is too big. I'm quite ha... |
6 | 0.0 | 9 | Mon May 11 03:22:30 UTC 2009 | aig | Seth937 | Fuck this economy. I hate aig and their non lo... |
7 | 4.0 | 10 | Mon May 11 03:26:10 UTC 2009 | jquery | dcostalis | Jquery is my new best friend. |
8 | 4.0 | 11 | Mon May 11 03:27:15 UTC 2009 | PJ_King | Loves twitter | |
9 | 4.0 | 12 | Mon May 11 03:29:20 UTC 2009 | obama | mandanicole | how can you not love Obama? he makes jokes abo... |
10 | 2.0 | 13 | Mon May 11 03:32:42 UTC 2009 | obama | jpeb | Check this video out -- President Obama at the... |
11 | 0.0 | 14 | Mon May 11 03:32:48 UTC 2009 | obama | kylesellers | @Karoli I firmly believe that Obama/Pelosi hav... |
12 | 4.0 | 15 | Mon May 11 03:33:38 UTC 2009 | obama | theviewfans | House Correspondents dinner was last night who... |
13 | 4.0 | 16 | Mon May 11 05:05:58 UTC 2009 | nike | MumsFP | Watchin Espn..Jus seen this new Nike Commerica... |
14 | 0.0 | 17 | Mon May 11 05:06:22 UTC 2009 | nike | vincentx24x | dear nike, stop with the flywire. that shit is... |
15 | 4.0 | 18 | Mon May 11 05:20:15 UTC 2009 | lebron | cameronwylie | #lebron best athlete of our generation, if not... |
16 | 0.0 | 19 | Mon May 11 05:20:28 UTC 2009 | lebron | luv8242 | I was talking to this guy last night and he wa... |
17 | 4.0 | 20 | Mon May 11 05:21:04 UTC 2009 | lebron | mtgillikin | i love lebron. http://bit.ly/PdHur |
18 | 0.0 | 21 | Mon May 11 05:21:37 UTC 2009 | lebron | ursecretdezire | @ludajuice Lebron is a Beast, but I'm still ch... |
19 | 4.0 | 22 | Mon May 11 05:21:45 UTC 2009 | lebron | Native_01 | @Pmillzz lebron IS THE BOSS |
20 | 4.0 | 23 | Mon May 11 05:22:03 UTC 2009 | lebron | princezzcutz | @sketchbug Lebron is a hometown hero to me, lo... |
21 | 4.0 | 24 | Mon May 11 05:22:12 UTC 2009 | lebron | peterlikewhat | lebron and zydrunas are such an awesome duo |
22 | 4.0 | 25 | Mon May 11 05:22:37 UTC 2009 | lebron | emceet | @wordwhizkid Lebron is a beast... nobody in th... |
23 | 4.0 | 26 | Mon May 11 06:02:24 UTC 2009 | iphone app | CocoSavanna | downloading apps for my iphone! So much fun :-... |
24 | 4.0 | 33 | Mon May 11 19:47:29 UTC 2009 | visa | DreambigRadio | good news, just had a call from the Visa offic... |
25 | 4.0 | 34 | Mon May 11 19:49:21 UTC 2009 | fredwilson | andrewwatson | http://twurl.nl/epkr4b - awesome come back fro... |
26 | 4.0 | 35 | Mon May 11 19:50:07 UTC 2009 | fredwilson | fredwilson | In montreal for a long weekend of R&R. Muc... |
27 | 4.0 | 46 | Thu May 14 02:58:07 UTC 2009 | """booz allen""" | JoeSchueller | Booz Allen Hamilton has a bad ass homegrown so... |
28 | 4.0 | 47 | Thu May 14 02:58:23 UTC 2009 | """booz allen""" | scottabel | [#MLUC09] Customer Innovation Award Winner: Bo... |
29 | 4.0 | 49 | Thu May 14 05:24:50 UTC 2009 | 40d | JustMe_D | @SoChi2 I current use the Nikon D90 and love i... |
30 | 2.0 | 50 | Thu May 14 05:25:04 UTC 2009 | 40d | hiteshbagai | need suggestions for a good IR filter for my c... |
31 | 2.0 | 117 | Sat May 16 16:18:36 UTC 2009 | Annimallover | @surfit: I just checked my google for my busin... | |
32 | 4.0 | 118 | Sat May 16 16:19:04 UTC 2009 | J_Holl | @phyreman9 Google is always a good place to lo... | |
33 | 0.0 | 119 | Sat May 16 16:19:24 UTC 2009 | vamsmack | Played with an android google phone. The slide... | |
34 | 0.0 | 120 | Sat May 16 16:25:41 UTC 2009 | aig | schroncd | US planning to resume the military tribunals a... |
35 | 0.0 | 121 | Sat May 16 22:42:07 UTC 2009 | itchy | MarissaLeeD | omg so bored & my tattoooos are so itchy!!... |
36 | 0.0 | 122 | Sat May 16 22:42:25 UTC 2009 | itchy | robloposky | I'm itchy and miserable! |
37 | 0.0 | 123 | Sat May 16 22:42:44 UTC 2009 | itchy | EdwinLValencia | @sekseemess no. I'm not itchy for now. Maybe l... |
38 | 4.0 | 124 | Sat May 16 23:48:15 UTC 2009 | stanford | imusicmash | RT @jessverr I love the nerdy Stanford human b... |
39 | 4.0 | 125 | Sat May 16 23:58:34 UTC 2009 | lyx | drewloewe | @spinuzzi: Has been a bit crazy, with steep le... |
40 | 4.0 | 131 | Sun May 17 15:05:03 UTC 2009 | Danny Gokey | VickyTigger | "I'm listening to ""P.Y.T"" by Danny Gokey <... |
41 | 4.0 | 132 | Sun May 17 17:27:45 UTC 2009 | sleep | babblyabbie | is going to sleep then on a bike ride:] |
42 | 0.0 | 133 | Sun May 17 17:27:49 UTC 2009 | sleep | kisjoaquin | cant sleep... my tooth is aching. |
43 | 0.0 | 134 | Sun May 17 17:28:02 UTC 2009 | sleep | Whacktackular | Blah, blah, blah same old same old. No plans t... |
44 | 0.0 | 135 | Sun May 17 17:29:50 UTC 2009 | san francisco | Adrigonzo | glad i didnt do Bay to Breakers today, it's 10... |
45 | 2.0 | 136 | Sun May 17 17:30:19 UTC 2009 | san francisco | sulu34 | is in San Francisco at Bay to Breakers. |
46 | 2.0 | 137 | Sun May 17 17:30:23 UTC 2009 | san francisco | schuyler | just landed at San Francisco |
47 | 2.0 | 138 | Sun May 17 17:30:56 UTC 2009 | san francisco | MattBragoni | San Francisco today. Any suggestions? |
48 | 0.0 | 139 | Sun May 17 17:32:00 UTC 2009 | aig | KennyTRoland | ?Obama Administration Must Stop Bonuses to AIG... |
49 | 0.0 | 140 | Sun May 17 17:32:30 UTC 2009 | aig | aMild | started to think that Citi is in really deep s... |
# Count of data
print(f"Overall data count: {raw_training_data.count()}")
# Data summary
display(raw_training_data.summary().toPandas())
print("Data schema")
raw_training_data.printSchema()
# Let's look at 50 rows of data
display(raw_training_data.limit(50).toPandas())
Overall data count: 1600000
| | summary | polarity | id | date_time | query | user | text |
|---|---|---|---|---|---|---|---|
0 | count | 1600000 | 1600000 | 1600000 | 1600000 | 1600000 | 1600000 |
1 | mean | 2.0 | 1.9988175522956276E9 | None | None | 4.325887521835714E9 | None |
2 | stddev | 2.0000006250002933 | 1.9357607362269536E8 | None | None | 5.1627332184548904E10 | None |
3 | min | 0.0 | 1467810369 | Fri Apr 17 20:30:31 PDT 2009 | NO_QUERY | 000catnap000 | exh... |
4 | 25% | 0.0 | 1956912288 | None | None | 32508.0 | None |
5 | 50% | 0.0 | 2002093413 | None | None | 130587.0 | None |
6 | 75% | 4.0 | 2177045846 | None | None | 1100101.0 | None |
7 | max | 4.0 | 2329205794 | Wed May 27 07:27:38 PDT 2009 | NO_QUERY | zzzzeus111 | �����ߧ�ǿ�����ж���� <<----I DID NOT KNOW... |
Data schema
root
 |-- polarity: float (nullable = true)
 |-- id: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
| | polarity | id | date_time | query | user | text |
|---|---|---|---|---|---|---|
0 | 0.0 | 1467810369 | Mon Apr 06 22:19:45 PDT 2009 | NO_QUERY | _TheSpecialOne_ | @switchfoot http://twitpic.com/2y1zl - Awww, t... |
1 | 0.0 | 1467810672 | Mon Apr 06 22:19:49 PDT 2009 | NO_QUERY | scotthamilton | is upset that he can't update his Facebook by ... |
2 | 0.0 | 1467810917 | Mon Apr 06 22:19:53 PDT 2009 | NO_QUERY | mattycus | @Kenichan I dived many times for the ball. Man... |
3 | 0.0 | 1467811184 | Mon Apr 06 22:19:57 PDT 2009 | NO_QUERY | ElleCTF | my whole body feels itchy and like its on fire |
4 | 0.0 | 1467811193 | Mon Apr 06 22:19:57 PDT 2009 | NO_QUERY | Karoli | @nationwideclass no, it's not behaving at all.... |
5 | 0.0 | 1467811372 | Mon Apr 06 22:20:00 PDT 2009 | NO_QUERY | joy_wolf | @Kwesidei not the whole crew |
6 | 0.0 | 1467811592 | Mon Apr 06 22:20:03 PDT 2009 | NO_QUERY | mybirch | Need a hug |
7 | 0.0 | 1467811594 | Mon Apr 06 22:20:03 PDT 2009 | NO_QUERY | coZZ | @LOLTrish hey long time no see! Yes.. Rains a... |
8 | 0.0 | 1467811795 | Mon Apr 06 22:20:05 PDT 2009 | NO_QUERY | 2Hood4Hollywood | @Tatiana_K nope they didn't have it |
9 | 0.0 | 1467812025 | Mon Apr 06 22:20:09 PDT 2009 | NO_QUERY | mimismo | @twittera que me muera ? |
10 | 0.0 | 1467812416 | Mon Apr 06 22:20:16 PDT 2009 | NO_QUERY | erinx3leannexo | spring break in plain city... it's snowing |
11 | 0.0 | 1467812579 | Mon Apr 06 22:20:17 PDT 2009 | NO_QUERY | pardonlauren | I just re-pierced my ears |
12 | 0.0 | 1467812723 | Mon Apr 06 22:20:19 PDT 2009 | NO_QUERY | TLeC | @caregiving I couldn't bear to watch it. And ... |
13 | 0.0 | 1467812771 | Mon Apr 06 22:20:19 PDT 2009 | NO_QUERY | robrobbierobert | @octolinz16 It it counts, idk why I did either... |
14 | 0.0 | 1467812784 | Mon Apr 06 22:20:20 PDT 2009 | NO_QUERY | bayofwolves | @smarrison i would've been the first, but i di... |
15 | 0.0 | 1467812799 | Mon Apr 06 22:20:20 PDT 2009 | NO_QUERY | HairByJess | @iamjazzyfizzle I wish I got to watch it with ... |
16 | 0.0 | 1467812964 | Mon Apr 06 22:20:22 PDT 2009 | NO_QUERY | lovesongwriter | Hollis' death scene will hurt me severely to w... |
17 | 0.0 | 1467813137 | Mon Apr 06 22:20:25 PDT 2009 | NO_QUERY | armotley | about to file taxes |
18 | 0.0 | 1467813579 | Mon Apr 06 22:20:31 PDT 2009 | NO_QUERY | starkissed | @LettyA ahh ive always wanted to see rent lov... |
19 | 0.0 | 1467813782 | Mon Apr 06 22:20:34 PDT 2009 | NO_QUERY | gi_gi_bee | @FakerPattyPattz Oh dear. Were you drinking ou... |
20 | 0.0 | 1467813985 | Mon Apr 06 22:20:37 PDT 2009 | NO_QUERY | quanvu | @alydesigns i was out most of the day so didn'... |
21 | 0.0 | 1467813992 | Mon Apr 06 22:20:38 PDT 2009 | NO_QUERY | swinspeedx | one of my friend called me, and asked to meet ... |
22 | 0.0 | 1467814119 | Mon Apr 06 22:20:40 PDT 2009 | NO_QUERY | cooliodoc | @angry_barista I baked you a cake but I ated it |
23 | 0.0 | 1467814180 | Mon Apr 06 22:20:40 PDT 2009 | NO_QUERY | viJILLante | this week is not going as i had hoped |
24 | 0.0 | 1467814192 | Mon Apr 06 22:20:41 PDT 2009 | NO_QUERY | Ljelli3166 | blagh class at 8 tomorrow |
25 | 0.0 | 1467814438 | Mon Apr 06 22:20:44 PDT 2009 | NO_QUERY | ChicagoCubbie | I hate when I have to call and wake people up |
26 | 0.0 | 1467814783 | Mon Apr 06 22:20:50 PDT 2009 | NO_QUERY | KatieAngell | Just going to cry myself to sleep after watchi... |
27 | 0.0 | 1467814883 | Mon Apr 06 22:20:52 PDT 2009 | NO_QUERY | gagoo | im sad now Miss.Lilly |
28 | 0.0 | 1467815199 | Mon Apr 06 22:20:56 PDT 2009 | NO_QUERY | abel209 | ooooh.... LOL that leslie.... and ok I won't ... |
29 | 0.0 | 1467815753 | Mon Apr 06 22:21:04 PDT 2009 | NO_QUERY | BaptisteTheFool | Meh... Almost Lover is the exception... this t... |
30 | 0.0 | 1467815923 | Mon Apr 06 22:21:07 PDT 2009 | NO_QUERY | fatkat309 | some1 hacked my account on aim now i have to ... |
31 | 0.0 | 1467815924 | Mon Apr 06 22:21:07 PDT 2009 | NO_QUERY | EmCDL | @alielayus I want to go to promote GEAR AND GR... |
32 | 0.0 | 1467815988 | Mon Apr 06 22:21:09 PDT 2009 | NO_QUERY | merisssa | thought sleeping in was an option tomorrow but... |
33 | 0.0 | 1467816149 | Mon Apr 06 22:21:11 PDT 2009 | NO_QUERY | Pbearfox | @julieebaby awe i love you too!!!! 1 am here ... |
34 | 0.0 | 1467816665 | Mon Apr 06 22:21:21 PDT 2009 | NO_QUERY | jsoo | @HumpNinja I cry my asian eyes to sleep at night |
35 | 0.0 | 1467816749 | Mon Apr 06 22:21:20 PDT 2009 | NO_QUERY | scarletletterm | ok I'm sick and spent an hour sitting in the s... |
36 | 0.0 | 1467817225 | Mon Apr 06 22:21:27 PDT 2009 | NO_QUERY | crosland_12 | @cocomix04 ill tell ya the story later not a ... |
37 | 0.0 | 1467817374 | Mon Apr 06 22:21:30 PDT 2009 | NO_QUERY | ajaxpro | @MissXu sorry! bed time came here (GMT+1) ht... |
38 | 0.0 | 1467817502 | Mon Apr 06 22:21:32 PDT 2009 | NO_QUERY | Tmttq86 | @fleurylis I don't either. Its depressing. I d... |
39 | 0.0 | 1467818007 | Mon Apr 06 22:21:39 PDT 2009 | NO_QUERY | Anthony_Nguyen | Bed. Class 8-12. Work 12-3. Gym 3-5 or 6. Then... |
40 | 0.0 | 1467818020 | Mon Apr 06 22:21:39 PDT 2009 | NO_QUERY | itsanimesh | really don't feel like getting up today... but... |
41 | 0.0 | 1467818481 | Mon Apr 06 22:21:46 PDT 2009 | NO_QUERY | lionslamb | He's the reason for the teardrops on my guitar... |
42 | 0.0 | 1467818603 | Mon Apr 06 22:21:49 PDT 2009 | NO_QUERY | kennypham | Sad, sad, sad. I don't know why but I hate thi... |
43 | 0.0 | 1467818900 | Mon Apr 06 22:21:53 PDT 2009 | NO_QUERY | DdubsShellBell | @JonathanRKnight Awww I soo wish I was there t... |
44 | 0.0 | 1467819022 | Mon Apr 06 22:21:56 PDT 2009 | NO_QUERY | hpfangirl94 | Falling asleep. Just heard about that Tracy gi... |
45 | 0.0 | 1467819650 | Mon Apr 06 22:22:05 PDT 2009 | NO_QUERY | antzpantz | @Viennah Yay! I'm happy for you with your job!... |
46 | 0.0 | 1467819712 | Mon Apr 06 22:22:06 PDT 2009 | NO_QUERY | labrt2004 | Just checked my user timeline on my blackberry... |
47 | 0.0 | 1467819812 | Mon Apr 06 22:22:07 PDT 2009 | NO_QUERY | IrisJumbe | Oh man...was ironing @jeancjumbe's fave top to... |
48 | 0.0 | 1467820206 | Mon Apr 06 22:22:13 PDT 2009 | NO_QUERY | peacoats | is strangely sad about LiLo and SamRo breaking... |
49 | 0.0 | 1467820835 | Mon Apr 06 22:22:25 PDT 2009 | NO_QUERY | cyantist | @tea oh! i'm so sorry i didn't think about th... |
Initial Findings
The raw tweets contain a number of artifacts that we will need to deal with during cleaning:
- mentioned users
- hashtags
- emoticons such as <3
- masked profanity such as s&^t
- encoding artifacts such as: �����ߧ�ǿ�����ж���� <<----I DID NOT KNOW I CUD or HOW TO DO ALL DAT ON MY PHONE TIL NOW. WOW..MY LIFE IS NOW COMPLETE. JK.
df = raw_training_data.select("polarity").na.drop()
print(f"No of rows with Polarity: {df.count()} / {raw_training_data.count()}")
display(df.groupBy("polarity").count().toPandas())
sns.displot(df.toPandas());
No of rows with Polarity: 1600000 / 1600000
| | polarity | count |
|---|---|---|
0 | 4.0 | 800000 |
1 | 0.0 | 800000 |
Store our raw data
raw_training_data.repartition(20).write.partitionBy("polarity").csv(OUTPUT_PATH, mode="overwrite")
import html
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
pd.options.display.max_columns = None
pd.options.display.max_rows = 250
pd.options.display.max_colwidth = 150
schema = "polarity FLOAT, id LONG, date_time TIMESTAMP, query STRING, user STRING, text STRING"
timestampformat = "EEE MMM dd HH:mm:ss zzz yyyy"
IN_PATH = "./data-sets/sentiment-140-training-data/RAW"
OUT_PATH = "./data-sets/sentiment-140-training-data/CLEAN"
spark_reader = spark.read.schema(schema)
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
user_regex = r"(@\w{1,15})"
hashtag_regex = r"(#\w{1,})"
hashtag_replace_regex = r"#(\w{1,})"
@f.udf
def html_unescape(s: str):
if isinstance(s, str):
return html.unescape(s)
return s
def clean_data(df):
df = (
df
.withColumn("original_text", f.col("text"))
.withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
.withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
.withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
.withColumn("text", f.regexp_replace(f.col("text"), "#", " "))
.withColumn("text", html_unescape(f.col("text")))
.filter("text != ''")
)
return df
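Before running clean_data over the full dataset, it is worth sanity-checking it on a couple of hand-made rows; a quick sketch (the sample tweets below are made up for illustration):
# Hypothetical sample rows to verify the regex-based cleaning
sample = spark.createDataFrame(
    [
        (0.0, "@someone check https://example.com #cool &amp; more"),
        (4.0, "plain text stays as is"),
    ],
    "polarity FLOAT, text STRING",
)
clean_data(sample).select("original_text", "text").show(truncate=False)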
df_raw = spark_reader.csv(IN_PATH, timestampFormat=timestampformat)
df_clean = clean_data(df_raw)
df_clean.write.partitionBy("polarity").parquet(OUT_PATH, mode="overwrite")