This notebook is designed to familiarize you with the use of the RePlay library.
%load_ext autoreload
%autoreload 2
%config Completer.use_jedi = False
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)
The `State` object allows passing an existing Spark session or creating a new one, which will be used by all RePlay modules. To create a session with custom `spark.driver.memory` and `spark.sql.shuffle.partitions` parameters, use the `get_spark_session` function from the `session_handler` module.
from replay.session_handler import State
spark = State().session
spark
SparkSession - hive
K = 5
SEED = 1234
We will use MovieLens 1m as an example.
import pandas as pd
df = pd.read_csv("data/ml1m_ratings.dat", sep="\t", names=["user_id", "item_id", "relevance", "timestamp"])
users = pd.read_csv("data/ml1m_users.dat", sep="\t", names=["user_id", "gender", "age", "occupation", "zip_code"])
The internal data format in RePlay is a Spark dataframe. You can pass either a Spark or a pandas dataframe as input. The `user_id` and `item_id` columns are required for the interaction matrix; the `relevance` and interaction `timestamp` columns are optional.
We implemented the `DataPreparator` class to convert dataframes to Spark format and preprocess the data, including renaming/creation of required and optional interaction-matrix columns, null checks and date parsing. To convert a pandas dataframe to Spark as-is, use the `convert2spark` function from `replay.utils`.
from replay.data_preparator import DataPreparator
log = DataPreparator().transform(
    data=df,
    columns_names={
        "user_id": "user_id",
        "item_id": "item_id",
        "relevance": "relevance",
        "timestamp": "timestamp"
    }
)
log.show(3)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      1|   1193|      5.0|2000-12-31 22:12:40|
|      1|    661|      3.0|2000-12-31 22:35:09|
|      1|    914|      3.0|2000-12-31 22:32:48|
+-------+-------+---------+-------------------+
only showing top 3 rows
from replay.utils import convert2spark
users = convert2spark(users)
users.show(3)
+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
+-------+------+---+----------+--------+
only showing top 3 rows
RePlay provides data splitters to reproduce validation schemas widely used in recommender systems. `UserSplitter` takes `item_test_size` items for each of `user_test_size` users into the test dataset.
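The splitting logic can be sketched in plain pandas (a simplified illustration of the idea, not RePlay's implementation; `user_split` is a hypothetical helper, and the `drop_cold_items`/`drop_cold_users` filtering is omitted):

```python
import pandas as pd

def user_split(log, user_test_size, item_test_size, seed):
    """Toy re-implementation of UserSplitter's idea: sample
    `user_test_size` users and move `item_test_size` random
    interactions of each into the test set."""
    sampled = pd.Series(log["user_id"].unique()).sample(
        n=user_test_size, random_state=seed
    )
    test_users = set(sampled)
    test_parts = []
    for uid, group in log.groupby("user_id"):
        if uid in test_users:
            test_parts.append(group.sample(n=item_test_size, random_state=seed))
    test = pd.concat(test_parts)
    train = log.drop(test.index)
    return train, test

toy_log = pd.DataFrame({
    "user_id":   [1, 1, 1, 2, 2, 2, 3, 3, 3],
    "item_id":   [10, 11, 12, 10, 13, 14, 11, 12, 15],
    "relevance": [5, 3, 4, 2, 5, 1, 4, 4, 3],
})
train_toy, test_toy = user_split(toy_log, user_test_size=2, item_test_size=1, seed=42)
```

With 2 sampled users and 1 held-out item each, the test set gets 2 rows and the remaining 7 interactions stay in train.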
from replay.splitters import UserSplitter
splitter = UserSplitter(
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=K,
    user_test_size=500,
    seed=SEED,
    shuffle=True
)
train, test = splitter.split(log)
(
    train.count(),
    test.count()
)
(997709, 2499)
from replay.models import SLIM
slim = SLIM(seed=SEED)
%%time
slim.fit(log=train)
CPU times: user 2.5 s, sys: 239 ms, total: 2.74 s Wall time: 10.2 s
%%time
recs = slim.predict(
    k=K,
    users=test.select('user_id').distinct(),
    log=train,
    filter_seen_items=True
)
CPU times: user 955 ms, sys: 308 ms, total: 1.26 s Wall time: 11.4 s
recs.show(2)
+-------+-------+-------------------+
|user_id|item_id|          relevance|
+-------+-------+-------------------+
|   5207|   1196| 0.3033848470126411|
|   5207|   2797|0.24751908929278302|
+-------+-------+-------------------+
only showing top 2 rows
RePlay implements popular recommendation quality metrics. Use the metrics directly, or calculate a set of chosen metrics and compare models with the `Experiment` class.
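To make the metric definitions concrete, here is HitRate@K computed by hand on toy recommendations (an illustrative sketch with a hypothetical `hit_rate_at_k` helper, not RePlay's implementation):

```python
def hit_rate_at_k(recs, ground_truth, k):
    """Share of users that have at least one relevant item
    among their top-k recommendations."""
    hits = 0
    for user, rec_items in recs.items():
        if set(rec_items[:k]) & ground_truth.get(user, set()):
            hits += 1
    return hits / len(recs)

toy_recs = {1: [10, 11, 12], 2: [13, 14, 15], 3: [16, 17, 18]}
truth = {1: {11}, 2: {99}, 3: {16, 17}}
print(hit_rate_at_k(toy_recs, truth, k=1))  # only user 3's top-1 item is relevant
print(hit_rate_at_k(toy_recs, truth, k=3))  # users 1 and 3 have a hit in their top-3
```

HitRate@1 here is 1/3 and HitRate@3 is 2/3, which mirrors how the `HitRate(): [1, K]` entry below reports the metric at two cutoffs.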
from replay.metrics import Coverage, HitRate, NDCG, MAP
from replay.experiment import Experiment
metrics = Experiment(test, {
    NDCG(): K,
    MAP(): K,
    HitRate(): [1, K],
    Coverage(train): K
})
%%time
metrics.add_result("SLIM", recs)
metrics.results
CPU times: user 182 ms, sys: 59.4 ms, total: 241 ms Wall time: 27.3 s
| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.154926 | 0.236 | 0.544 | 0.094453 | 0.163997 |
# data split for hyperparameters optimization
train_opt, val_opt = splitter.split(train)
%%time
best_params = slim.optimize(train_opt, val_opt, criterion=NDCG(), k=K, budget=15)
[I 2021-11-11 10:21:26,213] A new study created in memory with name: no-name-0d9bf027-99b0-46b4-9e68-e4671635cc68
[I 2021-11-11 10:22:00,291] Trial 0 finished with value: 0.17884682905765073 and parameters: {'beta': 0.01, 'lambda_': 0.01}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:22:30,488] Trial 1 finished with value: 0.17838596144637037 and parameters: {'beta': 1.5843648655110386e-05, 'lambda_': 0.0019381432430530966}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:22:58,659] Trial 2 finished with value: 0.17845294084271404 and parameters: {'beta': 0.8747161053651644, 'lambda_': 3.2945196499038316e-06}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:23:42,449] Trial 3 finished with value: 0.17574650496622896 and parameters: {'beta': 1.2583517304821009e-05, 'lambda_': 1.2436091696595869e-05}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:24:07,106] Trial 4 finished with value: 0.1661499674592799 and parameters: {'beta': 0.001965332453889027, 'lambda_': 0.07145609209641049}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:24:35,365] Trial 5 finished with value: 0.17795713017853027 and parameters: {'beta': 0.297138767853366, 'lambda_': 9.797469614943864e-06}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:25:06,846] Trial 6 finished with value: 0.17761079325524287 and parameters: {'beta': 0.044112098666188634, 'lambda_': 6.081467560439423e-05}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:25:32,491] Trial 7 finished with value: 0.17829944074221615 and parameters: {'beta': 0.7678341066890578, 'lambda_': 0.0004048316455289347}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:26:03,155] Trial 8 finished with value: 0.17571677842156821 and parameters: {'beta': 9.439123620168996e-05, 'lambda_': 5.892088158297536e-05}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:26:29,203] Trial 9 finished with value: 0.1700988994153516 and parameters: {'beta': 4.476408332574133, 'lambda_': 0.00036918403058237295}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:26:48,072] Trial 10 finished with value: 0.08233912567963639 and parameters: {'beta': 0.005353176708467078, 'lambda_': 1.8907037300281162}. Best is trial 0 with value: 0.17884682905765073.
[I 2021-11-11 10:27:18,756] Trial 11 finished with value: 0.17912933694300678 and parameters: {'beta': 0.005666008275776714, 'lambda_': 1.3248762114391188e-06}. Best is trial 11 with value: 0.17912933694300678.
[I 2021-11-11 10:27:42,662] Trial 12 finished with value: 0.17990693905171137 and parameters: {'beta': 0.0009102123564610077, 'lambda_': 0.012004677143063356}. Best is trial 12 with value: 0.17990693905171137.
[I 2021-11-11 10:28:07,025] Trial 13 finished with value: 0.1708568065450685 and parameters: {'beta': 0.00035905214868419273, 'lambda_': 0.04686519066227496}. Best is trial 12 with value: 0.17990693905171137.
[I 2021-11-11 10:28:29,544] Trial 14 finished with value: 0.11764729285065387 and parameters: {'beta': 1.0330972425382408e-06, 'lambda_': 0.6909563043593303}. Best is trial 12 with value: 0.17990693905171137.
CPU times: user 53 s, sys: 7.62 s, total: 1min Wall time: 7min 3s
best_params
{'beta': 0.0009102123564610077, 'lambda_': 0.012004677143063356}
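Under the hood, `optimize` runs an Optuna study that samples hyperparameters for `budget` trials and keeps the parameters of the best trial by the chosen criterion. The search loop can be sketched with a toy stand-in objective (both `random_search` and `toy_ndcg` are hypothetical helpers written for this illustration; RePlay uses Optuna's TPE sampler, not pure random search):

```python
import math
import random

def random_search(objective, space, budget, seed):
    """Minimal random search: sample `budget` points log-uniformly
    and keep the parameters with the highest objective value."""
    rng = random.Random(seed)
    best_params, best_value = None, float("-inf")
    for _ in range(budget):
        params = {
            name: 10 ** rng.uniform(lo, hi)  # log-uniform in [10**lo, 10**hi]
            for name, (lo, hi) in space.items()
        }
        value = objective(params)
        if value > best_value:
            best_params, best_value = params, value
    return best_params, best_value

def toy_ndcg(p):
    # hypothetical smooth objective peaking near beta=1e-3, lambda_=1e-2
    return -((math.log10(p["beta"]) + 3) ** 2 + (math.log10(p["lambda_"]) + 2) ** 2)

space = {"beta": (-6, 1), "lambda_": (-6, 1)}
best, value = random_search(toy_ndcg, space, budget=15, seed=1234)
```

As in the Optuna log above, each trial evaluates one sampled parameter set, and the running best is updated whenever a trial improves on it.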
def fit_predict_evaluate(model, experiment, name):
    model.fit(log=train)
    recs = model.predict(
        k=K,
        users=test.select('user_id').distinct(),
        log=train,
        filter_seen_items=True
    )
    experiment.add_result(name, recs)
    return recs
%%time
recs = fit_predict_evaluate(SLIM(**best_params, seed=SEED), metrics, 'SLIM_optimized')
metrics.results.sort_values('NDCG@5', ascending=False)
CPU times: user 3.68 s, sys: 565 ms, total: 4.24 s Wall time: 44.8 s
| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.156275 | 0.240 | 0.548 | 0.095153 | 0.165387 |
| SLIM | 0.154926 | 0.236 | 0.544 | 0.094453 | 0.163997 |
recs_pd = recs.toPandas()
recs_pd.head(2)
| | user_id | item_id | relevance |
|---|---|---|---|
| 0 | 5207 | 1196 | 0.304172 |
| 1 | 5207 | 2797 | 0.248480 |
RePlay allows saving and loading fitted models with the `save` and `load` functions of the `model_handler` module. A model is saved as a folder with all necessary parameters and data.
from replay.model_handler import save, load
save(slim, path='./slim_best_params')
slim_loaded = load('./slim_best_params')
%%time
pred_from_loaded = slim_loaded.predict(
    k=K,
    users=test.select('user_id').distinct(),
    log=train,
    filter_seen_items=True
)
pred_from_loaded.show(2)
+-------+-------+-------------------+
|user_id|item_id|          relevance|
+-------+-------+-------------------+
|   5207|   1196| 0.3135545764212523|
|   5207|   2918|0.24504774750833722|
+-------+-------+-------------------+
only showing top 2 rows

CPU times: user 1.04 s, sys: 269 ms, total: 1.31 s
Wall time: 14.2 s
slim_loaded.beta, slim_loaded.lambda_
(1.0330972425382408e-06, 0.6909563043593303)
ALS is a commonly used matrix factorization algorithm.
from replay.models import ALSWrap
%%time
recs = fit_predict_evaluate(ALSWrap(rank=100, seed=SEED), metrics, 'ALS')
metrics.results.sort_values('NDCG@5', ascending=False)
CPU times: user 1.6 s, sys: 449 ms, total: 2.05 s Wall time: 1min 5s
| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.156275 | 0.240 | 0.548 | 0.095153 | 0.165387 |
| SLIM | 0.154926 | 0.236 | 0.544 | 0.094453 | 0.163997 |
| ALS | 0.201619 | 0.222 | 0.546 | 0.092473 | 0.161675 |
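The idea behind ALS can be sketched in a few lines of NumPy: alternately solve a regularized least-squares problem for the user factors with the item factors fixed, and vice versa. This is a bare-bones illustration on a dense toy matrix, without the distributed computation or implicit-feedback weighting that Spark's ALS provides:

```python
import numpy as np

rng = np.random.default_rng(1234)
R = rng.integers(1, 6, size=(8, 6)).astype(float)  # toy ratings matrix
rank, reg = 3, 0.1
U = rng.normal(size=(8, rank))   # user factors
V = rng.normal(size=(6, rank))   # item factors
eye = reg * np.eye(rank)

def loss(U, V):
    # squared reconstruction error of the factorization
    return np.linalg.norm(R - U @ V.T) ** 2

before = loss(U, V)
for _ in range(10):
    # fix V, solve the ridge-regression normal equations for U
    U = np.linalg.solve(V.T @ V + eye, (R @ V).T).T
    # fix U, solve for V
    V = np.linalg.solve(U.T @ U + eye, (R.T @ U).T).T
after = loss(U, V)
```

Each half-step has a closed-form solution, which is why ALS converges quickly and parallelizes well: every user's (and item's) factor vector can be solved independently.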
KNN is a commonly used item-based recommender.
from replay.models import KNN
%%time
recs = fit_predict_evaluate(KNN(num_neighbours=100), metrics, 'KNN')
metrics.results.sort_values('NDCG@5', ascending=False)
CPU times: user 1.61 s, sys: 468 ms, total: 2.08 s Wall time: 1min 3s
| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.156275 | 0.240 | 0.548 | 0.095153 | 0.165387 |
| SLIM | 0.154926 | 0.236 | 0.544 | 0.094453 | 0.163997 |
| ALS | 0.201619 | 0.222 | 0.546 | 0.092473 | 0.161675 |
| KNN | 0.053441 | 0.158 | 0.394 | 0.056690 | 0.104892 |
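Item-based KNN scores an item for a user by summing similarities between that item and the items the user already interacted with. A minimal cosine-similarity version in NumPy (an illustration of the general technique only; RePlay's KNN differs in details such as neighborhood truncation):

```python
import numpy as np

# toy binary interaction matrix: rows = users, cols = items
R = np.array([
    [1, 1, 0, 0],
    [1, 1, 1, 0],
    [0, 1, 1, 1],
    [0, 0, 1, 1],
], dtype=float)

# cosine similarity between item columns
norms = np.linalg.norm(R, axis=0)
sim = (R.T @ R) / np.outer(norms, norms)
np.fill_diagonal(sim, 0.0)  # an item should not recommend itself

# score = similarity-weighted sum over each user's seen items
scores = R @ sim
scores[R > 0] = -np.inf  # analogue of filter_seen_items=True
top1 = scores.argmax(axis=1)
```

For user 0 (who saw items 0 and 1), item 2 scores highest because it co-occurs with both of that user's items, while item 3 shares a user with only one of them.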
To easily evaluate recommendations obtained from other sources, read and pass these recommendations to `Experiment`. The expected columns are `user_id`, `item_id` and `relevance`.
from pyspark.sql.functions import rand
recs.withColumn('relevance', rand(seed=123)).toPandas().to_csv("recs.csv", index=False)
recs = DataPreparator().transform(
    path="recs.csv",
    columns_names={
        "user_id": "user_id",
        "item_id": "item_id",
        "relevance": "relevance"
    },
    reader_kwargs={"header": True},
    format_type="csv"
)
metrics.add_result("my_model", recs)
metrics.results.sort_values("NDCG@5", ascending=False)
| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.156275 | 0.240 | 0.548 | 0.095153 | 0.165387 |
| SLIM | 0.154926 | 0.236 | 0.544 | 0.094453 | 0.163997 |
| ALS | 0.201619 | 0.222 | 0.546 | 0.092473 | 0.161675 |
| KNN | 0.053441 | 0.158 | 0.394 | 0.056690 | 0.104892 |
| my_model | 0.053441 | 0.116 | 0.394 | 0.049950 | 0.096634 |