%load_ext autoreload
%autoreload 2
%config Completer.use_jedi = False
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array_contains, col, explode, split, substring
from replay.data_preparator import DataPreparator
from replay.experiment import Experiment
from replay.metrics import HitRate, NDCG, MAP, Coverage
from replay.models import LightFMWrap
from replay.session_handler import State
from replay.splitters import UserSplitter
from rs_datasets import MovieLens
K=10
SEED=1234
We will use MovieLens 10m dataset from rs_datasets package, which contains a list of recommendations datasets.
data = MovieLens("10m")
data.info()
ratings
user_id | item_id | rating | timestamp | |
---|---|---|---|---|
0 | 1 | 122 | 5.0 | 838985046 |
1 | 1 | 185 | 5.0 | 838983525 |
2 | 1 | 231 | 5.0 | 838983392 |
items
item_id | title | genres | |
---|---|---|---|
0 | 1 | Toy Story (1995) | Adventure|Animation|Children|Comedy|Fantasy |
1 | 2 | Jumanji (1995) | Adventure|Children|Fantasy |
2 | 3 | Grumpier Old Men (1995) | Comedy|Romance |
tags
user_id | item_id | tag | timestamp | |
---|---|---|---|---|
0 | 15 | 4973 | excellent! | 1215184630 |
1 | 20 | 1747 | politics | 1188263867 |
2 | 20 | 1747 | satire | 1188263867 |
preparator = DataPreparator()
log, _, item_features = preparator(data.ratings, item_features=data.items, mapping={"relevance": "rating"})
WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/u19893556/miniconda3/envs/replay/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int) WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 22/02/27 22:14:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/02/27 22:14:17 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN). 22/02/27 22:14:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 22/02/27 22:14:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 22/02/27 22:14:23 WARN TaskSetManager: Stage 0 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:27 WARN TaskSetManager: Stage 2 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:30 WARN TaskSetManager: Stage 4 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:38 WARN TaskSetManager: Stage 6 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB.
user_random_splitter = UserSplitter(
item_test_size=K,
user_test_size=500,
drop_cold_items=True,
drop_cold_users=True,
shuffle=True,
seed=SEED
)
train, test = user_random_splitter.split(log)
train.count(), test.count()
22/02/27 22:14:47 WARN DAGScheduler: Broadcasting large task binary with size 2004.4 KiB 22/02/27 22:14:47 WARN TaskSetManager: Stage 10 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:49 WARN DAGScheduler: Broadcasting large task binary with size 2011.1 KiB 22/02/27 22:14:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 22/02/27 22:14:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 22/02/27 22:14:51 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB 22/02/27 22:14:51 WARN TaskSetManager: Stage 15 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:51 WARN DAGScheduler: Broadcasting large task binary with size 2004.7 KiB 22/02/27 22:14:52 WARN TaskSetManager: Stage 13 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:14:55 WARN DAGScheduler: Broadcasting large task binary with size 2008.8 KiB 22/02/27 22:14:56 WARN DAGScheduler: Broadcasting large task binary with size 2013.9 KiB 22/02/27 22:14:56 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:01 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:01 WARN DAGScheduler: Broadcasting large task binary with size 2004.8 KiB 22/02/27 22:15:01 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB 22/02/27 22:15:02 WARN TaskSetManager: Stage 19 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:15:03 WARN TaskSetManager: Stage 22 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB. 22/02/27 22:15:05 WARN DAGScheduler: Broadcasting large task binary with size 2009.1 KiB 22/02/27 22:15:08 WARN DAGScheduler: Broadcasting large task binary with size 2014.1 KiB 22/02/27 22:15:10 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB 22/02/27 22:15:15 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
(9995054, 5000)
train_opt, val_opt = user_random_splitter.split(train)
train_opt.count(), val_opt.count()
22/02/27 22:15:18 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 22/02/27 22:15:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 22/02/27 22:15:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:21 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:21 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:34 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:39 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:41 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
(9990054, 5000)
year = item_features.withColumn('year', substring(col('title'), -5, 4).astype(IntegerType())).select('item_idx', 'year')
year.show(2)
+--------+----+ |item_idx|year| +--------+----+ | 11|1995| | 117|1995| +--------+----+ only showing top 2 rows
genres = (
State().session.createDataFrame(data.items[["item_id", "genres"]].rename({'item_id': 'item_idx'}, axis=1))
.select(
"item_idx",
split("genres", "\|").alias("genres")
)
)
genres.show()
+--------+--------------------+ |item_idx| genres| +--------+--------------------+ | 1|[Adventure, Anima...| | 2|[Adventure, Child...| | 3| [Comedy, Romance]| | 4|[Comedy, Drama, R...| | 5| [Comedy]| | 6|[Action, Crime, T...| | 7| [Comedy, Romance]| | 8|[Adventure, Child...| | 9| [Action]| | 10|[Action, Adventur...| | 11|[Comedy, Drama, R...| | 12| [Comedy, Horror]| | 13|[Animation, Child...| | 14| [Drama]| | 15|[Action, Adventur...| | 16| [Crime, Drama]| | 17|[Comedy, Drama, R...| | 18|[Comedy, Drama, T...| | 19| [Comedy]| | 20|[Action, Comedy, ...| +--------+--------------------+ only showing top 20 rows
genres_list = (
genres.select(explode("genres").alias("genre"))
.distinct().filter('genre <> "(no genres listed)"')
.toPandas()["genre"].tolist()
)
genres_list
['Documentary', 'IMAX', 'Adventure', 'Animation', 'Comedy', 'Thriller', 'Sci-Fi', 'Musical', 'Horror', 'Action', 'Fantasy', 'War', 'Mystery', 'Drama', 'Film-Noir', 'Crime', 'Western', 'Romance', 'Children']
item_features = genres
for genre in genres_list:
item_features = item_features.withColumn(
genre,
array_contains(col("genres"), genre).astype(IntegerType())
)
item_features = item_features.drop("genres").cache()
item_features.count()
10681
item_features = item_features.join(year, on='item_idx', how='inner')
item_features.cache()
item_features.count()
8316
model_feat = LightFMWrap(random_state=SEED, loss='warp', no_components=128)
%%time
model_feat.fit(train, item_features=item_features)
22/02/27 22:15:44 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:45 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:49 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:49 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:50 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:50 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:55 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:57 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:15:58 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:16:00 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:16:00 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:16:02 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:16:03 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:16:10 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
CPU times: user 13h 4min 1s, sys: 53.2 s, total: 13h 4min 54s Wall time: 17min 29s
%%time
recs = model_feat.predict(
k=K,
users=test.select('user_idx').distinct(),
log=train,
filter_seen_items=True,
item_features=item_features
)
27-Feb-22 22:33:13, replay, WARNING: This model can't predict cold users, they will be ignored 27-Feb-22 22:33:13, replay, WARNING: This model can't predict cold users, they will be ignored 22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:19 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:25 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:27 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:28 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:29 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:29 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:30 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:33 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:35 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:35 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:35 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:36 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
CPU times: user 205 ms, sys: 247 ms, total: 452 ms Wall time: 23.1 s
metrics = Experiment(test, {NDCG(): K,
MAP() : K,
HitRate(): [1, K],
Coverage(train): K})
22/02/27 22:33:37 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:39 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
metrics.add_result("LightFM_item_features", recs)
metrics.results
22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:47 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:55 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:57 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:57 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:58 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:33:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:33:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:34:01 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:01 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:01 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:03 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:06 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:07 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:08 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:08 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:34:08 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:34:08 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:34:09 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB 22/02/27 22:34:10 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:10 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:11 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:13 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB 22/02/27 22:34:19 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB 22/02/27 22:34:20 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB 22/02/27 22:34:21 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB 22/02/27 22:34:21 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
Coverage@10 | HitRate@1 | HitRate@10 | MAP@10 | NDCG@10 | |
---|---|---|---|---|---|
LightFM_item_features | 0.066404 | 0.348 | 0.784 | 0.121321 | 0.229198 |