#!/usr/bin/env python
# coding: utf-8

# Notebook exported as a script: process sessions.csv per user in parallel
# with ipyparallel, pushing the data frame to every engine once and then
# mapping a task function over the unique user IDs.

# In[1]:

import ipyparallel as ipp

rc = ipp.Client()
everyone = rc[:]  # DirectView over every engine in the cluster
everyone

# Step 1: Initialize the engine state.
# In this case, that's importing pandas and
# loading the sessions.csv into a data frame.

# In[4]:

# This is a %%px cell: the code string runs on EVERY engine, so each engine
# ends up with its own `pd` and `sessions` in its namespace.
# NOTE(review): `get_ipython()` only exists when this script runs under
# IPython/Jupyter; as a plain `python` script this line raises NameError.
get_ipython().run_cell_magic('px', '', "import pandas as pd\n\n# users = pd.read_csv('users.csv')\n# Each row is the details of one user action. \n# There is several rows with the same user ID\nsessions = pd.read_csv('sessions.csv')\n")

# Define our task function, to be called on the engines.
# This will resolve `sessions` in the engine namespaces,
# avoiding passing the data frame as an argument on every call.

# In[15]:

def process(user):
    """Aggregate one user's rows from the engine-global ``sessions`` frame.

    Parameters
    ----------
    user : hashable
        A single ``user_id`` value to select rows for.

    Returns
    -------
    pandas.Series
        Per-user results (currently an empty placeholder to be filled in).

    Notes
    -----
    Runs on the engines: ``sessions`` and ``pd`` are resolved in the engine
    namespace populated by the ``%%px`` cell above, so the data frame is not
    serialized on every call.
    """
    # Locate all the user sessions in the *global* sessions dataframe
    user_session = sessions.loc[sessions['user_id'] == user]
    # Fix: bare pd.Series() triggers a FutureWarning, and its default dtype
    # changed from float64 to object in pandas 2.0 — pin the dtype so the
    # placeholder result is warning-free and version-independent.
    user_session_data = pd.Series(dtype="float64")
    # Make calculations and append to user_session_data
    return user_session_data

# Locally, get the unique session IDs:

# In[16]:

import pandas as pd

sessions = pd.read_csv('sessions.csv')
user_ids = sessions['user_id'].unique()

# Next, make a load-balanced view and map tasks on unique user IDs

# In[17]:

view = rc.load_balanced_view()

# In[18]:

# map() is lazy per-task; iterating the AsyncMapResult yields results in
# submission order, pairing cleanly with user_ids via zip.
ar = view.map(process, user_ids)
for user_id, result in zip(user_ids, ar):
    print(user_id, result)