#!/usr/bin/env python
# coding: utf-8
# # Options to Access Data from Spark Enterprise Cluster
#
# 1. Use data stored in the local file system. If this becomes slow because too many users are on the system, you can try option 2.
# 2. Pull data from IBM Object Storage. The network connection to IBM Object Storage is quite fast.
#
# ## 1. Using Local Files on Spark Enterprise SETI cluster
#
# In[123]:
import os
import ibmseti
import pandas as pd
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
# In[78]:
# We will use the index files to retrieve our data.
# Each line in a training set index file contains UUID,SIGNAL_CLASSIFICATION
primarySmallIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv' )
primaryMediumIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
basicIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv' )
testSetIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )
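# A quick sanity check before handing an index file to Spark: peek at its first
# few lines. This is a minimal sketch that assumes only the paths defined above.
# In[ ]:
with open(primarySmallIndex) as f:
    for _ in range(3):
        print(f.readline().strip())
# Expect a 'UUID,SIGNAL_CLASSIFICATION' header followed by '<uuid>,<class>' rows.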
# In[ ]:
# Define some variables for the different local directories where the data are stored.
# DO NOT WRITE TO THE 'data' DIRECTORY. ALWAYS USE THE TEAM DIRECTORY YOU CREATED ABOVE TO STORE INTERMEDIATE RESULTS.
primarySetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_v2')
basicSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_basic_v2')
testSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_test_mini')
# In[80]:
# Define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action.
def get_data_and_process(row):
    # workingDataDir is a global, set in a later cell before this function is used
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file; the compamp files are binary, so read as bytes
    with open(filepath, 'rb') as f:
        rawdata = f.read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do other work here.
    features = []  ## ?? Or other work you want to do on the file

    ## You can also save results at this point to your local 'my_team_folder'!
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data raises here because it carries no classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except (KeyError, AssertionError):
        pass

    # return something useful
    return (classification, features)
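# As a concrete illustration of the "do other work here" step above, here is a
# minimal, hedged sketch of a feature extractor you could call in place of
# features = []. It assumes only that aca.get_spectrogram() (the commented-out
# call above) returns a 2D numpy array; the three summary statistics are
# placeholders, not a recommended feature set.
# In[ ]:
import numpy as np

def simple_spectrogram_features(aca):
    spectrogram = aca.get_spectrogram()  # 2D array of spectrogram power values
    # coarse summary statistics of the power distribution
    return [float(np.mean(spectrogram)), float(np.std(spectrogram)), float(np.max(spectrogram))]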
# In[81]:
# Choose your own Adventure!
workingDataDir = primarySetiDataDir
# we parallelize the index file across our worker executors
rdd = sc.textFile(primarySmallIndex, 120).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header
# then we have each worker executor perform the actions in our function defined above.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')
# In[82]:
# Massage the results into a Pandas DataFrame (we could also have built this directly from the Spark RDD :P) and make sure the output makes sense.
myresult_classes = [x[0] for x in myresults]
# In[83]:
res = pd.DataFrame(myresult_classes, columns=['class'])
# In[84]:
res.groupby(['class']).size()
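# As noted above, the per-class counts could also be computed entirely on the
# cluster instead of collecting every result to the driver. A minimal sketch
# using only the RDD operations already shown in this notebook (note that it
# re-reads and re-processes the data):
# In[ ]:
class_counts = (rdd.map(get_data_and_process)
                   .map(lambda x: (x[0], 1))
                   .reduceByKey(lambda a, b: a + b)
                   .collect())
class_counts  # a list of (class, count) pairs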
# In[85]:
# We do the same thing, but now with the test data set
# set the workingDataDir appropriately
workingDataDir = testSetiDataDir
# parallelize the testSetIndex
rdd = sc.textFile(testSetIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
# We perform the same process here. In practice, you'll probably write a different function that passes the test data into your classifier.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')
# In[86]:
myresult_classes = [x[0] for x in myresults]
# In[87]:
res = pd.DataFrame(myresult_classes, columns=['class'])
# In[88]:
res.groupby(['class']).size()
# In[89]:
# Same thing for the basic4 data set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')
# In[90]:
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
#
#
# ## 2. Retrieve data from IBM Object Storage
#
# Use this if you feel there are too many groups pulling from the local file system and things are slow. Your data processing should take longer than reading the files over the network from Object Storage, so this should not create a bottleneck in your overall workflow on Spark Enterprise.
#
# #### Basically the Same Process as Above
#
# Except in this case, the data comes entirely from IBM Object Storage.
# In[130]:
import os
import ibmseti
import requests
import pandas as pd
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
baseswiftURL = 'swift2d://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'
basehttpURL = 'https://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'
primarySmallIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_small_1june_2017.csv' )
primaryMediumIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
primaryFullIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_full_1june_2017.csv' )
basicIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_basic_v2_26may_2017.csv' )
testSetIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )
# In[131]:
primarySetiDataDir = os.path.join( basehttpURL,'simsignals_v2')
basicSetiDataDir = os.path.join( basehttpURL,'simsignals_basic_v2')
testSetiDataDir = os.path.join( basehttpURL,'simsignals_test_mini')
workingDataDir = primarySetiDataDir
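# Before launching a full Spark job, a quick, hedged smoke test that the Object
# Storage endpoint is reachable can save time. This sketch assumes the index
# files are also served over HTTP at the same paths under basehttpURL:
# In[ ]:
r = requests.get(os.path.join(basehttpURL, 'simsignals_files/public_list_primary_v2_small_1june_2017.csv'), timeout=(3.0, 9.0))
print(r.status_code)  # expect 200 if the container is publicly readable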
# In[133]:
def get_data_fromOS_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # We use the python requests package to get our data
    r = requests.get(filepath, timeout=(3.0, 9.0))  # add a timeout just in case
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError:
        return (r.status_code, [])

    aca = ibmseti.compamp.SimCompamp(r.content)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do other work here.
    features = []  ## ?? Or other work you want to do on the file

    ## You can also save results at this point to your local 'my_team_folder'!
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data raises here because it carries no classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except (KeyError, AssertionError):
        pass

    # return something to your map function
    return (classification, features)
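# A hedged way to sanity-check the function on a single row before mapping it
# across the cluster: pull one line of an index file over HTTP (again assuming
# the index is served under basehttpURL) and process it locally on the driver.
# In[ ]:
index_url = os.path.join(basehttpURL, 'simsignals_files/public_list_basic_v2_26may_2017.csv')
first_row = requests.get(index_url, timeout=(3.0, 9.0)).text.splitlines()[1]  # skip the UUID header line
workingDataDir = basicSetiDataDir
print(get_data_fromOS_and_process(first_row))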
# In[121]:
# Grab the Basic Data Set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_fromOS_and_process).collect()')
# In[122]:
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
# In[134]:
# Reading the entire primary medium data set (70k files, 20 GB) from Object Storage takes only about 3 minutes!
workingDataDir = primarySetiDataDir
rdd = sc.textFile(primaryMediumIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_fromOS_and_process).collect()')
# In[135]:
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
# # Access Data from IBM PowerAI Deep Learning Platform on Nimbix Cloud
#
#
# We have only one real option in this case, because the network connection from the Nimbix Cloud to IBM Object Storage is not as fast.
#
# ## Use data stored in shared Nimbix Cloud Vault: `/data`
# In[126]:
import os
import ibmseti
import pandas as pd
# Make sure to initialize Spark if you're going to use it on PowerAI systems
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName='seti')
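# (A hedged housekeeping note: when you are completely finished with Spark on a
# shared PowerAI machine, release the executors with sc.stop(); the context
# created above holds cluster resources until it is stopped.)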
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
## REMEMBER, on Nimbix, your local file space is destroyed when your cloud machine is shut down. So be sure to commit/save your work!
# The index files are in a different location here
primarySmallIndex = '/data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv'
primaryMediumIndex = '/data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv'
basicIndex = '/data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv'
testSetIndex = '/data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv'
# In[129]:
primarySetiDataDir = '/data/seti/simsignals_v2' #THIS ONLY CONTAINS THE SMALL AND MEDIUM DATA FILES!
# Ask Adam, Patrick or Joseph on Saturday evening if you want the full data set. Hint: It's in simsignals_v2_full_N, for N=1,2,3,4
basicSetiDataDir = '/data/seti/simsignals_basic_v2'
testSetiDataDir = '/data/seti/simsignals_test_mini'
workingDataDir = basicSetiDataDir
# In[128]:
# Define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action.
def get_data_and_process(row):
    # workingDataDir is a global, set in the cell above
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # this handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file; the compamp files are binary, so read as bytes
    with open(filepath, 'rb') as f:
        rawdata = f.read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do work here.
    features = []  ## ?? Or other work you want to do on the file

    ## You could also save results at this point to your local mydatafolder
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data raises here because it carries no classification information
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this better match!
    except (KeyError, AssertionError):
        pass

    # return something useful
    return (classification, features)
# In[ ]:
# we parallelize the index file across our worker executors
rdd = sc.textFile(primarySmallIndex).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header
# then we have each worker executor perform the actions in our function defined above.
get_ipython().run_line_magic('time', 'myresults = rdd.map(get_data_and_process).collect()')
# In[ ]:
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
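# Finally, since Nimbix local file space is destroyed at shutdown, here is a
# minimal sketch of persisting the class counts to your team folder. The file
# name is just an illustration, not a required name:
# In[ ]:
res.groupby(['class']).size().to_csv(os.path.join(mydatafolder, 'class_counts.csv'))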