import pandas as pd
import numpy as np
import sklearn as sk
import pickle
print 'pandas version: ',pd.__version__
print 'numpy version:',np.__version__
print 'sklearn version:',sk.__version__
pandas version: 0.13.1 numpy version: 1.8.1 sklearn version: 0.14.1
import sys
home_dir='/home/ubuntu/UCSD_BigData'
sys.path.append(home_dir+'/utils')
from find_waiting_flow import *
from AWS_keypair_management import *
!pwd
/home/ubuntu/UCSD_BigData/notebooks/weather.mapreduce
It can get confusing with the AWS credentials. So I created a few new scripts that should make the process easier and error-free.
Before continuing, you need to get the credentials from AWS by using the IAM console.
The credentials are downloaded into files with names such as credentials.csv, credentials (2).csv etc.
The LaunchNotebookServer.py script has a command that can help you get these files to your EC2 instance, the option you need is -A and it is used as
LaunchNotebookServer.py -A ~/Downloads/credentials\*.csv
The \*
, rather than *
is intentional.
The files will be copied through scp
to the directory \home\ubuntu\Vault
on your ec2 instance.
You can then run the following command that will read through the files and check which of the credentials is active. It will then
generate a dictionary Cred
that holds all of the key pairs that are currently active.
In the following cells all of your secret pairs are printed out. Make sure that the cell output is deleted before you commit it to version to github.
# creds = pickle.load(open('/home/ubuntu/Vault/Creds.pkl'))
# config = creds['mrjob']
# key_id = config['key_id']
# secret_key = config['secret_key']
# s3_bucket = config['s3_logs']
# s3_scratch= config['s3_scratch']
# print config
# print key_id
# print secret_key
# print s3_bucket
# print s3_scratch
def get_job_flow_id():
job_flow_id=find_waiting_flow(key_id,secret_key)
return job_flow_id
# get_job_flow_id()
DEBUG = True # if debug == true, remove all existing files, and regenerate
%%writefile totalNumber.py
from mrjob.job import MRJob
class totalNumberOfStations(MRJob):
def mapper(self, _, line):
self.increment_counter('MrJob Counters','mapper',1)
elements=line.split(',')
if len(elements) != 368:
yield 'corrupted data', 1
else:
yield 'useful data', 1
yield(elements[0],1)
def combiner(self, key, counts):
self.increment_counter('MrJob Counters','combiner',1)
yield key, sum(counts)
def reducer(self, key, counts):
self.increment_counter('MrJob Counters','reducer',1)
yield key, sum(counts)
if __name__ == '__main__':
totalNumberOfStations.run()
Overwriting totalNumber.py
# try to run this job on local machien
import os
if not os.path.exists("all_station_count"):
!python totalNumber.py -r emr --emr-job-flow-id=$job_flow_id hdfs:/weather/weather.csv > all_station_count
!head -10 all_station_count
"AJ000037668" 28 "AJ000037679" 29 "AJ000037734" 56 "AJ000037756" 105 "AJ000037844" 110 "AJ000037866" 85 "AJ000037888" 27 "AJ000037899" 94 "AJ000037907" 154 "AM000037627" 52
import os,sys,re,pickle,coding
from numpy import *
!gunzip stations.pkl.gz
stations=pickle.load(open("stations.pkl", 'rb'))
!gzip stations.pkl
stations.shape
type(stations)
pandas.core.frame.DataFrame
stations.head()
latitude | longitude | elevation | state | name | GSNFLAG | HCNFLAG | WMOID | |
---|---|---|---|---|---|---|---|---|
ACW00011604 | 17.1167 | -61.7833 | 10.1 | NaN | ST JOHNS COOLIDGE FLD | NaN | NaN | NaN |
ACW00011647 | 17.1333 | -61.7833 | 19.2 | NaN | ST JOHNS | NaN | NaN | NaN |
AE000041196 | 25.3330 | 55.5170 | 34.0 | NaN | SHARJAH INTER. AIRP | GSN | NaN | 41196 |
AF000040930 | 35.3170 | 69.0170 | 3366.0 | NaN | NORTH-SALANG | GSN | NaN | 40930 |
AG000060390 | 36.7167 | 3.2500 | 24.0 | NaN | ALGER-DAR EL BEIDA | GSN | NaN | 60390 |
5 rows × 8 columns
print stations.shape
(85284, 8)
# experiment: how to get a single row from stations
stations.loc['ACW00011604']
var = 'ACW00011604'
if var in stations:
print True
else:
print False
False
# now correlate the count of stations with the station table
from pandas import DataFrame
source = {} # create a dictionary for later use
ins = open("all_station_count", "r")
for line in ins:
line = line.strip().split('\t')
name= line[0][1:-1]
count= line[1]
if name != "useful data":
source[name] = count
record_count = DataFrame.from_dict(source, orient="index").rename(columns={0:'count'})
record_count.head()
count | |
---|---|
CA004035200 | 87 |
USS0006H19S | 134 |
USC00390043 | 1061 |
UY000001086 | 20 |
UY000001084 | 20 |
5 rows × 1 columns
#now join the station table and record_count table
all_record = record_count.join(other=stations, how="left", lsuffix="lf", rsuffix="rf").rename(columns={ 0:'count'})
# # then we can perform the partitioning work
# all_record = all_record.sort(columns='latitude')
all_record.head()
count | latitude | longitude | elevation | state | name | GSNFLAG | HCNFLAG | WMOID | |
---|---|---|---|---|---|---|---|---|---|
CA004035200 | 87 | 49.1700 | -104.5800 | 695.0 | NaN | MINTON | NaN | NaN | NaN |
USS0006H19S | 134 | 41.3333 | -106.5000 | 2572.5 | Y | SOUTH BRUSH CREEK | NaN | NaN | NaN |
USC00390043 | 1061 | 43.4892 | -99.0631 | 512.1 | D | ACADEMY 2NE | NaN | HCN | NaN |
UY000001086 | 20 | -30.6500 | -56.1700 | 70.0 | NaN | CHARQUEADA | NaN | NaN | NaN |
UY000001084 | 20 | -30.6500 | -56.3800 | 190.0 | NaN | GUAYUBIRA | NaN | NaN | NaN |
5 rows × 9 columns
print all_record.shape[0]
85284
# create the treenode class
class TreeNode:
#ALL_RECORD_TEST = DataFrame(randn(33, 3), columns=['latitude', 'longitude', 'dummy_val'])
STATION_LIMIT = 100 # determine the number of stations in the leaf
def __init__(self, _parent=None):
self.parent = _parent
self.leftChild = None
self.rightChild = None
self.stationList = None
self.isLeaf = False
self.color = None
# pca related data
self.id = None
self.k = -1
self.compare = None
partition_data = all_record.loc[:, ['latitude','longitude','count']]
partition_data.head()
latitude | longitude | count | |
---|---|---|---|
CA004035200 | 49.1700 | -104.5800 | 87 |
USS0006H19S | 41.3333 | -106.5000 | 134 |
USC00390043 | 43.4892 | -99.0631 | 1061 |
UY000001086 | -30.6500 | -56.1700 | 20 |
UY000001084 | -30.6500 | -56.3800 | 20 |
5 rows × 3 columns
print partition_data.shape
(85284, 3)
Yoav: What is the goal of the following cell?
# # try to extract rows, and it works
# test = stations[0:3]
# test.shape[0]
count = partition_data.sum(axis=0)['latitude'] # demo usage
print dtype(count)
tmp = partition_data.iloc[1]['latitude']
print tmp
print partition_data.sum(axis=0)['count']
float64 41.3333 inf
Yoav: What does the code in the next cells do? How do you know whether it does what it is supposed to do correctly?
# this implementation might be buggy, check the logic
def partition(df_local, leafList, direction="lat", _parent=None):
if df_local.shape[0] <= TreeNode.STATION_LIMIT:
stations = []
for index, row in df_local.iterrows():
stations.append(index)
curNode = TreeNode()
curNode.stationList = stations
curNode.parent = _parent
curNode.isLeaf = True
leafList.append(curNode)
return curNode
elif direction == "lat":
df_local = df_local.sort(columns='latitude')
startRowIndex = 0
endRowIndex = df_local.shape[0] - 1
# the weight-based partition
start = startRowIndex - 1
end = endRowIndex + 1
left = 0
right= 0
while start < end:
if left == right:
start += 1
end -= 1
left = int(df_local.iloc[start]['count'])
right= int(df_local.iloc[end]['count'])
elif left < right:
start +=1
end = end
right = right - left
left = int(df_local.iloc[start]['count'])
else: # left > right
start = start
end -= 1
left = left - right
right = int(df_local.iloc[end]['count'])
midRowIndex = start
# make it into a binary tree
curNode = TreeNode()
curNode.leftChild = partition(df_local[startRowIndex: midRowIndex+1], leafList, direction="long", _parent = curNode)
curNode.rightChild = partition(df_local[midRowIndex+1: endRowIndex+1], leafList, direction="long", _parent = curNode)
curNode.parent = _parent
return curNode
else: # for 'long' direction
df_local = df_local.sort(columns='longitude')
startRowIndex = 0
endRowIndex = df_local.shape[0] - 1
# the weight-based partition
start = startRowIndex - 1
end = endRowIndex + 1
left = 0
right= 0
while start < end:
if left == right:
start += 1
end -= 1
left = int(df_local.iloc[start]['count'])
right= int(df_local.iloc[end]['count'])
elif left < right:
start +=1
end = end
right = right - left
left = int(df_local.iloc[start]['count'])
else: # left > right
start = start
end -= 1
left = left - right
right = int(df_local.iloc[end]['count'])
midRowIndex = start
# create the partition
curNode = TreeNode()
curNode.leftChild = partition(df_local[startRowIndex: midRowIndex+1], leafList, direction="lat", _parent = curNode)
curNode.rightChild = partition(df_local[midRowIndex+1: endRowIndex+1], leafList, direction="lat", _parent = curNode)
curNode.parent = _parent
return curNode
print os.path.exists("partition_yue.pkl")
True
def get_all_leaves(root):
retList = []
# using level order traversal
curLevel = [root]
nextLevel = []
while len(curLevel) != 0:
for node in curLevel:
if node.leftChild != None:
nextLevel.append(node.leftChild)
if node.rightChild != None:
nextLevel.append(node.rightChild)
if node.leftChild == None and node.rightChild == None:
retList.append(node)
# prepare for next iteration
curLevel = nextLevel[:]
nextLevel = []
return retList
leaf_list = []
global_root = None
if os.path.exists("partition_yue.pkl"): # should name it with pkl extension actually
print "loading partition"
global_root = pickle.load(open("partition_yue.pkl", "rb"))
print "populating leaf list"
leaf_list = get_all_leaves(global_root)
else:
print "regenerating partition"
global_root = partition(partition_data, leaf_list)
# verify that the leaf_list is storing all the leaves--> done
partition_success = True
for itr in leaf_list:
if itr.isLeaf != True:
partition_success = False
# store the partition upon success
# does this writing preserve the entire tree?
if partition_success == True:
print "writing partition to file"
pickle.dump(global_root, open("partition_yue.pkl","wb"))
loading partition populating leaf list
stations.loc['AYW00090001']['latitude']
-90.0
Yoav: The map below looks bad. The regions tend to be tall and narrow. Probably a bug in the code above.
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt
lonmin=-180;lonmax=180;latsmin=-80;latsmax=80;
plt.figure(figsize=(15,10),dpi=300)
m = Basemap(projection='merc',llcrnrlat=latsmin,urcrnrlat=latsmax,\
llcrnrlon=lonmin,urcrnrlon=lonmax,lat_ts=20,resolution='i')
m.drawcoastlines(linewidth=0.5)
# draw parallels and meridians.
parallels = np.arange(-80,81,10.)
m.drawparallels(parallels,labels=[False,True,True,False])
meridians = np.arange(10.,351.,20.)
m.drawmeridians(meridians,labels=[True,False,False,True])
lat_list = []
log_list = []
count_list = []
for node in leaf_list:
for station in node.stationList:
try:
entry = stations.loc[station] # using the name
lat = entry['latitude']
log = entry['longitude']
log_list.append(log)
lat_list.append(lat)
count_list.append(len(node.stationList))
except Exception:
a = 1
x, y = m(log_list,lat_list)
m.scatter(x, y, c=count_list, s=20, cmap=plt.cm.jet, edgecolors='None', alpha=0.75)
cbar = plt.colorbar()
plt.title('partition visual')
plt.show()
step1: extract the max-min concatenated vector for each station
step2: perform the pca on each node and determine k. (this step works on small sets of data locally and on cluster, but when feeding in large data it fails. i tried to make the mapper and reducer perform cell-wise add in a naive way to save memory, but still got the code 137 error. )
# we need some serialize/de- to turn list into string, notice this is not pickle
def serialize(inputList):
ret_string = ""
for i in inputList:
ret_string = ret_string + "," + str(i)
return ret_string[1:]
def deserialize(line):
ret_array_string = line[1:-1].strip().split(",")
ret_array_val = [None] * len(ret_array_string)
for i in range(0, len(ret_array_string)):
ret_array_val[i] = int(ret_array_string[i])
return ret_array_val
Yoav: Again, not enough documentation.
%%writefile max_min_collector.py
from mrjob.job import MRJob
class max_min(MRJob):
# helper - linear insertion
# the return value is always [] + boolean, with the boolean indicating the linear insertion is successful or not
# in the rare case that it fails, the original list is completely useless
def sanitize(self, inputList, target):
lastIndex = -1
i = 0
while i < len(inputList):
if inputList[i] == target:
i = i + 1
else:
# set the thing in between
if lastIndex == -1:
# set everything in between to be input[i]
set_to = inputList[i]
for j in range(lastIndex + 1, i):
inputList[j] = set_to
else:
slope = (inputList[i] - inputList[lastIndex]) / (i-1 - lastIndex + 1)
base_val = inputList[lastIndex]
for j in range(lastIndex + 1, i):
inputList[j] = base_val + slope * (j - lastIndex)
# find the next
j = i + 1
for j in range(i + 1, len(inputList)):
if inputList[j] == target: break
else: j += 1
if j == len(inputList):
i = len(inputList)
else:
lastIndex = j - 1
i = j
# it is possible that last sequence is not set
if inputList[-1] != target:
return inputList
elif inputList[-1] == target and lastIndex != -1:
set_to = inputList[lastIndex]
for i in range(lastIndex + 1, len(inputList)):
inputList[i] = set_to
return inputList
else:
return []
# helper function: serialize a list of numbers into a comma separated string
def serialize(self, inputList):
ret_string = ""
for i in inputList:
ret_string = ret_string + "," + str(i)
return ret_string[1:]
# the standard workflow for map-reduce
def mapper(self, _, line):
elements=line.split(',')
if len(elements) != 368:
dummy = 1
elif elements[1]=='TMAX' or elements[1]=='TMIN': # only collectingor temperatures
one_year = elements[3:]
yield elements[0], (elements[1], one_year) # yielding in the format: station -> (max/min, reading for 365 days)
def combiner(self, key, value):# value is a list
max_value = [-10000] * 365
min_value = [+10000] * 365
for value_itr in value:
if value_itr[0] == 'TMAX': # updating the max_value array
data_array = value_itr[1]
for i in range(0, 365):
if data_array[i] == '': continue
else:
if int(data_array[i]) > max_value[i]: max_value[i] = int(data_array[i])
else: # == 'TMIN', updating the min_value array
data_array = value_itr[1]
for i in range(0, 365):
if data_array[i] == '': continue
else:
if int(data_array[i]) < min_value[i]: min_value[i] = int(data_array[i])
yield key, ('TMAX', max_value)
yield key, ('TMIN', min_value)
def reducer(self, key, value):
max_value = [-10000] * 365
min_value = [+10000] * 365
for value_itr in value:
if value_itr[0] == 'TMAX': # updating the max_value array
data_array = value_itr[1]
for i in range(0, 365):
if data_array[i] == '': continue
else:
if int(data_array[i]) > max_value[i]: max_value[i] = int(data_array[i])
else: # == 'TMIN', updating the min_value array
data_array = value_itr[1]
for i in range(0, 365):
if data_array[i] == '': continue
else:
if int(data_array[i]) < min_value[i]: min_value[i] = int(data_array[i])
# get rid of corrupted data points, by linear inseration
max_value = self.sanitize(max_value, -10000)
min_value = self.sanitize(min_value, +10000)
if max_value == [] or min_value == []:
dummy = 1
else:
# yielding to the stdout, construct a 365 * 1 * 2 list by appending min readings to max readings
output_string = self.serialize(max_value + min_value)
yield key, output_string
if __name__ == '__main__':
max_min.run()
Overwriting max_min_collector.py
#!rm mapping.txt
# execute the map-reduce work flow and write to a file called mapping.txt
if os.path.exists("mapping.txt") == True:
print "mapping.txt already exists! Do nothing!"
else:
job_flow_id = get_job_flow_id()
#!python totalNumber.py -r emr --emr-job-flow-id=$job_flow_id hdfs:/weather/weather.csv > all_station_count # working reference
!python max_min_collector.py -r emr --emr-job-flow-id=$job_flow_id hdfs:/weather/weather.csv > mapping.txt
<boto.emr.emrobject.JobFlow object at 0xa3dc8d0> no_script.yoavfreund.20140517.080731.371759 j-2EFV3O64LF1U2 WAITING <boto.emr.emrobject.JobFlow object at 0xa33df10> no_script.yoavfreund.20140525.174308.746673 j-LTOJMJ14G840 WAITING using configs in /home/ubuntu/.mrjob.conf creating tmp directory /tmp/max_min_collector.ubuntu.20140525.221302.218896 Copying non-input files into s3://yuf/scratch/max_min_collector.ubuntu.20140525.221302.218896/files/ Adding our job to existing job flow j-LTOJMJ14G840 Job launched 30.5s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job launched 61.0s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job launched 91.6s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job launched 122.2s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job launched 152.9s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job launched 183.3s ago, status RUNNING: Running step (max_min_collector.ubuntu.20140525.221302.218896: Step 1 of 1) Job completed. Running time was 166.0s (not counting time spent waiting for the EC2 instances) ec2_key_pair_file not specified, going to S3 Fetching counters from S3... Waiting 5.0s for S3 eventual consistency Counters may not have been uploaded to S3 yet. Try again in 5 minutes with: mrjob fetch-logs --counters j-LTOJMJ14G840 Counters from step 1: (no counters found) Streaming final output from s3://yuf/scratch/max_min_collector.ubuntu.20140525.221302.218896/output/ removing tmp directory /tmp/max_min_collector.ubuntu.20140525.221302.218896 Removing all files in s3://yuf/scratch/max_min_collector.ubuntu.20140525.221302.218896/
!ls -al | grep "mapping*"
-rw-rw-r-- 1 ubuntu ubuntu 83409067 May 25 22:16 mapping.txt
!rm station_2_data.pkl
# the suggested way of reading file: with clause handles opening and closing file, with exception being throwed propoerly
station_2_data = {}
if os.path.exists("station_2_data.pkl"):
station_2_data = pickle.load(open("station_2_data.pkl", "rb"))
else:
with open("mapping.txt") as f:
for line in f:
line = line.strip()
if line == '': continue
else:
key_value = line.split() # split on consequtive whitespaces
#print key_value[0]
station_2_data[key_value[0]] = deserialize(key_value[1])
# backing up the dictionary structure in a file
pickle.dump(station_2_data, open("station_2_data.pkl", "wb") )
print len(station_2_data.keys())
print station_2_data.keys()[0][1:-1]
print global_root.id
print len(station_2_data[station_2_data.keys()[0]])
29088 USC00085973 None 730
Yoav: Why are you keeping these commented out cells around?
# # assign ids to nodes, starting from the global_root, and put the id-> node instance mapping into a dictionary
# def assign_id(curNode):
# id_counter = 0
# id_2_node = {}
# # using level order traversal
# curLevel = [curNode]
# nextLevel = []
# while len(curLevel) != 0:
# for node in curLevel:
# node.id = "NODE_" + str(id_counter) # set the id field in the node!
# id_2_node[node.id] = node
# id_counter = id_counter + 1
# if node.leftChild != None:
# nextLevel.append(node.leftChild)
# if node.rightChild != None:
# nextLevel.append(node.rightChild)
# curLevel = nextLevel[:]
# nextLevel = []
# return id_2_node
# # now get the mapping from id to node in a dictionary
# id_2_node = assign_id(global_root)
#leaf_list = get_all_leaves(global_root)
print len(leaf_list)
print leaf_list[:10]
for i in range(0, len(leaf_list)):
node = leaf_list[i]
node.id = 'LEAF_' + str(i)
print leaf_list[10].id
1764 [<__main__.TreeNode instance at 0x7a97cb0>, <__main__.TreeNode instance at 0x7a2c5a8>, <__main__.TreeNode instance at 0x7a2c560>, <__main__.TreeNode instance at 0x7a97c20>, <__main__.TreeNode instance at 0x7a84dd0>, <__main__.TreeNode instance at 0x7a2c440>, <__main__.TreeNode instance at 0x7a049e0>, <__main__.TreeNode instance at 0x8a13ea8>, <__main__.TreeNode instance at 0x8a13248>, <__main__.TreeNode instance at 0x6464cf8>] LEAF_10
!heat -10 leaf_to_count
LEAF_10
leaf_station_count = {}
for leaf in leaf_list:
leaf_station_count[leaf.id] = len(leaf.stationList)
pickle.dump(leaf_station_count, open("leaf_station_count.pkl", "wb"))
%%writefile dict_wrapper_class.py
class Lookup:
# here we should overwrite the pickle for the tree structure
# this time, with dedicated node_id for the leaves
if not os.path.exists('partition_with_leaf_id.pkl'):
pickle.dump(global_root, open("partition_with_leaf_id.pkl", "wb"))
!ls -al partition*
-rw-rw-r-- 1 ubuntu ubuntu 28858529 May 25 23:08 partition_with_leaf_id.pkl -rw-rw-r-- 1 ubuntu ubuntu 2327396 May 25 19:46 partition_yue.pkl
!rm all_leaf_info.txt
# now writing all the station info (subtracted mean from every station in each node) to a file
# each line is a record for a station, not for a node, but the key is the node id, instead of station name
if os.path.exists('all_leaf_info.txt'):
print "file already there. no need to regenerate"
else:
with open("all_leaf_info.txt", "wb") as f:
count = 0
for child in leaf_list:
child_station_list = child.stationList
# create the 2d array to work on
list_of_list = []
for station in child_station_list:
station = "\"" + station + "\"" # station names stored in nodes do not have double quotes around them
if station in station_2_data:
li = station_2_data[station] # look-up the list from the dictionary
list_of_list.append(li)
else:
#print station
count += 1
if len(list_of_list) == 0:
#print child.id + " has no valid data"
continue
else:
value = np.array(list_of_list)
# calculate the mean value, and store it in both node and file
MEAN = np.mean(value, axis=0)
child.mean = MEAN
# subtract mean from every row in value matrix
for i in range(0, value.shape[0]):
value[i][:] = value[i][:] - MEAN
value_str = serialize(value[i][:])
f.write(child.id + " " + value_str + "\n")
print "cannot find the record for:" + str(count)
file already there. no need to regenerate
# create a subset of data for testing locally
!rm test_all_leaf_info.txt
!head -3000 all_leaf_info.txt > test_all_leaf_info.txt
# this is the reference implementation of commputing mean, and cov for a single node.
# # but now we need to make it work using map-reduce
# M=Dout.loc[:,1:365].transpose()
# M=M.dropna(axis=1)
# (columns,rows)=shape(M)
# Mean=mean(M, axis=1).values
# print (columns,rows), shape(Mean)
# C=np.zeros([columns,columns]) # Sum
# N=np.zeros([columns,columns]) # Counter of non-nan entries
# for i in range(rows):
# if i % 1000==0:
# print i
# row=M.iloc[:,i]-Mean;
# outer=np.outer(row,row)
# valid=isnan(outer)==False
# C[valid]=C[valid]+outer[valid] # update C with the valid location in outer
# N[valid]=N[valid]+1
# valid_outer=np.multiply(1-isnan(N),N>0)
# cov=np.divide(C,N)
%%writefile new_coding.py
import base64,pickle,zlib,sys
import pdb
# define the new helper function
def load(line):
(key,eVal)=line.split('\t')
Value=pickle.loads(zlib.decompress(base64.b64decode(eVal)))
return(key,Value)
def dump(key,Value,out=sys.stdout):
out.write("%s\t%s\n" % (key, base64.b64encode(zlib.compress(pickle.dumps(Value),9))))
def encode(Value):
return base64.b64encode(zlib.compress(pickle.dumps(Value),9))
def decode(line):
return pickle.loads(zlib.decompress(base64.b64decode(line)))
Overwriting new_coding.py
%%writefile compute_stats.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol, JSONProtocol, PickleProtocol, RawValueProtocol, RawProtocol
import pickle
import numpy as np
import os
class per_node_calc(MRJob):
INPUT_PROTOCOL = RawValueProtocol
INTERNAL_PROTOCOL = PickleProtocol
OUTPUT_PROTOCOL = JSONProtocol
# helper function
def deserialize(self, line):
arr_str = line.split(",")
arr_val = [None] * len(arr_str)
for i in range(0, len(arr_str)):
arr_val[i] = int(arr_str[i])
return arr_val
# helper function, for associating id to node and put them in a dict
def load_id(self, root):
id_2_node = {}
curLevel = [root]
nextLevel = []
while len(curLevel) != 0:
for node in curLevel:
if node.leftChild != None:
nextLevel.append(node.leftChild)
if node.rightChild != None:
nextLevel.append(node.rightChild)
if node.leftChild == None and node.rightChild == None:
id_2_node[node.id] = node
# prepare for next iteration
curLevel = nextLevel[:]
nextLevel = []
return id_2_node
# if we import:
# 1. the tree structure
# 2. the node.id -> node mapping
# we could use a multi-step function to perform svd, on each of the node we have
# tested
def mapper_sum(self, _, line):
line = line.strip()
if line != '':
arr = line.split()
try:
key = arr[0]
vec = np.multiply(self.deserialize(arr[1]), 0.01)
value = np.outer(vec, vec) # scaling the data so that we don't get overflow
yield key, value
except Exception:
self.increment_counter('group', 'mapper_exception', 1)
def combiner_sum(self, key, values):
local_sum = np.zeros([730,730])
# a memory efficient way of adding
for mat in values:
for i in range(0, mat.shape[0]):
for j in range(0, mat.shape[1]):
local_sum[i][j] += mat[i][j]
yield key, local_sum
def reducer_sum(self, key, values):
local_sum = np.zeros([730,730])
# a memory efficient way
for mat in values:
for i in range(0, mat.shape[0]):
for j in range(0, mat.shape[1]):
local_sum[i][j] += mat[i][j]
yield key, local_sum
# for each key, the values only have a list, which is a singleton
def reducer_svd(self, key, values):
leaf_to_count = pickle.load(open("leaf_station_count.pkl", "rb"))
total_sum = np.zeros([730, 730])
for mat in values:
total_sum += mat
if key in leaf_to_count:
count = leaf_to_count[key]
cov = total_sum / count
# now perform svd and assign the values to node
U, D, V = np.linalg.svd(cov)
# need to scale the D array
D_scale = np.multiply(D[:], 0.1)
try:
acc_sum = np.cumsum(D_scale[:])
acc = np.divide(acc_sum, sum(D_scale))
REQUIREMENT = 0.95
i = 0
for i in range(0, len(acc)):
if acc[i] >= REQUIREMENT: break
yield key, i
except Exception:
yield key, -1 # -1 is the error flag
else:
yield "no_such_id_", 1
def steps(self):
return [self.mr(mapper=self.mapper_sum,
combiner=self.combiner_sum,
reducer=self.reducer_sum),
self.mr(reducer=self.reducer_svd)]
# entry point of program
if __name__ == '__main__':
per_node_calc.run()
Overwriting compute_stats.py
# running the job
#job_flow_id = get_job_flow_id()
job_flow_id1= 'j-LTOJMJ14G840'
!rm this_is_a_test
#!python compute_stats.py -r emr --emr-job-flow-id $job_flow_id1 < test_all_leaf_info.txt > this_is_a_test #is submitting
!python compute_stats.py -r emr --emr-job-flow-id $job_flow_id1 \
--file ./leaf_station_count.pkl \
< all_leaf_info.txt > pca_result.txt #is submitting
# !python compute_stats.py -r local \
# --file ./leaf_station_count.pkl \
# < test_all_leaf_info.txt > this_is_a_test #is submitting
using configs in /home/ubuntu/.mrjob.conf creating tmp directory /tmp/compute_stats.ubuntu.20140526.222143.423830 reading from STDIN Copying non-input files into s3://yuf/scratch/compute_stats.ubuntu.20140526.222143.423830/files/ Adding our job to existing job flow j-LTOJMJ14G840 Job launched 31.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 62.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 93.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 124.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 155.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 186.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 218.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 249.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 280.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 311.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 342.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 374.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 405.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 436.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 467.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 498.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 529.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 561.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 592.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 623.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 654.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 685.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 717.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 748.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 779.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 810.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 841.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 873.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 904.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 935.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 966.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 997.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1028.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1059.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1090.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1122.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1153.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1184.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1215.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1247.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1278.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1309.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1340.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1371.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1403.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1434.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1465.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1496.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1527.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1558.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1589.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1620.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1651.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1682.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1714.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job launched 1745.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140526.222143.423830: Step 1 of 2) Job on job flow j-LTOJMJ14G840 failed with status WAITING: Waiting after step failed Logs are in s3://yoav.hadoop/log/j-LTOJMJ14G840/ ec2_key_pair_file not specified, going to S3 Scanning S3 logs for probable cause of failure Waiting 5.0s for S3 eventual consistency Attempting to terminate job... Traceback (most recent call last): File "compute_stats.py", line 118, in <module> per_node_calc.run() File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 494, in run mr_job.execute() File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 512, in execute super(MRJob, self).execute() File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 147, in execute self.run_job() File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 213, in run_job self.stdout.flush() File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 614, in __exit__ self.cleanup() File "/usr/local/lib/python2.7/dist-packages/mrjob/emr.py", line 1010, in cleanup super(EMRJobRunner, self).cleanup(mode=mode) File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 560, in cleanup self._cleanup_job() File "/usr/local/lib/python2.7/dist-packages/mrjob/emr.py", line 1084, in _cleanup_job self._opts['ec2_key_pair_file']) File "/usr/local/lib/python2.7/dist-packages/mrjob/ssh.py", line 200, in ssh_terminate_single_job ssh_bin, address, ec2_key_pair_file, ['hadoop', 'job', '-list'])) File "/usr/local/lib/python2.7/dist-packages/mrjob/ssh.py", line 82, in ssh_run p = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE) File "/home/ubuntu/anaconda/lib/python2.7/subprocess.py", line 709, in __init__ errread, errwrite) File "/home/ubuntu/anaconda/lib/python2.7/subprocess.py", line 1326, in _execute_child raise child_exception TypeError: execv() arg 2 must contain only strings
Yoav: Did you try to figure out the problem from the stderr files?
# running the job
#job_flow_id = get_job_flow_id()
job_flow_id1= 'j-6T8VIKMY8RHX'
#!python compute_stats.py -r emr --emr-job-flow-id $job_flow_id1 < test_all_leaf_info.txt > this_is_a_test #is submitting
!python compute_stats.py -r emr --emr-job-flow-id $job_flow_id1 \
--file ./leaf_station_count.pkl \
< test_all_leaf_info.txt > pca_result.txt
# !python compute_stats.py -r local \
# --file ./leaf_station_count.pkl \
# < test_all_leaf_info.txt > this_is_a_test #is submitting
using configs in /home/ubuntu/.mrjob.conf creating tmp directory /tmp/compute_stats.ubuntu.20140527.005916.908620 reading from STDIN Copying non-input files into s3://yuf/scratch/compute_stats.ubuntu.20140527.005916.908620/files/ Adding our job to existing job flow j-6T8VIKMY8RHX Job launched 30.7s ago, status RUNNING: Running step Job launched 61.3s ago, status RUNNING: Running step Job launched 92.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 122.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>8aff5612-e53a-11e3-a94d-e755ea6b183b</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 173.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 203.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>bb715f18-e53a-11e3-a7f3-054eed488613</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 254.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 285.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 315.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 346.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 376.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 407.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 438.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 468.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>594fa616-e53b-11e3-b43a-872b56d6dfca</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 519.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>77831fb5-e53b-11e3-b332-21d671b574dc</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 570.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 600.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 631.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 661.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 692.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 723.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 753.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 784.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 814.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 845.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 876.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 907.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 937.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 968.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 999.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>95613190-e53c-11e3-9077-5114ea9dc6a3</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 1049.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>b389fc6a-e53c-11e3-aa87-172672d76e00</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 1100.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1130.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1161.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1192.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1222.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1253.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1283.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1314.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1345.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1375.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1406.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1437.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1467.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1498.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1529.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1559.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1590.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1621.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1651.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1682.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1713.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>3f091689-e53e-11e3-9fde-05f53a330f8a</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 1763.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>5d40af6c-e53e-11e3-9fde-05f53a330f8a</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 1814.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1845.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1875.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1906.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1936.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1967.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 1998.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2029.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2059.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2090.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2121.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2151.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2182.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2213.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2243.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>7b5b05d1-e53f-11e3-af71-7d8b78bb4770</RequestId> </ErrorResponse> Backing off for 20.0 seconds Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>874baa3d-e53f-11e3-af71-7d8b78bb4770</RequestId> </ErrorResponse> Backing off for 30.0 seconds Job launched 2324.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2355.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>bda8c01c-e53f-11e3-80e8-ff6cff06198c</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2405.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>dbe0a5f0-e53f-11e3-bea8-dbf80bac8b69</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2456.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2486.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2517.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>1e8ad7de-e540-11e3-bea8-dbf80bac8b69</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2568.3s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2598.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2629.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2660.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2690.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2721.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>9815f3f1-e540-11e3-8362-0f887c06ae20</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2772.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2802.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2833.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>dad0523e-e540-11e3-8362-0f887c06ae20</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2884.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 2914.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>0b540f11-e541-11e3-8362-0f887c06ae20</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 2965.6s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>299592c3-e541-11e3-af71-7d8b78bb4770</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 3016.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3047.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3077.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3108.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3139.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3169.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>a3494601-e541-11e3-acfe-a31808eb12d5</RequestId> </ErrorResponse> Backing off for 20.0 seconds Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>af3a3816-e541-11e3-a7f3-054eed488613</RequestId> </ErrorResponse> Backing off for 30.0 seconds Job launched 3250.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>d360fb76-e541-11e3-a7f3-054eed488613</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 3301.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3331.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3362.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3393.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3423.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>3a8eb4dc-e542-11e3-9fde-05f53a330f8a</RequestId> </ErrorResponse> Backing off for 20.0 seconds Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>46806aa4-e542-11e3-9fde-05f53a330f8a</RequestId> </ErrorResponse> Backing off for 30.0 seconds Job launched 3504.2s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3534.9s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3565.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3596.1s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3626.8s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3657.5s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3688.0s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Got retriable error: EmrResponseError: 400 Bad Request <ErrorResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> <Error> <Type>Sender</Type> <Code>Throttling</Code> <Message>Rate exceeded</Message> </Error> <RequestId>d831ee6c-e542-11e3-b635-b92af58569e9</RequestId> </ErrorResponse> Backing off for 20.0 seconds Job launched 3738.7s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job launched 3769.4s ago, status RUNNING: Running step (compute_stats.ubuntu.20140527.005916.908620: Step 1 of 2) Job on job flow j-6T8VIKMY8RHX failed with status WAITING: Waiting after step failed Logs are in s3://yoav.hadoop/log/j-6T8VIKMY8RHX/ ec2_key_pair_file not specified, going to S3 Scanning S3 logs for probable cause of failure Waiting 5.0s for S3 eventual consistency Attempting to terminate job... Traceback (most recent call last): File "compute_stats.py", line 126, in <module> per_node_calc.run() File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 494, in run mr_job.execute() File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 512, in execute super(MRJob, self).execute() File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 147, in execute self.run_job() File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 213, in run_job self.stdout.flush() File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 614, in __exit__ self.cleanup() File "/usr/local/lib/python2.7/dist-packages/mrjob/emr.py", line 1010, in cleanup super(EMRJobRunner, self).cleanup(mode=mode) File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 560, in cleanup self._cleanup_job() File "/usr/local/lib/python2.7/dist-packages/mrjob/emr.py", line 1084, in _cleanup_job self._opts['ec2_key_pair_file']) File "/usr/local/lib/python2.7/dist-packages/mrjob/ssh.py", line 200, in ssh_terminate_single_job ssh_bin, address, ec2_key_pair_file, ['hadoop', 'job', '-list'])) File "/usr/local/lib/python2.7/dist-packages/mrjob/ssh.py", line 82, in ssh_run p = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE) File "/home/ubuntu/anaconda/lib/python2.7/subprocess.py", line 709, in __init__ errread, errwrite) File "/home/ubuntu/anaconda/lib/python2.7/subprocess.py", line 1326, in _execute_child raise child_exception TypeError: execv() arg 2 must contain only strings
!ls -alh all_leaf_info*
!ls -alh leaf_station*
-rw-rw-r-- 1 ubuntu ubuntu 70M May 26 05:34 all_leaf_info.txt -rw-rw-r-- 1 ubuntu ubuntu 39K May 26 20:51 leaf_station_count.pkl
!head -100 pca_result.txt
"LEAF_0" 1
# function for determining merge or not
def should_merge(child1, child2, parent):
parent.stationList = child1.stationList + child2.stationList
# write the station data to file, just like mapping
parent_station_list = parent.stationList
with open("naive_merge.txt", "wb") as f:
for station in parent_station_list:
if station not in station_2_data: continue
else:
data = station_2_data[station]
data_serial = serialize(data)
f.write('dummy_const_key' + ',' + data_serial)
# perform pca on the file
#!python pca_analysis_merge.py -r local naive_merge.txt > naive_merge_result.txt
# read from the output, and parse the k
new_k = -1
with open('naive_merge_result.txt') as f:
for line in f:
line = line.strip()
if line == '': continue
arr = line.split(',')
assert(len(arr) == 2)
if arr[0] == 'new_k': new_k = arr[1]
# using all three nodes to return a boolean result
child1_metric = len(child1.stationList) * child1.k + (child1.k+1)*2*365
child2_metric = len(child2.stationList) * child2.k + (child2.k+1)*2*365
parent_metric = len(parent.stationList) * new_k + (new_k + 1)*2*365
if child1_metric + child2_metric > parent_metric: return False
else: return True
# naive merging on neighboring pairs
i = 0
while i < len(leaf_list):
if i + 1 < len(leaf_list):
this_node = leaf_list[i]
next_node = leaf_list[i+1]
if this_node.parent == next_node.parent:
if should_merge(this_node, next_node, this_node.parent) == True:
parent = this_node.parent
parent.leftChild = None
parent.rightChild = None
# move on to next pair
i += 2
else:
i += 1
else:
break
merged_leaf_list = []
find_all_leaves(root, merged_leaf_list)
draw_the_map(merged_leaf_list)
The main problem with this notebook is that it contains almost only code, it does not contain enough:
For the one map that does exist the results are clearly incorrect because the regions are very elongated.
Overall Grade 70