import os
import ibmseti
import pandas as pd
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
# We will use the index files to retrieve our data.
# Each line in a training set index file contains a UUID and a SIGNAL_CLASSIFICATION (a quick preview sketch follows the path definitions below).
primarySmallIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv' )
primaryMediumIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
basicIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv' )
testSetIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )
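If you want to sanity-check an index file before going to Spark, a quick preview with pandas is enough. A minimal sketch, assuming the UUID,SIGNAL_CLASSIFICATION header described above:
# Hedged sketch: peek at the first few rows of the small primary index
pd.read_csv(primarySmallIndex).head()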
#define some variables for the different local directories where the data are stored
# DO NOT WRITE TO THE 'data' DIRECTORY. MAKE SURE YOU ALWAYS USE YOUR TEAM DIRECTORY YOU CREATED ABOVE TO STORE ANY INTERMEDIATE RESULTS
primarySetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_v2')
basicSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_basic_v2')
testSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_test_mini')
# define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action
def get_data_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file (read as bytes for ibmseti)
    rawdata = open(filepath, 'rb').read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do other work here.
    features = []  ## e.g., the extract_simple_features sketch below, or other work you want to do on the file

    ## You can also save results at this point to your local team folder!
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data has no classification information in its header, so this raises KeyError for it
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this had better match!
    except KeyError:
        pass

    # return something useful
    return (classification, features)
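If you want features to be more than an empty list, here is a minimal sketch of a spectrogram-based extractor you could call from inside the function above. get_spectrogram() is the ibmseti call hinted at in the comments; the particular summary statistics are only illustrative choices, not part of the original notebook.
import numpy as np

def extract_simple_features(aca):
    # 2D numpy array of power values (time x frequency)
    spectrogram = aca.get_spectrogram()
    log_spec = np.log(spectrogram + 1e-12)  # guard against log(0)
    # illustrative summary statistics; substitute your own signal processing
    return [float(log_spec.mean()), float(log_spec.std()), float(log_spec.max())]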
# Choose your own Adventure!
workingDataDir = primarySetiDataDir
# we parallelize the index file across our worker executors
rdd = sc.textFile(primarySmallIndex, 120).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header
# then we have each worker executor perform the actions in our function defined above.
%time myresults = rdd.map(get_data_and_process).collect()
CPU times: user 70.3 ms, sys: 14.3 ms, total: 84.6 ms
Wall time: 6.23 s
# massage the data into a pandas DataFrame (we could also have counted the classes directly on the Spark RDD; see the sketch after the output below) and make sure the output makes sense
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
class
brightpixel                       1000
narrowband                        1000
narrowbanddrd                     1000
noise                             1000
squarepulsednarrowband            1000
squiggle                          1000
squigglesquarepulsednarrowband    1000
dtype: int64
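As noted above, we could also have counted the classes directly on the Spark RDD without collecting everything to the driver first. A sketch using only the plain RDD API:
# Hedged sketch: count classes on the cluster instead of in pandas
class_counts = rdd.map(get_data_and_process).map(lambda x: x[0]).countByValue()
for signal_class, n in sorted(class_counts.items()):
    print('{}: {}'.format(signal_class, n))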
# We do the same thing, but now with the test data set
# set the workingDataDir appropriately
workingDataDir = testSetiDataDir
# parallelize the testSetIndex
rdd = sc.textFile(testSetIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
# perform the same process; in practice you'll probably write a different function that passes the test data into your classifier
%time myresults = rdd.map(get_data_and_process).collect()
CPU times: user 57.8 ms, sys: 12.5 ms, total: 70.3 ms
Wall time: 1.65 s
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
class
unknown: test data    1405
dtype: int64
# Same thing for the basic4 data set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
%time myresults = rdd.map(get_data_and_process).collect()
CPU times: user 71.7 ms, sys: 13.2 ms, total: 84.9 ms
Wall time: 7.17 s
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
class
narrowband       1000
narrowbanddrd    1000
noise            1000
squiggle         1000
dtype: int64
Use this approach if too many teams are pulling from the local file system and things feel slow. It is the same workflow as above, except that the data are read entirely from IBM Object Storage. Your data processing should take longer than streaming the files from Object Storage over the network, so the reads should not become a bottleneck in your overall workflow on Spark Enterprise.
import os
import ibmseti
import requests
import pandas as pd
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
baseswiftURL = 'swift2d://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'
basehttpURL = 'https://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'
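# Note: below, the swift2d:// base URL is the one handed to sc.textFile (read through
# the cluster's Swift/stocator connector, assuming it is configured in this environment),
# while the https:// base URL is fetched directly with requests inside each task.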
primarySmallIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_small_1june_2017.csv' )
primaryMediumIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_medium_1june_2017.csv')
primaryFullIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_full_1june_2017.csv' )
basicIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_basic_v2_26may_2017.csv' )
testSetIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )
primarySetiDataDir = os.path.join( basehttpURL,'simsignals_v2')
basicSetiDataDir = os.path.join( basehttpURL,'simsignals_basic_v2')
testSetiDataDir = os.path.join( basehttpURL,'simsignals_test_mini')
workingDataDir = primarySetiDataDir
def get_data_fromOS_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # We use the Python requests package to get our data
    r = requests.get(filepath, timeout=(3.0, 9.0))  # add a timeout just in case; a retry-wrapper sketch follows below
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError:
        return (r.status_code, [])

    aca = ibmseti.compamp.SimCompamp(r.content)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do other work here.
    features = []  ## ?? Or other work you want to do on the file

    ## You can also save results at this point to your local team folder!
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data has no classification information in its header, so this raises KeyError for it
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this had better match!
    except KeyError:
        pass

    # return something to your map function
    return (classification, features)
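Object Storage fetches can occasionally time out under heavy load. A small retry wrapper around requests.get is a cheap safeguard; this helper is a sketch and is not part of the original notebook.
import time

def get_with_retries(url, retries=3, backoff=2.0):
    # retry transient network failures with a linear backoff
    for attempt in range(retries):
        try:
            r = requests.get(url, timeout=(3.0, 9.0))
            r.raise_for_status()
            return r
        except requests.exceptions.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (attempt + 1))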
# Grab the Basic Data Set
workingDataDir = basicSetiDataDir
rdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
%time myresults = rdd.map(get_data_fromOS_and_process).collect()
CPU times: user 72.1 ms, sys: 18.4 ms, total: 90.5 ms
Wall time: 15.2 s
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
class
narrowband       1000
narrowbanddrd    1000
noise            1000
squiggle         1000
dtype: int64
# Reading the entire 70,000-file (20 GB) primary medium data set from Object Storage takes only about 3 minutes!
workingDataDir = primarySetiDataDir
rdd = sc.textFile(primaryMediumIndex, 120).filter(lambda x: not x.startswith('UUID'))  # this removes the header
%time myresults = rdd.map(get_data_fromOS_and_process).collect()
CPU times: user 286 ms, sys: 43.8 ms, total: 329 ms
Wall time: 3min 19s
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
class
brightpixel                       10000
narrowband                        10000
narrowbanddrd                     10000
noise                             10000
squarepulsednarrowband            10000
squiggle                          10000
squigglesquarepulsednarrowband    10000
dtype: int64
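The collected results live only in the driver's memory, so consider persisting them to your team folder before moving on. A sketch using pickle; the file name is an arbitrary choice.
import pickle
with open(os.path.join(mydatafolder, 'primary_medium_results.pkl'), 'wb') as fout:
    pickle.dump(myresults, fout)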
import os
import ibmseti
import pandas as pd
# Make sure to initialize Spark if you're going to use it on PowerAI systems
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName='seti')
### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results
mydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name
if not os.path.exists(mydatafolder):
    os.makedirs(mydatafolder)
## REMEMBER, on Nimbix, your local file space is destroyed when your cloud machine is shut down. So be sure to commit/save your work!
# The index files are in a different location on this system
primarySmallIndex = '/data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv'
primaryMediumIndex = '/data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv'
basicIndex = '/data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv'
testSetIndex = '/data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv'
primarySetiDataDir = '/data/seti/simsignals_v2' #THIS ONLY CONTAINS THE SMALL AND MEDIUM DATA FILES!
# Ask Adam, Patrick or Joseph on Saturday evening if you want the full data set. Hint: it's in simsignals_v2_full_N, for N=1,2,3,4 (a helper sketch follows below)
basicSetiDataDir = '/data/seti/simsignals_basic_v2'
testSetiDataDir = '/data/seti/simsignals_test_mini'
workingDataDir = basicSetiDataDir
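Per the hint above, the full primary set is split across simsignals_v2_full_N for N=1,2,3,4. Here is a hedged helper that probes each directory for a given UUID; the directory layout is assumed from the hint and may need adjusting.
def find_full_datafile(uuid):
    # directory names assumed from the hint above: simsignals_v2_full_1 .. _4
    for n in range(1, 5):
        candidate = '/data/seti/simsignals_v2_full_{}/{}.dat'.format(n, uuid)
        if os.path.exists(candidate):
            return candidate
    return None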
# define a function that will take a row from the index file,
# create a path to the local data file,
# retrieve that data file,
# and take some action
def get_data_and_process(row):
    try:
        uuid, classification = row.split(',')
    except ValueError:
        uuid = row  # handles the test data, whose index file has no SIGNAL_CLASSIFICATION column
        classification = 'unknown: test data'

    # create the path to the local data file
    filename = uuid + '.dat'
    filepath = os.path.join(workingDataDir, filename)

    # retrieve that data file (read as bytes for ibmseti)
    rawdata = open(filepath, 'rb').read()

    # take some action
    aca = ibmseti.compamp.SimCompamp(rawdata)
    #spectrogram = aca.get_spectrogram()  # or calculate the spectrogram with other methods + signal processing steps
    #do work here.
    features = []  ## ?? Or other work you want to do on the file

    ## You could also save results at this point to your local mydatafolder
    #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:
    #    fout.write('stuff')

    try:  # the test data has no classification information in its header, so this raises KeyError for it
        header = aca.header()
        classfromfile = header['signal_classification']
        assert classfromfile == classification  # this had better match!
    except KeyError:
        pass

    # return something useful
    return (classification, features)
# we parallelize the index file across our worker executors
rdd = sc.textFile(primarySmallIndex).filter(lambda x: not x.startswith('UUID'))  # the filter removes the header
# then we have each worker executor perform the actions in our function defined above.
%time myresults = rdd.map(get_data_and_process).collect()
myresult_classes = [x[0] for x in myresults]
res = pd.DataFrame(myresult_classes, columns=['class'])
res.groupby(['class']).size()
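Since the SparkContext was created by hand above, stop it when you are finished to release the executors:
sc.stop()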