import pandas as pd
import numpy as np
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns
Script to download the data: download.py
# %load download.py
"""
Download taxi data from S3 to local
"""
from pathlib import Path
import sys
import argparse
import s3fs
from distributed import Client, wait
def parse_args(args=None):
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('-s', '--scheduler', default=None,
                        help='Scheduler address')
    return parser.parse_args(args)


def fetch(key):
    fs = s3fs.S3FileSystem(anon=True)
    dest = Path('data').joinpath(Path(key).name)
    dest.parent.mkdir(exist_ok=True)
    fs.get(key, str(dest))
    return key


def main(args=None):
    args = parse_args(args)
    client = Client(args.scheduler)
    keys = [
        f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'
        for m in range(1, 13)
    ]
    results = client.map(fetch, keys)
    wait(results)


if __name__ == '__main__':
    sys.exit(main())
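With no --scheduler argument, Client() starts a local cluster; to use an existing cluster, pass its scheduler address (the address below is only an example):
python download.py --scheduler localhost:8786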
pd.options.display.max_rows = 10
%matplotlib inline
ls -lh data/*.csv
Load the first month of data into memory as a pandas DataFrame.
%%time
dtype = {
    'vendor_name': 'category',
    'Payment_Type': 'category',
}
df = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'])
df.head()
Let's predict whether or not the person tips.
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0
Everything fits in memory, so the usual scikit-learn workflow applies.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
len(X_train)
len(X_test)
I notice that there are some minor differences in the spelling of the "Payment_Type" values:
df.Payment_Type.cat.categories
We'll consolidate those by just lower-casing them:
df.Payment_Type.str.lower()
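To confirm the variants collapse to a single set of labels (a quick check, not part of the pipeline itself):
df.Payment_Type.str.lower().value_counts()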
And since we're good sci-kittens, we'll package all this up in a pipeline.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
class ColumnSelector(BaseEstimator, TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]
class HourExtractor(BaseEstimator, TransformerMixin):
    "Transform each datetime64 column in `columns` to integer hours"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Bind `col` as a default argument; a bare closure would capture
        # only the last value of `col` from the loop.
        return X.assign(**{col: lambda x, col=col: x[col].dt.hour
                           for col in self.columns})
def payment_lowerer(X):
    """Lowercase all the Payment_Type values"""
    return X.assign(Payment_Type=X.Payment_Type.str.lower())
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Convert to Categorical with specific `categories`"""
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X.copy()  # avoid mutating the caller's DataFrame
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X
class StandardScaler(BaseEstimator, TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X
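As a quick sanity check that the pieces compose, here's a tiny made-up frame run through two of the transformers (the values are invented for illustration):
demo = pd.DataFrame({
    'Trip_Pickup_DateTime': pd.to_datetime(['2009-01-01 09:30', '2009-01-01 23:10']),
    'Payment_Type': ['CASH', 'Credit'],
})
HourExtractor(['Trip_Pickup_DateTime']).transform(payment_lowerer(demo))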
# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime',
           'Passenger_Count', 'Trip_Distance',
           'Payment_Type', 'Fare_Amt', 'surcharge']
# The mapping of {column: set of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}
scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']
pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    LogisticRegression(),
)
pipe
pipe.steps
%time pipe.fit(X_train, y_train)
pipe.score(X_train, y_train)
pipe.score(X_test, y_test)
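Since the final step is a LogisticRegression, the fitted pipeline also exposes predict_proba, which we'll lean on when scoring with dask below (the head() slice just keeps the demo small):
pipe.predict_proba(X_test.head())[:, 1]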
To make it easy to build fresh, unfit copies of the pipeline later, we'll wrap its construction in a small factory function.
def mkpipe():
    pipe = make_pipeline(
        ColumnSelector(columns),
        HourExtractor(['Trip_Pickup_DateTime']),
        FunctionTransformer(payment_lowerer, validate=False),
        CategoricalEncoder(categories),
        FunctionTransformer(pd.get_dummies, validate=False),
        StandardScaler(scale),
        LogisticRegression(),
    )
    return pipe
Now let's scale out to the full year of data with dask.dataframe.
%%time
df = dd.read_csv("data/*.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'])
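df is now a lazy dask DataFrame; nothing has actually been loaded into memory yet. The number of pandas partitions it will be processed in (which depends on the file sizes and blocksize) is:
df.npartitions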
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0
Since the scikit-learn world isn't really "dask-aware" at the moment, we'll use the map_partitions method, which applies a function to each pandas DataFrame making up the dask DataFrame. It's a good escape hatch for dealing with code that isn't daskified.
yhat = X.map_partitions(
    lambda x: pd.Series(pipe.predict_proba(x)[:, 1], index=x.index, name='yhat'),
    meta=('yhat', 'f8'))
%time yhat.to_frame().to_parquet("data/predictions.parq")
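If you want to double-check the output, the predictions can be read back lazily:
dd.read_parquet("data/predictions.parq").head()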