Blaze - Large collections of large CSV files

Large CSV files, or large collections of many CSV files, are a common yet cumbersome data format. If the data in these files doesn't fit into memory, we're usually forced to either:

  1. Ingest the data into a database
  2. Perform gymnastics while using Pandas

This notebook demonstrates that Blaze operates smoothly on such data. It then shows exactly how Blaze uses Pandas, effectively automating the gymnastics in step 2. We perform an out-of-core split-apply-combine operation on the NYC Taxicab dataset while using a comfortably small amount of memory.

All computations in this notebook were done on a personal laptop with a modest amount of memory, using a recent version of Blaze:

conda install -c blaze blaze

or 

pip install blaze

Download Data

Note: this is a lot of data to download, and a lot of data to serve. You might consider grabbing it from a torrent instead.

In [1]:
# !wget https://nyctaxitrips.blob.core.windows.net/data/trip_data_{1,2,3,4,5,6,7,8,9,10,11,12}.csv.zip

Inspect with Blaze

The blaze.Data constructor is designed to be easy to use. Here we give it a globstring matching the files we want to analyze.

It gives us a quick head of the data immediately, even though there are several gigabytes of data. If you're unfamiliar with the data, you may want to quickly peruse the columns and values.

In [2]:
from blaze import *

d = Data('trip_data_*.csv')
d
Out[2]:
medallion hack_license vendor_id rate_code store_and_fwd_flag pickup_datetime dropoff_datetime passenger_count trip_time_in_secs trip_distance pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude
0 89D227B655E5C82AECF13C3F540D4CF4 BA96DE419E711691B9445D6A6307C170 CMT 1 N 2013-01-01 15:11:48 2013-01-01 15:18:10 4 382 1.0 -73.978165 40.757977 -73.989838 40.751171
1 0BD7C8F5BA12B88E0B67BED28BEA73D8 9FD8F69F0804BDB5549F40E9DA1BE472 CMT 1 N 2013-01-06 00:18:35 2013-01-06 00:22:54 1 259 1.5 -74.006683 40.731781 -73.994499 40.750660
2 0BD7C8F5BA12B88E0B67BED28BEA73D8 9FD8F69F0804BDB5549F40E9DA1BE472 CMT 1 N 2013-01-05 18:49:41 2013-01-05 18:54:23 1 282 1.1 -74.004707 40.737770 -74.009834 40.726002
3 DFD2202EE08F7A8DC9A57B02ACB81FE2 51EE87E3205C985EF8431D850C786310 CMT 1 N 2013-01-07 23:54:15 2013-01-07 23:58:20 2 244 0.7 -73.974602 40.759945 -73.984734 40.759388
4 DFD2202EE08F7A8DC9A57B02ACB81FE2 51EE87E3205C985EF8431D850C786310 CMT 1 N 2013-01-07 23:25:03 2013-01-07 23:34:24 1 560 2.1 -73.976250 40.748528 -74.002586 40.747868
5 20D9ECB2CA0767CF7A01564DF2844A3E 598CCE5B9C1918568DEE71F43CF26CD2 CMT 1 N 2013-01-07 15:27:48 2013-01-07 15:38:37 1 648 1.7 -73.966743 40.764252 -73.983322 40.743763
6 496644932DF3932605C22C7926FF0FE0 513189AD756FF14FE670D10B92FAF04C CMT 1 N 2013-01-08 11:01:15 2013-01-08 11:08:14 1 418 0.8 -73.995804 40.743977 -74.007416 40.744343
7 0B57B9633A2FECD3D3B1944AFC7471CF CCD4367B417ED6634D986F573A552A62 CMT 1 N 2013-01-07 12:39:18 2013-01-07 13:10:56 3 1898 10.7 -73.989937 40.756775 -73.865250 40.770630
8 2C0E91FF20A856C891483ED63589F982 1DA2F6543A62B8ED934771661A9D2FA0 CMT 1 N 2013-01-07 18:15:47 2013-01-07 18:20:47 1 299 0.8 -73.980072 40.743137 -73.982712 40.735336
9 2D4B95E2FA7B2E85118EC5CA4570FA58 CD2F522EEE1FF5F5A8D8B679E23576B3 CMT 1 N 2013-01-07 15:33:28 2013-01-07 15:49:26 2 957 2.5 -73.977936 40.786983 -73.952919 40.806370
10 E12F6AF991172EAC3553144A0AF75A19 06918214E951FA0003D1CC54955C2AB0 CMT 1 N 2013-01-08 13:11:52 2013-01-08 13:19:50 1 477 1.3 -73.982452 40.773167 -73.964134 40.773815
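For comparison, a rough pandas-only version of this quick look is to parse just the first few rows of one matching file. This is a minimal sketch, not how Blaze implements it; the helper name `peek` is hypothetical.

```python
import glob

import pandas as pd


def peek(pattern, nrows=10):
    """Return the first `nrows` rows of the first file matching `pattern`.

    Hypothetical helper for illustration; it is not part of Blaze.
    """
    paths = sorted(glob.glob(pattern))
    if not paths:
        raise FileNotFoundError(f"no files match {pattern!r}")
    # nrows makes the parser stop early, so only the first rows are parsed
    # rather than the whole multi-gigabyte file
    return pd.read_csv(paths[0], nrows=nrows, skipinitialspace=True)
```

Calling `peek('trip_data_*.csv')` would show the first ten rows of the lexicographically first file.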

Our Model Computation

We now compute the average distance and number of rides grouped by the number of passengers riding in the cab.

It looks like single-passenger trips are by far the most common, while three-passenger trips are surprisingly long.

In [3]:
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())
Out[3]:
passenger_count avg_distance count
0 0 0.833625 5035
1 1 9.033823 121959711
2 2 6.992290 23517494
3 3 14.089989 7315829
4 4 5.286269 3582103
5 5 2.995215 10034696
6 6 2.956734 6764789
7 7 2.214286 35
8 8 3.360400 25
9 9 2.226538 26
10 129 0.920000 1

It's worth noting all the things we didn't do:

  1. We didn't worry about running out of memory
  2. We didn't worry about calling pandas.read_csv with the right arguments to make this fast
  3. We didn't worry about handling each CSV file separately

Blaze handles these things for us. It drives Pandas intelligently, breaks up our computation into pieces it can perform in memory, and then shoves data through Pandas as fast as it can.
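Points 1 and 3 can be approximated in plain pandas by streaming every file that matches a glob in fixed-size chunks, so that only one chunk is held in memory at a time. This sketch assumes all files share the same header; the generator name `iter_csv_chunks` is hypothetical and not part of Blaze's API.

```python
import glob

import pandas as pd


def iter_csv_chunks(pattern, usecols, chunksize=1_000_000):
    """Yield DataFrame chunks from every CSV file matching `pattern`.

    Hypothetical helper: at most `chunksize` rows of the `usecols`
    columns are held in memory at any one time.
    """
    for fn in sorted(glob.glob(pattern)):
        # usecols limits parsing to the columns we need;
        # skipinitialspace tolerates headers written as "a, b, c"
        for chunk in pd.read_csv(fn, usecols=usecols, chunksize=chunksize,
                                 skipinitialspace=True):
            yield chunk
```

Any reduction that can be updated chunk by chunk, such as a row count, can then be written as a loop over this generator.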

How fast is fast?

How long did it take to process 16 GB and do an out-of-core split-apply-combine operation?

In [4]:
expr = by(d.passenger_count, avg_distance=d.trip_distance.mean(),
          count=d.passenger_count.count())

%time _ = compute(expr)
CPU times: user 2min 13s, sys: 13.6 s, total: 2min 26s
Wall time: 3min 7s

Not great, but not terrible. This is about as fast as Postgres does it after the data has been loaded into Postgres.

How much memory did we need?

Not very much: a few hundred megabytes by default.

Anecdotally, I used to switch to a big machine when I ran out of memory. Now I switch to my big machine when I run out of disk space.

Multiprocessing

Blaze can use many cores to accelerate this work. It still uses Pandas on each core, but now it splits the computation apart intelligently and directs different CSV files to different cores.

This gives a significant speedup even on my laptop: with a pool of four processes, wall time drops from about 3 minutes to about 1 minute. On a large workstation with more cores, the speedup is more pronounced.
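The per-file fan-out can be sketched in plain pandas: each file is reduced independently to a small table of per-group sums and counts, and those partial tables are then combined. The function names are hypothetical, and the `map_fn` parameter is only meant to mirror the `map=pool.map` argument passed to `compute` in the cell that follows; this is not Blaze's internal code.

```python
import glob

import pandas as pd

COLUMNS = ["passenger_count", "trip_distance"]


def partial_stats(fn, chunksize=1_000_000):
    """Per-file step: reduce one CSV to per-group sums and counts.

    Defined at module level so that multiprocessing can pickle it.
    """
    parts = []
    for chunk in pd.read_csv(fn, usecols=COLUMNS, chunksize=chunksize,
                             skipinitialspace=True):
        g = chunk.groupby("passenger_count")["trip_distance"]
        parts.append(pd.DataFrame({"total": g.sum(),     # sum of distances
                                   "n": g.count(),       # non-null distances
                                   "rows": g.size()}))   # rows per group
    # Collapse this file's chunks into one small frame
    return pd.concat(parts).groupby(level=0).sum()


def grouped_mean(pattern, map_fn=map):
    """Fan files out with `map_fn`, then combine the small partial results."""
    partials = list(map_fn(partial_stats, sorted(glob.glob(pattern))))
    combined = pd.concat(partials).groupby(level=0).sum()
    return pd.DataFrame({"avg_distance": combined["total"] / combined["n"],
                         "count": combined["rows"]})
```

With a pool such as `multiprocessing.Pool(4)`, this would be called as `grouped_mean('trip_data_*.csv', map_fn=pool.map)`. Only the small per-file tables travel back to the parent process, which is why the combine step is cheap.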

In [5]:
import multiprocessing
pool = multiprocessing.Pool(4)
In [6]:
%time _ = compute(expr, map=pool.map)
CPU times: user 133 ms, sys: 28.8 ms, total: 161 ms
Wall time: 1min 1s

What does Blaze do internally?

Let's say we didn't want to use Blaze, and preferred to use just Pandas and some elbow grease.

How can we use Pandas in-memory to handle an out-of-memory dataset?

Simple Example

As a simpler example, let's compute the mean of a single column in a single CSV file. We'll use Pandas' chunked CSV reader.

To compute an out-of-core mean, we'll record a total and a count for each chunk:

In [7]:
import pandas as pd

totals = []
counts = []

for chunk in pd.read_csv('trip_data_1.csv', chunksize=1000000,
                         usecols=['passenger_count'], skipinitialspace=True):
    totals.append(chunk.passenger_count.sum())    # per-chunk sum
    counts.append(chunk.passenger_count.count())  # per-chunk non-null count

Then we perform a second computation on these intermediate results:

In [8]:
1.0 * sum(totals) / sum(counts)
Out[8]:
1.6973720977368634

This is exactly what Blaze does when we type the following.

In [9]:
Data('trip_data_1.csv').passenger_count.mean()
Out[9]:
1.6973720977368634

We break computations into pieces

So to perform one computation, mean, on an out-of-core dataset, we end up performing two different sets of computations:

  1. sum and count on each in-memory chunk
  2. sum and division on the aggregated results from step #1
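The same two-step split can be written down without pandas at all. In this sketch the function names are hypothetical: step 1 reduces each chunk to a `(sum, count)` pair, and step 2 adds those pairs up and divides once.

```python
def chunk_step(chunk):
    """Step 1: reduce one in-memory chunk to a (sum, count) pair."""
    return sum(chunk), len(chunk)


def combine_step(partials):
    """Step 2: add up the partial sums and counts, then divide once."""
    total = sum(t for t, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


def chunked(values, size):
    """Split a list into consecutive pieces of at most `size` items."""
    for i in range(0, len(values), size):
        yield values[i:i + size]


values = [1, 2, 2, 5, 1, 1, 3]
partials = [chunk_step(c) for c in chunked(values, 3)]  # [(5, 3), (7, 3), (3, 1)]
mean = combine_step(partials)                           # 15 / 7
```

Dividing only once in step 2 matters: averaging the per-chunk means would weight the short final chunk as heavily as a full one, giving 7/3 here instead of the correct 15/7.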

This breakdown works on a surprisingly large class of operations. Split-apply-combine operations are handled similarly. We perform a different split-apply-combine operation on each chunk and then another on the aggregated results.

For more information, see Blaze's out-of-core docs.

When doesn't this work?

This doesn't work for operations like sort or join, nor for any computation whose intermediate results don't fit in memory. Of course, you can still sort or join the results of a computation, as long as some data-reducing step comes first.
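As a sketch of that last point, a chunked reduction can come first, with the sort applied only to the small reduced result. The helper name `top_groups` is hypothetical and the chunks stand in for pieces of a larger-than-memory file.

```python
import pandas as pd


def top_groups(chunks, key, n):
    """Return the n most frequent values of `key` across all chunks."""
    # Data-reducing step: per-chunk counts are tiny compared to the rows
    partial = [c[key].value_counts() for c in chunks]
    # Combine: add up the counts for the same value across chunks
    counts = pd.concat(partial).groupby(level=0).sum()
    # The reduced result fits in memory, so an ordinary sort is fine now
    return counts.sort_values(ascending=False).head(n)


chunks = [pd.DataFrame({"k": [1, 1, 2]}),
          pd.DataFrame({"k": [2, 2, 2, 3, 1]})]
top = top_groups(chunks, "k", 2)  # value 2 seen 4 times, value 1 seen 3 times
```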

Complex Example

OK, so let's go through and solve the entire out-of-core split-apply-combine problem on all of the CSV files.

Feel free to ignore this example. It's mostly here to show exactly what Blaze does for those who care, and to generally impress those who don't.

Hold on to your butts.

In [10]:
%%time
from glob import glob

import pandas as pd

# Specifying active columns at parse time greatly improves performance
active_columns = ['passenger_count', 'trip_distance']
intermediates = []

# Do a split-apply-combine operation on each chunk of each CSV file
for fn in sorted(glob('trip_data_*.csv')):
    for df in pd.read_csv(fn, usecols=active_columns,
                          chunksize=1000000, skipinitialspace=True):
        # Named aggregation gives flat columns in a known order, so there is
        # no MultiIndex to flatten by position (which broke when dict order changed)
        chunk = df.groupby('passenger_count').agg(
            trip_distance_sum=('trip_distance', 'sum'),
            trip_distance_count=('trip_distance', 'count'),
            passenger_count_count=('trip_distance', 'size'))
        intermediates.append(chunk)

# Bring those results together.  These are much smaller and so likely fit in memory
df = pd.concat(intermediates, axis=0)

# Perform a second split-apply-combine operation on those intermediate results:
# partial sums and counts for the same passenger_count simply add up
df2 = df.groupby(level=0).sum()

df2['avg_distance'] = df2.trip_distance_sum / df2.trip_distance_count
df2['count'] = df2.passenger_count_count

# Select out the columns we want
result = df2[['avg_distance', 'count']]
result
CPU times: user 2min, sys: 8.74 s, total: 2min 9s
Wall time: 2min 49s

Conclusion

Blaze is a general library to bring expert data analysis into the hands of everyday users.

The example above is a PITA to do by hand. More than that, it relies on a number of tricks not known to many Pandas users.

Fortunately, Blaze automates these tricks, making them routine for a broad class of problems. Moreover, it does this from a simple, user-focused syntax.

d = Data('trip_data_*.csv')
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())

Hopefully this example helps to explain how Blaze chunks apart computations on large CSV files to operate in memory.

This also highlights the relationship between Blaze and Pandas. Pandas is Blaze's preferred library when it performs in-memory analytics on tabular data. In these cases it's Blaze's job to arrange the data well and call Pandas with the right arguments, while it's Pandas' job to actually do the computation.

As a reminder, large CSV files are just one application of Blaze. Blaze provides a similar experience and set of tricks for SQL, Spark, and binary storage files.

You can learn more about Blaze at http://blaze.pydata.org/