#!/usr/bin/env python
# coding: utf-8

# In[1]:


get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')


# In[2]:


from replay.session_handler import State

spark = State().session
spark


# In[3]:


import pandas as pd

df = pd.read_csv(
    "data/ml1m_ratings.dat",
    sep="\t",
    names=["user_id", "item_id", "relevance", "timestamp"],
)
items = pd.read_csv(
    "data/ml1m_items.dat",
    sep="\t",
    names=["item_id", "title", "genres"],
)
df.head()


# In[5]:


from replay.data_preparator import DataPreparator

# Convert the interaction log into a Spark DataFrame with RePlay's standard column names.
log = DataPreparator().transform(
    data=df,
    columns_names={
        "user_id": "user_id",
        "item_id": "item_id",
        "relevance": "relevance",
        "timestamp": "timestamp"
    }
)


# In[6]:


from sklearn.preprocessing import MultiLabelBinarizer, LabelBinarizer

# One-hot encode the pipe-separated genre lists into genre_* columns.
mlb = MultiLabelBinarizer()
lb = LabelBinarizer()
item_features = pd.DataFrame(
    mlb.fit_transform(items.genres.apply(lambda x: x.split("|"))),
    columns=[f"genre_{x}" for x in mlb.classes_],
    index=items.item_id,
).reset_index()


# In[7]:


# Drop the placeholder timestamp column added by DataPreparator; item features do not need it.
item_features_spark = DataPreparator().transform(
    data=item_features,
    columns_names={"item_id": "item_id"}
).drop("timestamp")


# In[8]:


from replay.splitters import UserSplitter

# Hold out 10 items per user as the final (second-stage) test set.
second_stage_splitter = UserSplitter(
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=10,
    seed=1234,
    shuffle=True
)
# Split the remaining train in half for training the first- and second-level models.
first_stage_splitter = UserSplitter(
    drop_cold_items=False,
    item_test_size=0.5,
    shuffle=True,
    seed=42
)


# In[9]:


from replay.models import ALSWrap

# With rank 98 everything fails with a Java heap space error, so use rank 40.
first_model = ALSWrap(rank=40)


# In[10]:


from replay.models import ClassifierRec
from pyspark.ml.classification import RandomForestClassifier

second_model = ClassifierRec(RandomForestClassifier(seed=47), use_recs_value=True)


# ## Two-stage scenario with statistical features

# In[12]:


from replay.scenarios import TwoStagesScenario
from replay.metrics import NDCG, HitRate, Precision, Recall, RocAuc

two_stages_with_stat = TwoStagesScenario(
    second_stage_splitter=second_stage_splitter,
    second_model=second_model,
    first_model=first_model,
    metrics={NDCG(): [1, 5, 10], HitRate(): [1, 5, 10]},
    stat_features=True
)


# In[13]:


get_ipython().run_cell_magic('time', '', 'recs_with_stat = two_stages_with_stat.get_recs(log, 10, item_features=item_features_spark)\ntwo_stages_with_stat.experiment.results\n')


# ## Two-stage scenario without statistical features

# In[16]:


get_ipython().run_cell_magic('time', '', 'two_stages_without_stat = TwoStagesScenario(\n    second_stage_splitter=second_stage_splitter,\n    second_model=second_model,\n    first_model=first_model,\n    metrics={NDCG(): [1, 5, 10], HitRate(): [1, 5, 10]},\n    stat_features=False\n)\nrecs_without_stat = two_stages_without_stat.get_recs(log, 10, item_features=item_features_spark)\ntwo_stages_without_stat.experiment.results\n')


# In[17]:


two_stages_with_stat.experiment.add_result("two_stages_without_stat", recs_without_stat)
two_stages_with_stat.experiment.results


# ## First-level model trained on the full train set

# In[18]:


train, test = second_stage_splitter.split(log)
first_train, first_test = first_stage_splitter.split(train)


# In[19]:


get_ipython().run_cell_magic('time', '', 'first_recs_all = first_model.fit_predict(\n    log=train,\n    k=10,\n    users=test.select("user_id").distinct().cache(),\n    items=train.select("item_id").distinct().cache(),\n)\n')


# In[20]:


two_stages_with_stat.experiment.add_result("first_stage_all", first_recs_all)
two_stages_with_stat.experiment.results
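
# A minimal optional checkpoint, assuming `two_stages_with_stat.experiment.results` behaves as a
# pandas DataFrame (a later cell sorts it with `.sort_values`); the file name is illustrative.

# In[ ]:


# Persist the comparison table accumulated so far before running the remaining experiments.
two_stages_with_stat.experiment.results.to_csv("experiment_results_checkpoint.csv")
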
# ## First-level model trained on half of train (as in the two-stage scenario)

# In[21]:


get_ipython().run_cell_magic('time', '', 'first_model.fit(log=first_train)\nfirst_model_half = first_model.predict(\n    log=train,\n    k=10,\n    users=test.select("user_id").distinct().cache(),\n    items=train.select("item_id").distinct().cache(),\n)\n\ntwo_stages_with_stat.experiment.add_result("first_stage_half", first_model_half)\n')


# In[22]:


two_stages_with_stat.experiment.results


# ## Two-stage scenario with a stronger classifier

# In[23]:


second_model = ClassifierRec(
    spark_classifier=RandomForestClassifier(numTrees=100, seed=47),
    use_recs_value=True
)


# ### Two-stage scenario with statistical features

# In[25]:


two_stages_with_stat_strong = TwoStagesScenario(
    second_stage_splitter=second_stage_splitter,
    second_model=second_model,
    first_model=first_model,
    metrics={NDCG(): [1, 5, 10], HitRate(): [1, 5, 10]},
    stat_features=True
)


# In[26]:


get_ipython().run_cell_magic('time', '', 'recs_with_stat = two_stages_with_stat_strong.get_recs(log, 10, item_features=item_features_spark)\ntwo_stages_with_stat.experiment.add_result("two_stages_with_stat_strong", recs_with_stat)\n')


# ### Two-stage scenario without statistical features

# In[27]:


get_ipython().run_cell_magic('time', '', 'two_stages_without_stat_strong = TwoStagesScenario(\n    second_stage_splitter=second_stage_splitter,\n    second_model=second_model,\n    first_model=first_model,\n    metrics={NDCG(): [1, 5, 10], HitRate(): [1, 5, 10]},\n    stat_features=False\n)\nrecs_without_stat = two_stages_without_stat_strong.get_recs(log, 10, item_features=item_features_spark)\ntwo_stages_with_stat.experiment.add_result("two_stages_without_stat_strong", recs_without_stat)\ntwo_stages_without_stat_strong.experiment.results\n')


# In[28]:


# This repeats the add_result call already executed inside the previous cell.
two_stages_with_stat.experiment.add_result("two_stages_without_stat_strong", recs_without_stat)


# In[29]:


two_stages_with_stat.experiment.results.sort_values('NDCG@10', ascending=False)


# The first-level model performs better than the two-stage scenario. The two-stage scenario that uses statistical features performs better than the one without them.

# In[ ]:
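

# A minimal closing sketch: persist the first-level model's recommendations for later use,
# assuming `first_recs_all` is a Spark DataFrame (an assumption based on RePlay returning Spark
# DataFrames elsewhere in this notebook); the output path is illustrative.
first_recs_all.write.mode("overwrite").parquet("first_stage_all_recs.parquet")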