#!/usr/bin/env python
# coding: utf-8

# # Options to Access Data from Spark Enterprise Cluster
#
# 1. Use data stored in the local file system. If this gets slow because too many users are on the system, you can try to
# 2. Pull data from IBM Object Storage. The network connection to IBM Object Storage is quite fast.
#
# ## 1. Using Local Files on the Spark Enterprise SETI Cluster

# In[123]:

import os
import ibmseti
import pandas as pd

### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join(os.environ['PWD'], 'my_data_folder')  # Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)


# In[78]:

# We will use the index files to retrieve our data.
# Each line in a training set index file contains UUID, SIGNAL_CLASSIFICATION
primarySmallIndex = os.path.join(os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv')
primaryMediumIndex = os.path.join(os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
basicIndex = os.path.join(os.environ['PWD'], 'data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv')
testSetIndex = os.path.join(os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv')


# In[ ]:

# Define some variables for the different local directories where the data are stored.
# DO NOT WRITE TO THE 'data' DIRECTORY. ALWAYS USE THE TEAM DIRECTORY YOU CREATED ABOVE TO STORE ANY INTERMEDIATE RESULTS.
primarySetiDataDir = os.path.join(os.environ['PWD'], 'data/seti/simsignals_v2')
basicSetiDataDir = os.path.join(os.environ['PWD'], 'data/seti/simsignals_basic_v2')
testSetiDataDir = os.path.join(os.environ['PWD'], 'data/seti/simsignals_test_mini')


# In[80]:

# Define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action.
def get_data_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data since it doesn't have "SIGNAL_CLASSIFICATION" in the index file
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file (read as raw bytes, which is what ibmseti expects)
    rawdata = open(filepath, 'rb').read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    # spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps

    # do other work here
    features = []  ## ?? Or other work you want to do on the file

    ## You can also save results at this point to your local 'my_team_folder'!
    # with open(os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #     fout.write('stuff')

    try:
        # catch the exception if using test data, because it won't have classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except:
        pass

    # return something useful
    return (classification, features)


# In[81]:

# Choose your own adventure!
workingDataDir = primarySetiDataDir

# We parallelize the index file across our worker executors.
# (`sc` is the SparkContext provided by the notebook environment on the cluster.)
rdd = sc.textFile(primarySmallIndex, 120).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header

# Then we have each worker executor perform the actions in the function defined above.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')
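# The `features = []` placeholder above is where your own signal processing would go. Below is a minimal sketch of one possibility, assuming simple log-power statistics of the spectrogram are useful as a starting point; `simple_features` is a hypothetical helper, not part of the original notebook, and the choice of statistics is purely illustrative.

# In[ ]:

import numpy as np

def simple_features(aca):
    # Reduce the spectrogram to a few log-power summary statistics.
    spectrogram = aca.get_spectrogram()    # 2D numpy array (time x frequency)
    logspec = np.log(spectrogram + 1e-12)  # small offset guards against log(0)
    return [logspec.mean(), logspec.std(), logspec.max()]

# Inside get_data_and_process you could then replace `features = []` with:
#     features = simple_features(aca)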
# In[82]:

# Massage the data into a pandas DataFrame (or we could have done this directly from the Spark RDD :P) and make sure the output makes sense.
myresult_classes = [x[0] for x in myresults]


# In[83]:

res = pd.DataFrame(myresult_classes, columns=['class'])


# In[84]:

res.groupby(['class']).size()


# In[85]:

# We do the same thing, but now with the test data set.

# set the workingDataDir appropriately
workingDataDir = testSetiDataDir

# parallelize the testSetIndex
rdd = sc.textFile(testSetIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header

# Perform the same process. In practice, you'll probably write a different function to pass the test data into your classifier.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')


# In[86]:

myresult_classes = [x[0] for x in myresults]


# In[87]:

res = pd.DataFrame(myresult_classes, columns=['class'])


# In[88]:

res.groupby(['class']).size()


# In[89]:

# Same thing for the basic4 data set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')


# In[90]:

myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
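# As the comment above hints, the per-class counting could also stay inside Spark instead of collecting every row to the driver. A minimal sketch, assuming the same `rdd` and `get_data_and_process` as above; `countByValue()` is a standard RDD action that returns a dict of value -> count.

# In[ ]:

# Count classes on the executors; only the small dict of counts
# comes back to the driver.
class_counts = rdd.map(get_data_and_process).map(lambda x: x[0]).countByValue()
print(class_counts)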
#
# ## 2. Retrieve Data from IBM Object Storage
#
# Use this if you feel there are too many groups pulling from the local file system and things are slow. Data processing should take longer than reading the files over the network from Object Storage, so this should not create a bottleneck in your overall workflow on Spark Enterprise.
#
# #### Basically the Same Process as Above
#
# Except in this case we use data entirely on IBM Object Storage.

# In[130]:

import os
import ibmseti
import requests
import pandas as pd

### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join(os.environ['PWD'], 'my_data_folder')  # Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)

baseswiftURL = 'swift2d://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'
basehttpURL = 'https://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'

primarySmallIndex = os.path.join(baseswiftURL, 'simsignals_files/public_list_primary_v2_small_1june_2017.csv')
primaryMediumIndex = os.path.join(baseswiftURL, 'simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
primaryFullIndex = os.path.join(baseswiftURL, 'simsignals_files/public_list_primary_v2_full_1june_2017.csv')
basicIndex = os.path.join(baseswiftURL, 'simsignals_files/public_list_basic_v2_26may_2017.csv')
testSetIndex = os.path.join(baseswiftURL, 'simsignals_files/public_list_primary_testset_mini_1june_2017.csv')


# In[131]:

primarySetiDataDir = os.path.join(basehttpURL, 'simsignals_v2')
basicSetiDataDir = os.path.join(basehttpURL, 'simsignals_basic_v2')
testSetiDataDir = os.path.join(basehttpURL, 'simsignals_test_mini')
workingDataDir = primarySetiDataDir


# In[132]:

import requests


# In[133]:

def get_data_fromOS_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data since it doesn't have "SIGNAL_CLASSIFICATION" in the index file
        classification = 'unknown: test data'

    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # We use the Python requests package to get our data
    r = requests.get(filepath, timeout=(3.0, 9.0))  # add a timeout just in case

    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError:
        return (r.status_code, [])

    aca = ibmseti.compamp.SimCompamp(r.content)
    # spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps

    # do other work here
    features = []  ## ?? Or other work you want to do on the file

    ## You can also save results at this point to your local 'my_team_folder'!
    # with open(os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #     fout.write('stuff')

    try:
        # catch the exception if using test data, because it won't have classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except:
        pass

    # return something to your map function
    return (classification, features)
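# If Object Storage GETs ever fail transiently under load, a retry wrapper can be worth having. This is an optional hardening sketch, not part of the original notebook: `get_with_retries` is a hypothetical helper you could substitute for the bare `requests.get` call in `get_data_fromOS_and_process`.

# In[ ]:

import time
import requests

def get_with_retries(url, retries=3, backoff=2.0):
    # Retry a flaky GET a few times with a growing pause before giving up.
    for attempt in range(retries):
        try:
            r = requests.get(url, timeout=(3.0, 9.0))
            r.raise_for_status()
            return r
        except requests.exceptions.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (attempt + 1))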
# In[121]:

# Grab the basic data set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_fromOS_and_process).collect()')


# In[122]:

myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()


# In[134]:

# Reading the entire 70k-file (20 GB) primary medium data set from Object Storage only takes 3 minutes!
workingDataDir = primarySetiDataDir
rdd = sc.textFile(primaryMediumIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_fromOS_and_process).collect()')


# In[135]:

myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()


# # Access Data from IBM PowerAI Deep Learning Platform on Nimbix Cloud
#
# We have only one real option in this case, because the network connection from Nimbix Cloud to IBM Object Storage is not as fast.
#
# ## Use data stored in the shared Nimbix Cloud Vault: `/data`

# In[126]:

import os
import ibmseti
import pandas as pd

# Make sure to initialize Spark if you're going to use it on PowerAI systems
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName='seti')

### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join(os.environ['PWD'], 'my_data_folder')  # Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)

## REMEMBER: on Nimbix, your local file space is destroyed when your cloud machine is shut down, so be sure to commit/save your work!

# The index files are in a different location
primarySmallIndex = '/data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv'
primaryMediumIndex = '/data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv'
basicIndex = '/data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv'
testSetIndex = '/data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv'
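# Given the REMEMBER note above about Nimbix wiping local file space, it can pay to checkpoint intermediate results as you go. A minimal sketch, assuming you just want to dump Python objects into your team folder; `checkpoint` is a hypothetical helper and the filename is illustrative.

# In[ ]:

import os
import pickle

def checkpoint(obj, name):
    # Serialize any intermediate Python object (e.g., the list returned
    # by collect()) into the team folder created above.
    with open(os.path.join(mydatafolder, name + '.pkl'), 'wb') as fout:
        pickle.dump(obj, fout)

# Example usage after a collect():
#     checkpoint(myresults, 'myresults_small_index')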
# In[129]:

primarySetiDataDir = '/data/seti/simsignals_v2'  # THIS ONLY CONTAINS THE SMALL AND MEDIUM DATA FILES!
# Ask Adam, Patrick, or Joseph on Saturday evening if you want the full data set.
# Hint: it's in simsignals_v2_full_N, for N = 1, 2, 3, 4.
basicSetiDataDir = '/data/seti/simsignals_basic_v2'
testSetiDataDir = '/data/seti/simsignals_test_mini'
workingDataDir = basicSetiDataDir


# In[128]:

# Define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action.
def get_data_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data since it doesn't have "SIGNAL_CLASSIFICATION" in the index file
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file (read as raw bytes, which is what ibmseti expects)
    rawdata = open(filepath, 'rb').read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    # spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps

    # do work here
    features = []  ## ?? Or other work you want to do on the file

    ## You could also save results at this point to your local mydatafolder
    # with open(os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #     fout.write('stuff')

    try:
        # catch the exception if using test data, because it won't have classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except:
        pass

    # return something useful
    return (classification, features)


# In[ ]:

# We parallelize the index file across our worker executors.
rdd = sc.textFile(primarySmallIndex).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header

# Then we have each worker executor perform the actions in the function defined above.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')


# In[ ]:

myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
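# Finally, a small sketch of persisting the class counts to your team folder (the filename is illustrative). On Nimbix, remember that this folder is itself local, so download or commit the file before the machine shuts down.

# In[ ]:

# Write the per-class counts out as a CSV for later reference.
res.groupby(['class']).size().to_csv(os.path.join(mydatafolder, 'class_counts.csv'))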