#!/usr/bin/env python
# coding: utf-8

# # RePlay recommender models comparison
#
# ### Dataset
# We will compare RePlay models on __MovieLens 1m__.
#
# ### Dataset preprocessing
# Ratings greater than or equal to 3 are considered positive interactions.
#
# ### Data split
# The dataset is split by date so that the last 20% of interactions are placed in the test part. Cold items and users are dropped.
#
# ### Predict
# We will predict the top-10 most relevant films for each user.
#
# ### Metrics
# Quality metrics used: __NDCG@k, HitRate@k, MAP@k, MRR@k__ for k = 1, 5, 10.
# Additional metrics used: __Coverage@k__ and __Surprisal@k__.

# In[1]:


get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')


# In[2]:


get_ipython().run_line_magic('config', 'Completer.use_jedi = False')


# In[3]:


import warnings

from optuna.exceptions import ExperimentalWarning

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)


# In[4]:


import logging
import time

import pandas as pd
from pyspark.sql import functions as sf, types as st
from pyspark.sql.types import IntegerType

from replay.data_preparator import DataPreparator
from replay.experiment import Experiment
from replay.metrics import Coverage, HitRate, MRR, MAP, NDCG, Surprisal
from replay.models import (
    ALSWrap,
    ADMMSLIM,
    KNN,
    LightFMWrap,
    MultVAE,
    NeuroMF,
    SLIM,
    PopRec,
    RandomRec,
    Wilson,
    Word2VecRec
)
from replay.models.base_rec import HybridRecommender
from replay.session_handler import State
from replay.splitters import DateSplitter
from replay.utils import get_log_info
from rs_datasets import MovieLens


# The `State` object allows passing an existing Spark session or creating a new one, which will be used by all RePlay modules.
#
# To create a session with custom `spark.driver.memory` and `spark.sql.shuffle.partitions` parameters, use the `get_spark_session` function from the `session_handler` module.

# In[5]:


spark = State().session
spark


# In[6]:


logger = logging.getLogger("replay")


# In[7]:


K = 10
K_list_metrics = [1, 5, 10]
BUDGET = 20
SEED = 12345


# ## 0. Preprocessing

# ### 0.1 Data loading

# In[8]:


data = MovieLens("1m")
data.info()


# #### Log preprocessing

# In[10]:


preparator = DataPreparator()
log, _, _ = preparator(data.ratings, mapping={"relevance": "rating"})
print(get_log_info(log))


# In[11]:


# ratings >= 3 are considered positive feedback and are assigned relevance = 1
only_positives_log = log.filter(sf.col('relevance') >= 3).withColumn('relevance', sf.lit(1))
only_positives_log.count()


# In[12]:


user_features = None
item_features = None


# ### 0.2. Data split

# In[13]:


# train/test split
train_spl = DateSplitter(
    test_start=0.2,
    drop_cold_items=True,
    drop_cold_users=True,
)
train, test = train_spl.split(only_positives_log)
print('train info:\n', get_log_info(train))
print('test info:\n', get_log_info(test))


# In[14]:


# train/validation split for hyperparameter selection
opt_train, opt_val = train_spl.split(train)
opt_train.count(), opt_val.count()


# In[15]:


# negative feedback will be used for the Wilson model only
only_negatives_log = log.filter(sf.col('relevance') < 3).withColumn('relevance', sf.lit(0.))
test_start = test.agg(sf.min('timestamp')).collect()[0][0]

# train with both positive and negative feedback
pos_neg_train = (
    train
    .withColumn('relevance', sf.lit(1))
    .union(only_negatives_log.filter(sf.col('timestamp') < test_start))
)
pos_neg_train.count()


# In[16]:


train.show(2)
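
# The split above can be sanity-checked by hand: with `test_start=0.2`, the test border lies roughly at the 0.8-quantile of the interaction timestamps. Below is a minimal manual sketch of the same idea (illustrative only, not the `DateSplitter` internals; unlike the splitter, it does not drop cold users and items).

# In[ ]:


# cast the timestamp to a numeric column so quantiles can be computed on it
ts_log = only_positives_log.withColumn('ts', sf.col('timestamp').cast('long'))
border = ts_log.approxQuantile('ts', [0.8], 0.001)[0]
manual_train = ts_log.filter(sf.col('ts') < border)
manual_test = ts_log.filter(sf.col('ts') >= border)
manual_train.count(), manual_test.count()
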
# # 1. Metrics definition

# In[17]:


# the Experiment object is used for metrics calculation
e = Experiment(test, {MAP(): K,
                      NDCG(): K,
                      HitRate(): K_list_metrics,
                      Coverage(train): K,
                      Surprisal(train): K,
                      MRR(): K})


# # 2. Models training

# In[18]:


def fit_predict_add_res(name, model, experiment, train, suffix=''):
    """
    Run fit_predict for `model`, measure the fit_predict time and evaluate the metrics
    """
    start_time = time.time()

    fit_predict_params = {'log': train, 'k': K, 'users': test.select('user_idx').distinct()}
    # Wilson needs both positive and negative feedback
    if isinstance(model, Wilson):
        fit_predict_params['log'] = pos_neg_train
    # hybrid recommenders can use user/item features
    if isinstance(model, HybridRecommender):
        fit_predict_params['item_features'] = item_features
        fit_predict_params['user_features'] = user_features

    pred = model.fit_predict(**fit_predict_params)
    pred.count()
    fit_predict_time = time.time() - start_time

    experiment.add_result(name + suffix, pred)
    experiment.results.loc[name + suffix, 'fit_pred_time'] = fit_predict_time
    print(experiment.results[['NDCG@{}'.format(K), 'MRR@{}'.format(K),
                              'Coverage@{}'.format(K), 'fit_pred_time']]
          .sort_values('NDCG@{}'.format(K), ascending=False))


# In[19]:


def full_pipeline(models, experiment, train, suffix='', budget=BUDGET):
    """
    For each model:
    - if required, run a hyperparameter search, set the best params and save their values to `experiment`
    - pass the model to `fit_predict_add_res`
    """
    for name, [model, params] in models.items():
        model.logger.info(msg='{} started'.format(name))
        if params != 'no_opt':
            model.logger.info(msg='{} optimization started'.format(name))
            best_params = model.optimize(opt_train,
                                         opt_val,
                                         param_borders=params,
                                         item_features=item_features,
                                         user_features=user_features,
                                         k=K,
                                         budget=budget)
            model.set_params(**best_params)
            logger.info(msg='best params for {} are: {}'.format(name, best_params))
            experiment.results.loc[name + suffix, 'params'] = best_params.__repr__()

        logger.info(msg='{} fit_predict started'.format(name))
        fit_predict_add_res(name, model, experiment, train, suffix)
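
# For reference, the headline ranking metric reported below, NDCG@k, can be sketched for a single user with binary relevance as follows (a toy illustration, not RePlay's implementation):

# In[ ]:


import math

def toy_ndcg_at_k(recommended, relevant, k):
    """Toy NDCG@k for a single user: binary relevance, log2 position discount."""
    dcg = sum(1 / math.log2(pos + 2)
              for pos, item in enumerate(recommended[:k]) if item in relevant)
    idcg = sum(1 / math.log2(pos + 2) for pos in range(min(len(relevant), k)))
    return dcg / idcg if idcg else 0.0

# a relevant item at rank 1 scores higher than the same item at rank 3
toy_ndcg_at_k([1, 2, 3], {1}, 3), toy_ndcg_at_k([3, 2, 1], {1}, 3)
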
# ## 2.1. Non-personalized models

# In[20]:


non_personalized_models = {
    'Popular Recommender': [PopRec(), 'no_opt'],
    'Random Recommender (uniform)': [RandomRec(seed=SEED, distribution='uniform'), 'no_opt'],
    'Random Recommender (popularity-based)': [RandomRec(seed=SEED, distribution='popular_based'),
                                              {"alpha": [-0.5, 100]}],
    'Wilson Recommender': [Wilson(), 'no_opt']
}


# In[21]:


get_ipython().run_cell_magic('time', '', 'full_pipeline(non_personalized_models, e, train)\n')


# In[22]:


e.results.sort_values('NDCG@10', ascending=False)


# In[23]:


e.results.to_csv('res_21_rel_1.csv')


# ## 2.2 Personalized models without features

# In[24]:


common_models = {
    'ADMM SLIM': [ADMMSLIM(seed=SEED), {"lambda_1": [1e-6, 10],
                                        "lambda_2": [1e-6, 1000]}],
    'Implicit ALS': [ALSWrap(seed=SEED), None],
    'Explicit ALS': [ALSWrap(seed=SEED, implicit_prefs=False), None],
    'KNN': [KNN(), None],
    'LightFM': [LightFMWrap(random_state=SEED), {"no_components": [8, 512]}],
    'SLIM': [SLIM(seed=SEED), None]
}


# In[25]:


get_ipython().run_cell_magic('time', '', 'full_pipeline(common_models, e, train)\n')


# In[26]:


e.results.sort_values('NDCG@10', ascending=False)


# In[27]:


e.results.to_csv('res_22_rel_1.csv')


# ## 2.3 Neural models

# In[28]:


nets = {
    'MultVAE with default parameters': [MultVAE(), 'no_opt'],
    'NeuroMF with default parameters': [NeuroMF(), 'no_opt'],
    'Word2Vec with default parameters': [Word2VecRec(seed=SEED), 'no_opt'],
    'MultVAE with optimized parameters': [MultVAE(), {"learning_rate": [0.001, 0.5],
                                                      "dropout": [0, 0.5],
                                                      "l2_reg": [1e-6, 5]}],
    'NeuroMF with optimized parameters': [NeuroMF(), {"learning_rate": [0.001, 0.5],
                                                      "l2_reg": [1e-6, 5],
                                                      "count_negative_sample": [1, 20]}],
    'Word2Vec with optimized parameters': [Word2VecRec(seed=SEED), None]
}


# In[29]:


get_ipython().run_cell_magic('time', '', 'full_pipeline(nets, e, train, budget=10)\n')


# In[30]:


e.results.sort_values('NDCG@10', ascending=False)


# In[31]:


e.results.to_csv('res_23_rel_1.csv')


# ## 2.4 Models considering features

# ### 2.4.1 Item features preprocessing

# In[32]:


get_ipython().run_cell_magic('time', '', 'preparator = DataPreparator()\nlog, _, item_features = preparator(data.ratings, item_features=data.items, mapping={"relevance": "rating"})\n')


# In[33]:


item_features.show(2)


# In[34]:


# the release year is encoded in the last characters of the title, e.g. "Toy Story (1995)"
year = item_features.withColumn('year', sf.substring(sf.col('title'), -5, 4).astype(st.IntegerType())).select('item_idx', 'year')
year.show(2)


# In[35]:


genres = (
    spark.createDataFrame(data.items[["item_id", "genres"]].rename({'item_id': 'item_idx'}, axis=1))
    .select(
        "item_idx",
        sf.split("genres", r"\|").alias("genres")
    )
)


# In[36]:


genres_list = (
    genres.select(sf.explode("genres").alias("genre"))
    .distinct().filter('genre <> "(no genres listed)"')
    .toPandas()["genre"].tolist()
)


# In[37]:


genres_list


# In[38]:


# one-hot encode the genres
item_features = genres
for genre in genres_list:
    item_features = item_features.withColumn(
        genre,
        sf.array_contains(sf.col("genres"), genre).astype(IntegerType())
    )
item_features = item_features.drop("genres").cache()
item_features.count()


# In[39]:


item_features = item_features.join(year, on='item_idx', how='inner')
item_features.count()


# In[40]:


item_features.cache()


# In[41]:


item_features.show(3)


# ### 2.4.2 Models training

# In[42]:


models_with_features = {
    'LightFM with item features': [LightFMWrap(random_state=SEED), {"no_components": [8, 512]}]
}


# In[43]:


get_ipython().run_cell_magic('time', '', 'full_pipeline(models_with_features, e, train)\n')


# In[44]:


e.results.sort_values('NDCG@10', ascending=False)


# In[45]:


e.results.to_csv('res_25_rel_1.csv')
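
# Each section above checkpoints the accumulated results to a CSV file; any of those files can be reloaded later for offline comparison, e.g. (a sketch, reading back the file written in the previous cell):

# In[ ]:


checkpoint = pd.read_csv('res_25_rel_1.csv', index_col=0)
checkpoint.sort_values('NDCG@10', ascending=False).head()
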
# In[48]:


df = e.results.drop([
    'NeuroMF with optimized parameters',
    'MultVAE with default parameters',
    'Word2Vec with optimized parameters'
]).rename(
    index={
        'Popular Recommender': 'PopRec',
        'Random Recommender (uniform)': 'RandomRec (uniform)',
        'Random Recommender (popularity-based)': 'RandomRec (popular)',
        'Wilson Recommender': 'Wilson',
        'Implicit ALS': 'ALS (Implicit)',
        'Explicit ALS': 'ALS (Explicit)',
        'NeuroMF with default parameters': 'NeuroMF',
        'MultVAE with optimized parameters': 'MultVAE',
        'Word2Vec with default parameters': 'Word2Vec',
        'LightFM with item features': 'LightFM (w/ feats)'
    }).sort_values('NDCG@10', ascending=False)
df


# In[49]:


df.index.name = 'Model'


# In[50]:


df = df.round(3)[['HitRate@10', 'MAP@10', 'MRR@10', 'NDCG@10', 'Coverage@10', 'Surprisal@10', 'fit_pred_time']]
df = df.rename(columns={'HitRate@10': 'HitRate',
                        'MAP@10': 'MAP',
                        'MRR@10': 'MRR',
                        'NDCG@10': 'NDCG',
                        'Coverage@10': 'Coverage',
                        'Surprisal@10': 'Surprisal'})
df.to_csv('res_1m.csv')


# # 3. Results
# The best results in both quality and fit/predict time were shown by commonly used models such as ALS, SLIM and LightFM.

# In[51]:


df.head()
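
# The final table can also be visualized directly, e.g. as a quick NDCG@10 bar chart (a sketch; assumes matplotlib is available, as it is not imported above):

# In[ ]:


import matplotlib.pyplot as plt

# columns were renamed in In[50], so the metric column is now 'NDCG'
df['NDCG'].sort_values().plot(kind='barh', title='NDCG@10 by model')
plt.xlabel('NDCG@10')
plt.tight_layout()
plt.show()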