import os, sys, re, pickle, base64, zlib
import sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.pyplot import hist, title
from scipy import spatial
home_dir='/home/ubuntu/UCSD_BigData'
sys.path.append(home_dir+'/utils')
from find_waiting_flow import *
from AWS_keypair_management import *
data_dir=home_dir+'/data/weather'
!ls $data_dir
def isNeighbour(a, b):
if(a.rx < b.lx or a.lx > b.rx or a.ry < b.ly or a.ly > b.ry):
return False
return True
def loads(eVal):
""" Decode a string into a value """
return pickle.loads(zlib.decompress(base64.b64decode(eVal)))
def load_stats(loc):
stn_data, l = {}, 0
f = open(loc)
for line in f:
l = l+1
stn, data_enc = line.split()
stn_data[stn[1:-1]] = loads(data_enc[1:-1])
return stn_data
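For reference, here is a hedged sketch of the line format load_stats appears to expect, inferred from the [1:-1] slicing rather than from a documented schema: two quoted, whitespace-separated tokens per line, the station ID and an encoded statistics blob.
# Hedged illustration (made-up station and values) of a StationStatistics line;
# the single-character quoting around each token is an assumption inferred from
# the [1:-1] slicing in load_stats.
def dumps(value):
    """ Encode a value as a string (mirrors dumps in Get_Partitions.py below) """
    return base64.b64encode(zlib.compress(pickle.dumps(value), 9))
example_line = '"USW00023188"\t"%s"' % dumps({('TMAX', 1980): 365})
stn, data_enc = example_line.split()
print(stn[1:-1])              # USW00023188
print(loads(data_enc[1:-1]))  # {('TMAX', 1980): 365}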
def find_median (X):
#computing cdf
X.sort()
X_sum = sum([x[1] for x in X])
cdf = np.cumsum([x[1]*1.0/X_sum for x in X])
median_idx = np.searchsorted(cdf, 0.5)
median = X[median_idx][0]
return median
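A small worked example (not from the original notebook) of the weighted median: with counts 1, 1, 5, 1 attached to the values 1 through 4, the cumulative station mass crosses 0.5 at the value 3, which is where build_kdtree will split.
# Toy check of find_median: values paired with station counts.
# cdf = [0.125, 0.25, 0.875, 1.0], so the 0.5 quantile falls on the value 3.
print(find_median(zip([1, 2, 3, 4], [1, 1, 5, 1])))   # -> 3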
def build_kdtree(data, cur_div = 0, leaf_size=1000, cur_id = ''):
#cur_div = 0 for splitting by latitude, 1 for splitting by longitude
(lats, lons, count) = data
#print len(count), leaf_size
tree_id = np.array(['']*len(count), dtype=object)
if(sum(count) <= leaf_size):
return (tree_id, {})
median = find_median(zip(data[cur_div], count))
#print median
left_ids = (data[cur_div]<median)
right_ids = (data[cur_div]>=median)
lats_left, lons_left, count_left = lats[left_ids], lons[left_ids] , count[left_ids]
lats_right, lons_right, count_right = lats[right_ids], lons[right_ids], count[right_ids]
(left_tree_id, left_dict_median) = build_kdtree((lats_left, lons_left, count_left), cur_div^1,\
leaf_size, cur_id= cur_id+'0')
(right_tree_id, right_dict_median) = build_kdtree((lats_right, lons_right, count_right), cur_div^1,\
leaf_size, cur_id= cur_id+'1')
tree_id[left_ids] = '0' + left_tree_id
tree_id[right_ids] = '1' + right_tree_id
dict_median = dict(left_dict_median.items() + right_dict_median.items())
dict_median[cur_id] = median
return (tree_id, dict_median)
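A minimal usage sketch of build_kdtree on toy data (these arrays are illustrative, not part of the dataset): six stations with unit counts and a leaf size of 3.
# Toy run of build_kdtree; each station receives a bit-string leaf id ('0', '10', ...)
# and dict_median maps each internal node, keyed by its path, to the split value used there.
toy_lats  = np.array([10.0, 20.0, 30.0, 40.0, 50.0, 60.0])
toy_lons  = np.array([ 5.0, 15.0, 25.0, 35.0, 45.0, 55.0])
toy_count = np.array([1, 1, 1, 1, 1, 1])
toy_ids, toy_medians = build_kdtree((toy_lats, toy_lons, toy_count), leaf_size=3)
print(toy_ids)       # leaf id per station
print(toy_medians)   # split value per internal node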
def plotmap(lon, lat, val, cbar = False, cmap= cm.autumn_r, alpha = 1, title = ''):
from mpl_toolkits.basemap import Basemap
plt.figure(figsize=(15,10),dpi=300)
m = Basemap(projection='merc',llcrnrlat=-80,urcrnrlat=80,\
llcrnrlon=-180,urcrnrlon=180,lat_ts=20,resolution='i')
m.drawcoastlines()
m.fillcontinents(color='White',lake_color='LightBlue')
# draw parallels and meridians.
parallels = np.arange(-80,81,10.)
# labels = [left,right,top,bottom]
m.drawparallels(parallels,labels=[False,True,True,False])
meridians = np.arange(10.,351.,20.)
m.drawmeridians(meridians,labels=[True,False,False,True])
#m.drawparallels(np.arange(-90.,91.,30.))
#m.drawmeridians(np.arange(-180.,181.,60.))
m.drawmapboundary(fill_color='LightBlue')
# draw map with markers for locations
x, y = m(lon,lat)
cs = m.scatter(x,y,c=val,cmap = cmap, alpha = alpha, lw=0,zorder=2,s=6)
if(cbar): m.colorbar(cs, location='right', pad = "12%")
plt.title(title)
plt.show()
class Rectangle:
""" Point class represents and manipulates rectangle coords. """
def __init__(self, lx, ly, rx, ry):
""" Create a new rectangle """
self.lx = lx #bottomleft x-coordinate
self.ly = ly
self.rx = rx
self.ry = ry #topright y-coordinate
def __repr__(self):
return 'lx = '+ str(self.lx) + ', ly = '+ str(self.ly) + ', rx = '+ str(self.rx) + ', ry = '+ str(self.ry)
def leaves(tree):
if hasattr(tree, 'idx'):#tree is a leafnode
return [tree.idx]
else:
return leaves(tree.less) + leaves(tree.greater)
Partitioning is done using two methods: a custom kd-tree that weights each station by its number of valid years (below), and scipy's spatial.KDTree built on the raw coordinates (further down).
For each station, we first count the number of years in which both TMIN and TMAX are available for at least 180 days.
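The loop further down assumes that the statistics loaded just below map each station ID to a dictionary keyed by (measurement, year), with the number of days reported as the value; a hedged, made-up entry looks like this (1995 would count as a valid year because both TMIN and TMAX have at least 180 days):
# Made-up illustration of one entry of the per-station statistics dictionary
# returned by load_stats below.
example_entry = {('TMIN', 1995): 365, ('TMAX', 1995): 360, ('TMAX', 1996): 120}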
stats = load_stats('StationStatistics')
stations=pickle.load(open('stations.pkl', 'rb'))
num_stations = stations.shape[0]
names = np.array(stations.index)
lons = np.array(stations.ix[:,'longitude'])
lats = np.array(stations.ix[:,'latitude'])
stations['count'] = 0
stations['valid_years'] = ''
for stn in names:
cur = stats[stn] if stats.has_key(stn) else {}
cnt = {}
cnt['TMIN'], cnt['TMAX'] = 0,0
year_set = set([])
for (meas,yr) in cur.keys():
if meas in ['TMIN', 'TMAX']:
if(cur[(meas,yr)] > 180):
cnt[meas] = cnt[meas]+1
if(meas=='TMIN' or meas=='TMAX'): year_set.add(yr)
count = 0
valid_years = []
for yr in year_set:
mx_cnt = cur[('TMAX',yr)] if cur.has_key(('TMAX',yr)) else 0
mn_cnt = cur[('TMIN',yr)] if cur.has_key(('TMIN',yr)) else 0
if(mx_cnt>=180 and mn_cnt>=180):
count = count + 1
valid_years.append(yr)
vy = ','.join([str(x) for x in valid_years])
stations['count'][stn] = count
stations['valid_years'][stn] = vy
fig=plt.figure(1,figsize=[4,3],dpi=300)
hist_arr = np.array(stations['count'])
hist(hist_arr, bins=50);
plt.xlabel('number of valid years')
plt.ylabel('number of stations')
title('Histogram of valid-year counts per station')
stations['group_id'] = 0
counts = np.array(stations['count'])
kd_split, dict_median = build_kdtree((lats,lons, counts), leaf_size=2000)
stations['group_id'] = kd_split
graph_id = np.zeros(num_stations, dtype=int)
group2id = {}
groups = list(set(kd_split))
num_groups = len(groups)
cur_id = 1
for i in groups:
graph_id[kd_split==i] = cur_id
group2id[i] = cur_id
cur_id = cur_id + 1
plotmap(lons, lats, graph_id, cmap= cm.hsv, title = '')
The code below finds the neighboring partitions of each partition; these adjacency lists are used later during clustering.
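As a quick sanity check (not part of the original notebook), isNeighbour treats two partitions as neighbours whenever their bounding boxes overlap or touch:
# Two boxes that touch at a corner count as neighbours; disjoint boxes do not.
print(isNeighbour(Rectangle(-10, -10, 0, 0), Rectangle(0, 0, 10, 10)))    # True
print(isNeighbour(Rectangle(-10, -10, -5, -5), Rectangle(5, 5, 10, 10)))  # False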
group_coord = {}
for g in set(kd_split):
c = Rectangle(-180, -90, 180, 90)
cur_div = 0
for i in range(0, len(g)):
if(cur_div==0 and g[i]=='0') : c.ry = dict_median[g[0:i]]
elif(cur_div==0 and g[i]=='1'): c.ly = dict_median[g[0:i]]
elif(cur_div==1 and g[i]=='0'): c.rx = dict_median[g[0:i]]
elif(cur_div==1 and g[i]=='1'): c.lx = dict_median[g[0:i]]
cur_div = cur_div ^ 1
group_coord[g] = c
group_neighbours = dict([(i+1, []) for i in range(len(groups))])
for i in range(len(groups)):
g1 = groups[i]
for g2 in groups[i+1:]:
if (isNeighbour(group_coord[g1], group_coord[g2])):
group_neighbours[group2id[g1]].append(group2id[g2])
group_neighbours[group2id[g2]].append(group2id[g1])
f = open('partition_tree.pkl','wb')
pickle.dump((stations, group_neighbours, group2id),f)
f.close()
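A minimal sketch (not in the original notebook) of how a later clustering step could load the partition data back:
# Reload the stations table, the partition adjacency lists, and the group-id mapping.
with open('partition_tree.pkl', 'rb') as f:
    stations_loaded, neighbours_loaded, group2id_loaded = pickle.load(f)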
data = np.zeros((num_stations, 2))
data[:,0] = lats
data[:,1] = lons
kdt = spatial.KDTree(data, leafsize=1000)
num_groups2 = len(leaves(kdt.tree))
permute = np.arange(num_groups2) #so that nearby groups will have distinct colors
np.random.shuffle(permute)
group_id_2 = np.zeros(num_stations, dtype=int)
c = 0
for i in leaves(kdt.tree):
for j in i: group_id_2[j] = permute[c]
c = c + 1
plotmap(lons, lats, group_id_2, cmap= cm.hsv, title = '')
The code below processes weather.csv to separate out the information for the different partitions.
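A hedged sketch of the row layout the job expects, inferred from the mapper and check_integrity defined below rather than from a documented schema: a station ID, a measurement code, a year, and 365 daily values (empty fields where a day is missing), i.e. 367 fields after the station ID.
# Illustrative (made-up) weather.csv row and the split the mapper performs on it.
example_row = 'USW00023188,TMAX,1995,' + ','.join(['150', ''] * 182 + ['150'])
fields = example_row.split(',')
station, vector = fields[0], fields[1:]
assert len(vector) == 367   # measurement code + year + 365 daily readings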
%%writefile Get_Partitions.py
#!/usr/bin/python
"""
collect the statistics for each station.
"""
import re,pickle,base64,zlib
from sys import stderr
import sys
import numpy as np
sys.path.append('/usr/lib/python2.6/dist-packages') # a hack because anaconda made mrjob unreachable
from mrjob.job import MRJob
from mrjob.protocol import *
import traceback
from functools import wraps
from sys import stderr
"""this decorator is intended for decorating a function, not a
generator. Therefore to use it in the context of mrjob, the generator
should call a function that handles a single input records, and that
function should be decorated.
The reason is that if a generator throws an exception it exits and
cannot process any more records.
"""
def ECatch(func):
f_name=func.__name__
    #@wraps(func)
def inner(self,*args,**kwargs):
try:
self.increment_counter(self.__class__.__name__,'total in '+f_name,1)
return func(self,*args,**kwargs)
except Exception as e:
self.increment_counter(self.__class__.__name__,'errors in '+f_name,1)
stderr.write('Error:')
stderr.write(str(e))
traceback.print_exc(file=stderr)
stderr.write('Arguments were %s, %s\n'%(args,kwargs))
pass
return inner
"""
Functions for encoding and decoding arbitrary object into ascii
so that they can be passed through the hadoop streaming interface.
"""
def loads(eVal):
""" Decode a string into a value """
return pickle.loads(zlib.decompress(base64.b64decode(eVal)))
def dumps(Value):
""" Encode a value as a string """
return base64.b64encode(zlib.compress(pickle.dumps(Value),9))
meas_required = ['TMIN', 'TMAX', 'PRCP', 'SNWD', 'SNOW'] #core readings
min_year = 1900 #don't consider readings before this year
class Get_Partitions(MRJob):
def configure_options(self):
super(Get_Partitions,self).configure_options()
self.add_file_option('--stations')
def reducer_init(self):
f = open(self.options.stations, "rb" )
self.stations = pickle.load(f)
f.close()
@ECatch
def map_one(self,line):
return line.split(',')
def mapper(self, _, line):
self.increment_counter('MrJob Counters','mapper',1)
elements=self.map_one(line)
if not (elements[0] == 'station'):
stderr.write(elements[0])
print elements[0]
yield(elements[0],elements[1:])
def check_integrity(self,station,meas,year,length):
if year<1000 or year > 2014: return False
if meas not in meas_required: return False
if length != 367: return False
return True
@ECatch
def reduce_one(self,station,S,vector):
meas=vector[0]
year=int(vector[1])
length=len(vector)
assert self.check_integrity(station,meas,year,length)==True
if (meas in meas_required and year>= min_year):
S[(meas,year)]=vector[2:]
def reducer(self, station, vectors):
yield(None,1)
print station
S={}
vyears = self.stations['valid_years'][station]
if not vyears:
pass
else:
vy = [int(n) for n in vyears.split(',')]
stderr.write(str(vy))
D = np.zeros((len(vy),730))
for vector in vectors:
meas=vector[0]
year=int(vector[1])
if (year in vy) and (meas in ['TMIN', 'TMAX']):
self.reduce_one(station,S,vector)
for i in range(len(vy)):
temp, valid, tmin, tmax = S[(u'TMIN',vy[i])], [], [], []
for each in temp:
if each:
valid.append(int(each))
mean = sum(valid)/len(valid)
for each in temp:
if each:
tmin.append(int(each))
else:
tmin.append(mean)
temp, valid = S[(u'TMAX',vy[i])], []
for each in temp:
if each:
valid.append(int(each))
mean = sum(valid)/len(valid)
for each in temp:
if each:
tmax.append(int(each))
else:
tmax.append(mean)
D[i,0:365] = tmin
D[i,365:730] = tmax
#np.savetxt(str(station)+'.csv', D, delimiter=",")
yield(station,dumps(D))
if __name__ == '__main__':
Get_Partitions.run()
Writing Get_Partitions.py
!python Get_Partitions.py --stations stationsall.pkl -r emr --emr-job-flow-id j-31UKS93V80CN7 hdfs:/weather/weather.csv > out