from itertools import izip
import pandas as pd
import numpy as np
import tables as tb

print 'PyTables version:', tb.__version__
print 'Pandas version:', pd.__version__

n_particles = 15
time_chunk_size = 2**18
n_iter = 20
time_size = n_iter*time_chunk_size

print 'time size: %.1f 10^6' % (time_size*1e-6)
print 'emission size: %.1f MB' % (4*n_particles*time_size/1024./1024.)

def generate_emission(shape):
    """Generate fake emission."""
    emission = np.random.randn(*shape).astype('float32') - 1
    emission.clip(0, 1e6, out=emission)
    assert (emission >= 0).all()
    return emission

def compute_counts(emission):
    """Fake counts computation."""
    return np.trunc(emission*1.5).astype('i1')

%%timeit -r1 -n1
# generate simulated data (in RAM only, as a timing baseline)
test = []
for i in range(n_iter):
    emission_chunk = generate_emission((time_chunk_size, n_particles))
    test.append(emission_chunk)

#data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode="w")

%%timeit -r1 -n1
# 1) create a new emission file, compressing as we go
comp_filter = tb.Filters(complib='blosc', complevel=5)
emission = data_file.create_earray('/', 'emission', atom=tb.Float32Atom(),
                                   shape=(n_particles, 0),
                                   chunkshape=(n_particles, time_chunk_size),
                                   expectedrows=n_iter*time_chunk_size,
                                   filters=comp_filter)

# generate simulated emission data
for i in range(n_iter):
    emission_chunk = generate_emission((n_particles, time_chunk_size))
    emission.append(emission_chunk)

emission.flush()

print 'File size: %.1f MB' % (data_file.get_filesize()/1024./1024.)
emission = data_file.root.emission
print 'Compression ratio: %.1f x' % (1.*emission.size_in_memory/emission.size_on_disk)

%%timeit -r1 -n1
# 1) create a new emission file, compressing as we go
emission = pd.HDFStore('emission_pandas.hdf', mode='w', complib='blosc', complevel=5)

# generate simulated data
for i in range(n_iter):
    # Generate fake emission
    emission_chunk = np.random.randn(time_chunk_size, n_particles) - 1
    emission_chunk.clip(0, 1e6, out=emission_chunk)
    assert (emission_chunk >= 0).all()

    df = pd.DataFrame(emission_chunk, dtype='float32')

    # create a globally unique index (time)
    # http://stackoverflow.com/questions/16997048/how-does-one-append-large-amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
    try:
        nrows = emission.get_storer('df').nrows
    except:
        nrows = 0
    df.index = pd.Series(df.index) + nrows
    emission.append('df', df)

emission.close()

#emission.close()
counts_ram = compute_counts(emission.read())

%%timeit -r1 -n1
counts_ram = compute_counts(emission.read())

fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
t_list = [[] for i in xrange(n_particles)]
timestamps_ram = []

%%timeit -r1 -n1
for p_i, counts_p_i in enumerate(counts_ram.copy()):
    t_list[p_i].append(counts_p_i.nonzero()[0]*scale)
    for frac, v in izip(fractions, range(2, max_counts + 1)):
        counts_p_i -= 1
        np.clip(counts_p_i, 0, 127, out=counts_p_i)
        t_list[p_i].append(counts_p_i.nonzero()[0]*scale + frac)

for t in t_list:
    timestamps_ram.append(np.hstack(t))
[t.sort(kind='mergesort') for t in timestamps_ram]

par_index = [p_i*np.ones(t.size) for p_i, t in enumerate(timestamps_ram)]
timestamps_all_ram = np.hstack(timestamps_ram)
par_index_all_ram = np.hstack(par_index)

index_sort = timestamps_all_ram.argsort(kind='mergesort')
timestamps_all_ram = timestamps_all_ram[index_sort]
par_index_all_ram = par_index_all_ram[index_sort]

print [t.shape for t in timestamps_ram]

for p_i in xrange(n_particles):
    print (timestamps_ram[p_i] == timestamps_all_ram[par_index_all_ram == p_i]).all(),
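To make the counts-to-timestamps mapping concrete, here is a tiny self-contained sketch with toy values (not the simulation parameters above): a count of c in time-bin i contributes min(c, max_counts) timestamps, the first one at i*scale and each extra photon shifted by the matching entry of fractions. The chunked versions below reproduce the same mapping with counts >= v masks.

import numpy as np
from itertools import izip

counts = np.array([0, 3, 1, 2], dtype='i1')   # toy counts for one particle
scale = 10
max_counts = 4
fractions = [5, 2, 8]                          # sub-bin offsets for the 2nd, 3rd, 4th photon

t_parts = [counts.nonzero()[0]*scale]          # first photon of every non-empty bin
for frac, v in izip(fractions, range(2, max_counts + 1)):
    # bins with at least v photons get one extra timestamp shifted by frac
    t_parts.append((counts >= v).nonzero()[0]*scale + frac)

timestamps = np.hstack(t_parts)
timestamps.sort(kind='mergesort')
print timestamps                               # -> [10 12 15 20 30 35]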
idx_ram = [t/scale for t in timestamps_ram]
idx_ram

print [(np.clip(c, 0, max_counts).sum() == t.size) for c, t in zip(counts_ram, timestamps_ram)]

[c[i] for c, i in zip(counts_ram, idx_ram)]

counts_names = ["counts_p%d" % i for i in xrange(n_particles)]

#for i_p in xrange(n_particles): data_file.remove_node('/c', counts_names[i_p])

data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode="a")
emission = data_file.root.emission
print 'File size: %.1f MB' % (data_file.get_filesize()/1024./1024.)

# Create the tables for the counts
comp_filter = tb.Filters(complib='blosc', complevel=5)
table_uint8 = np.dtype([('counts', 'u1')])

counts_p = []
for i_p in xrange(n_particles):
    # 2) create counts table
    counts_p.append(data_file.create_table('/c', counts_names[i_p],
                                           createparents=True,
                                           description=table_uint8,
                                           chunkshape=(time_chunk_size,),
                                           expectedrows=n_iter*time_chunk_size,
                                           filters=comp_filter))
counts_p[0]

%%timeit -n1 -r1
# Extract the counts from the emission
for i_chunk in xrange(n_iter):
    emission_chunk = emission[:, i_chunk*time_chunk_size:i_chunk*time_chunk_size + time_chunk_size]
    #print emission_chunk.shape
    for p_i in xrange(n_particles):
        counts_chunk_p_i = compute_counts(emission_chunk[p_i])
        counts_p[p_i].append(counts_chunk_p_i)
data_file.flush()

print 'File size: %.1f MB' % (data_file.get_filesize()/1024./1024.)
counts_p[0]

counts_tb = np.vstack([c.read()['counts'] for c in counts_p]).astype('i1')
counts_tb.sum(1)
counts_ram.sum(1)
(counts_tb == counts_ram).all()

# Compute timestamps from the counts
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
timestamps_tb = []

%%timeit -n1 -r1
for counts_p_i in counts_p:
    PH = [counts_p_i.get_where_list('counts >= 1')]
    PH[0] *= scale
    for frac, v in izip(fractions, range(2, max_counts + 1)):
        PH.append(counts_p_i.get_where_list('counts >= %d' % v)*scale)
        PH[-1] += frac
    t = np.hstack(PH).astype(np.int64)
    t.sort(kind='mergesort')
    timestamps_tb.append(t)

print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb, timestamps_ram)]
print [(t1 == t2).all() for t1, t2 in zip(timestamps_tb, timestamps_ram)]

fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
times_list = [[] for i in xrange(n_particles)]

%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])

    # Loop for each particle to compute timestamps
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)
        t = np.hstack(times_c_i)
        t.sort(kind='mergesort')
        times_list[p_i].append(t)

timestamps_tb2 = [np.hstack(t) for t in times_list]

print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb2, timestamps_ram)]
print [(t1 != t2).sum() for t1, t2 in zip(timestamps_tb2, timestamps_ram)]
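As a reminder of what the in-kernel queries used above return, a minimal standalone sketch (the toy file name and values are arbitrary): get_where_list evaluates the condition on disk and returns the row coordinates that satisfy it, and those coordinates play the role of time-bin indices here.

import numpy as np
import tables as tb

# Toy table, only to show what get_where_list returns (file name is arbitrary)
toy_file = tb.open_file('toy_counts.hdf', mode='w')
toy = toy_file.create_table('/', 'toy', description=np.dtype([('counts', 'u1')]))
toy.append(np.array([0, 3, 1, 2], dtype='u1'))
toy.flush()

print toy.get_where_list('counts >= 1')   # row coordinates: [1 2 3]
print toy.get_where_list('counts >= 2')   # row coordinates: [1 3]
toy_file.close()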
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
timestamps_list_all_tb = []
par_list_all_tb = []

%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])

    # Loop for each particle to compute timestamps
    times_chunk = [[] for i in xrange(n_particles)]
    par_index_chunk = [[] for i in xrange(n_particles)]
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)

        # Stack the arrays of different "counts"
        t = np.hstack(times_c_i)
        times_chunk[p_i] = t
        par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')

    # Merge the arrays of different particles
    times_all = np.hstack(times_chunk)
    par_index_all = np.hstack(par_index_chunk)

    # Sort timestamps inside the merged chunk
    index_sort = times_all.argsort(kind='mergesort')
    times_all = times_all[index_sort]
    par_index_all = par_index_all[index_sort]

    # Save timestamps and particles
    timestamps_list_all_tb.append(times_all)
    par_list_all_tb.append(par_index_all)

# Make union of the chunks
timestamps_all_tb = np.hstack(timestamps_list_all_tb)
par_all_tb = np.hstack(par_list_all_tb)

timestamps_all_tb.shape, timestamps_all_ram.shape

print (timestamps_all_tb == timestamps_all_ram).all()
print (par_all_tb == par_index_all_ram).all()

#data_file.remove_node('/ts', 'timestamps_all')
#data_file.remove_node('/ts', 'par_all')

data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode="a")
emission = data_file.root.emission
print 'File size: %.1f MB' % (data_file.get_filesize()/1024./1024.)

# Create the tables for timestamps_all and par_all
comp_filter = tb.Filters(complib='blosc', complevel=5)
table_int64 = np.dtype([('times', 'i8')])
table_uint8 = np.dtype([('particle', 'u1')])

timestamps_all_table = data_file.create_table('/ts', 'timestamps_all',
                                              createparents=True,
                                              description=table_int64,
                                              filters=comp_filter)
par_all_table = data_file.create_table('/ts', 'par_all',
                                       createparents=True,
                                       description=table_uint8,
                                       filters=comp_filter)

fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4

%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])

    # Loop for each particle to compute timestamps
    times_chunk = [[] for i in xrange(n_particles)]
    par_index_chunk = [[] for i in xrange(n_particles)]
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)

        # Stack the arrays of different "counts"
        t = np.hstack(times_c_i)
        times_chunk[p_i] = t
        par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')

    # Merge the arrays of different particles
    times_all = np.hstack(times_chunk)
    par_index_all = np.hstack(par_index_chunk)

    # Sort timestamps inside the merged chunk
    index_sort = times_all.argsort(kind='mergesort')
    times_all = times_all[index_sort]
    par_index_all = par_index_all[index_sort]

    # Save timestamps and particles
    timestamps_all_table.append(times_all)
    par_all_table.append(par_index_all)

# Make union of the chunks
timestamps_all_tb3 = timestamps_all_table.read()['times']
par_all_tb3 = par_all_table.read()['particle']

print 'File size: %.1f MB' % (data_file.get_filesize()/1024./1024.)
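The full-array read above is only needed for the comparison that follows; the same tables can also be consumed incrementally. A minimal sketch of a chunked read-back, reusing the data_file handle and the /ts nodes created above (the read-back chunk size is arbitrary):

ts_table = data_file.root.ts.timestamps_all
read_chunk = 2**18   # arbitrary read-back chunk size

for start in xrange(0, ts_table.nrows, read_chunk):
    # read only one slice of the 'times' column at a time
    times = ts_table.read(start=start, stop=start + read_chunk, field='times')
    # process `times` here (histograms, max, ...) without loading the whole table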
timestamps_all_tb3.shape, timestamps_all_ram.shape

print (timestamps_all_tb3 == timestamps_all_ram).all()
print (par_all_tb3 == par_index_all_ram).all()

# 2) create counts
cs = pd.HDFStore('counts.hdf', mode='w', complib='blosc')

# this is an iterator, can be any size
for df in pd.read_hdf('emission_pandas.hdf', 'df', chunksize=200):
    counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))

    # set the index as the same
    counts.index = df.index

    # store the sum across a group of particles (as most counts are zero this
    # makes a nice sub-selector); better maybe to have multiple of these sums
    # that divide the particle space. You don't have to do this, but it is
    # probably more efficient. You can do this in another file if you want/need.
    counts['particles_0_4'] = counts.iloc[:, 0:5].sum(1).astype('u1')
    counts['particles_5_9'] = counts.iloc[:, 5:10].sum(1).astype('u1')

    # make the sum columns indexable
    cs.append('df', counts, data_columns=['particles_0_4', 'particles_5_9'])

cs.close()

# 3) find interesting counts
print pd.read_hdf('counts.hdf', 'df', where='particles_0_4 > 0')
print pd.read_hdf('counts.hdf', 'df', where='particles_5_9 > 0')
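A possible follow-up sketch, assuming the counts.hdf store and the two data columns created above: since particles_0_4 and particles_5_9 are data columns, where clauses can combine them, and the selection can be streamed back in chunks (the chunk size here is arbitrary).

import pandas as pd

# Rows where either group of particles fired, read back chunk by chunk so the
# selection never has to fit in memory at once.
for chunk in pd.read_hdf('counts.hdf', 'df',
                         where='(particles_0_4 > 0) | (particles_5_9 > 0)',
                         chunksize=100000):
    print chunk.shape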