This notebook is adapted from a Keras tutorial. It uses Chicago Taxi dataset and a DNN Keras model to predict whether a trip may generate a big tip.
In this example, we showcase how to achieve the same tasks as the Keras Tutorial using Ray AIR, covering every step from data ingestion to pushing a model to serving.
Uncomment and run the following line in order to install all the necessary dependencies:
# ! pip install "tensorflow>=2.8.0" "ray[air]>=2.0.0"
We will use ray.init()
to initialize a local cluster. By default, this cluster will be composed of only the machine you are running this notebook on. If you wish to attach to an existing Ray cluster, you can do so through ray.init(address="auto")
.
from pprint import pprint
import ray
ray.shutdown()
ray.init()
2022-11-08 22:33:29,918 INFO worker.py:1528 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
Python version: | 3.8.6 |
Ray version: | 3.0.0.dev0 |
Dashboard: | http://127.0.0.1:8265 |
We can check the resources our cluster is composed of. If you are running this notebook on your local machine or Google Colab, you should see the number of CPU cores and GPUs available on the said machine.
pprint(ray.cluster_resources())
{'CPU': 16.0, 'memory': 6000536781.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 2147483648.0}
Let's start with defining a helper function to get the data to work with. Some columns are dropped for simplicity.
import pandas as pd
INPUT = "input"
LABEL = "is_big_tip"
def get_data() -> pd.DataFrame:
"""Fetch the taxi fare data to work on."""
_data = pd.read_csv(
"https://raw.githubusercontent.com/tensorflow/tfx/master/"
"tfx/examples/chicago_taxi_pipeline/data/simple/data.csv"
)
_data[LABEL] = _data["tips"] / _data["fare"] > 0.2
# We drop some columns here for the sake of simplicity.
return _data.drop(
[
"tips",
"fare",
"dropoff_latitude",
"dropoff_longitude",
"pickup_latitude",
"pickup_longitude",
"pickup_census_tract",
],
axis=1,
)
data = get_data()
Now let's take a look at the data. Notice that some values are missing. This is exactly where preprocessing comes into the picture. We will come back to this in the preprocessing session below.
data.head(5)
pickup_community_area | trip_start_month | trip_start_hour | trip_start_day | trip_start_timestamp | trip_miles | dropoff_census_tract | payment_type | company | trip_seconds | dropoff_community_area | is_big_tip | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | NaN | 5 | 19 | 6 | 1400269500 | 0.0 | NaN | Credit Card | Chicago Elite Cab Corp. (Chicago Carriag | 0.0 | NaN | False |
1 | NaN | 3 | 19 | 5 | 1362683700 | 0.0 | NaN | Unknown | Chicago Elite Cab Corp. | 300.0 | NaN | False |
2 | 60.0 | 10 | 2 | 3 | 1380593700 | 12.6 | NaN | Cash | Taxi Affiliation Services | 1380.0 | NaN | False |
3 | 10.0 | 10 | 1 | 2 | 1382319000 | 0.0 | NaN | Cash | Taxi Affiliation Services | 180.0 | NaN | False |
4 | 14.0 | 5 | 7 | 5 | 1369897200 | 0.0 | NaN | Cash | Dispatch Taxi Affiliation | 1080.0 | NaN | False |
We continue to split the data into training and test data. For the test data, we separate out the features to run serving on as well as labels to compare serving results with.
import numpy as np
from sklearn.model_selection import train_test_split
from typing import Tuple
def split_data(data: pd.DataFrame) -> Tuple[ray.data.Dataset, pd.DataFrame, np.array]:
"""Split the data in a stratified way.
Returns:
A tuple containing train dataset, test data and test label.
"""
# There is a native offering in Dataset for split as well.
# However, supporting stratification is a TODO there. So use
# scikit-learn equivalent here.
train_data, test_data = train_test_split(
data, stratify=data[[LABEL]], random_state=1113
)
_train_ds = ray.data.from_pandas(train_data)
_test_label = test_data[LABEL].values
_test_df = test_data.drop([LABEL], axis=1)
return _train_ds, _test_df, _test_label
train_ds, test_df, test_label = split_data(data)
print(f"There are {train_ds.count()} samples for training and {test_df.shape[0]} samples for testing.")
There are 11251 samples for training and 3751 samples for testing.
Let's focus on preprocessing first. Usually, input data needs to go through some preprocessing before being fed into model. It is a good idea to package preprocessing logic into a modularized component so that the same logic can be applied to both training data as well as data for online serving or offline batch prediction.
In AIR, this component is a Preprocessor
.
It is constructed in a way that allows easy composition.
Now let's construct a chained preprocessor composed of simple preprocessors, including
Take a look at Preprocessor
for more information on the built-in preprocessors. The output of the preprocessing step goes into model for training.
from ray.data.preprocessors import (
BatchMapper,
Concatenator,
Chain,
OneHotEncoder,
SimpleImputer,
)
def get_preprocessor():
"""Construct a chain of preprocessors."""
imputer1 = SimpleImputer(
["dropoff_census_tract"], strategy="most_frequent"
)
imputer2 = SimpleImputer(
["pickup_community_area", "dropoff_community_area"],
strategy="most_frequent",
)
imputer3 = SimpleImputer(["payment_type"], strategy="most_frequent")
imputer4 = SimpleImputer(
["company"], strategy="most_frequent")
imputer5 = SimpleImputer(
["trip_start_timestamp", "trip_miles", "trip_seconds"], strategy="mean"
)
ohe = OneHotEncoder(
columns=[
"trip_start_hour",
"trip_start_day",
"trip_start_month",
"dropoff_census_tract",
"pickup_community_area",
"dropoff_community_area",
"payment_type",
"company",
],
max_categories={
"dropoff_census_tract": 25,
"pickup_community_area": 20,
"dropoff_community_area": 20,
"payment_type": 2,
"company": 7,
},
)
def batch_mapper_fn(df):
df["trip_start_year"] = pd.to_datetime(df["trip_start_timestamp"], unit="s").dt.year
df = df.drop(["trip_start_timestamp"], axis=1)
return df
chained_pp = Chain(
imputer1,
imputer2,
imputer3,
imputer4,
imputer5,
ohe,
BatchMapper(batch_mapper_fn, batch_format="pandas"),
# Concatenate all columns, except LABEL into a single tensor column with name INPUT.
Concatenator(output_column_name=INPUT, exclude=[LABEL])
)
return chained_pp
Now let's define some constants for clarity.
# Note that `INPUT_SIZE` here is corresponding to the dimension
# of the previously created tensor column during preprocessing.
# This is used to specify the input shape of Keras model.
INPUT_SIZE = 120
# The global training batch size. Based on `NUM_WORKERS`, each worker
# will get its own share of this batch size. For example, if
# `NUM_WORKERS = 2`, each worker will work on 4 samples per batch.
BATCH_SIZE = 8
# Number of epoch. Adjust it based on how quickly you want the run to be.
EPOCH = 1
# Number of training workers.
# Adjust this accordingly based on the resources you have!
NUM_WORKERS = 2
Let's starting with defining a simple Keras model for the classification task.
import tensorflow as tf
def build_model():
model = tf.keras.models.Sequential()
model.add(tf.keras.Input(shape=(INPUT_SIZE,)))
model.add(tf.keras.layers.Dense(50, activation="relu"))
model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
return model
Now let's define the training loop. This code will be run on each training worker in a distributed fashion. See more details here.
from ray.air import session, Checkpoint
from ray.train.tensorflow import TensorflowCheckpoint
def train_loop_per_worker():
dataset_shard = session.get_dataset_shard("train")
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
model = build_model()
model.compile(
loss="binary_crossentropy",
optimizer="adam",
metrics=["accuracy"],
)
for epoch in range(EPOCH):
tf_dataset = dataset_shard.to_tf(feature_columns=INPUT, label_columns=LABEL, batch_size=BATCH_SIZE, drop_last=True)
model.fit(tf_dataset, verbose=0)
# This saves checkpoint in a way that can be used by Ray Serve coherently.
session.report(
{},
checkpoint=TensorflowCheckpoint.from_model(model),
)
Now let's define a trainer that takes in the training loop, the training dataset as well the preprocessor that we just defined.
And run it!
Notice that you can tune how long you want the run to be by changing EPOCH
.
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig
trainer = TensorflowTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
datasets={"train": train_ds},
preprocessor=get_preprocessor(),
)
result = trainer.fit()
We will use Ray Serve to serve the trained model. A core concept of Ray Serve is a Deployment. It allows you to define and update your business logic or models that will handle incoming requests as well as how this is exposed over HTTP or in Python.
In the case of serving a model, ray.serve.air_integrations.Predictor
and ray.serve.air_integrations.PredictorDeployment
wrap a ray.air.checkpoint.Checkpoint
into a Ray Serve deployment that can readily serve HTTP requests.
Note, Checkpoint
captures both model and preprocessing steps in a way compatible with Ray Serve and ensures that the ML workload can transition seamlessly between training and
serving.
This removes the boilerplate code and minimizes the effort to bring your model to production!
Let's first wrap our checkpoint in a serve endpoint that exposes a URL to where requests can be sent to.
Our Serve endpoint will take in JSON data as input, so we also specify an adapter to convert the JSON data to a Pandas Dataframe so it can be inputted to the TensorflowPredictor
from ray import serve
from ray.air.checkpoint import Checkpoint
from ray.train.tensorflow import TensorflowPredictor
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import pandas_read_json
def serve_model(checkpoint: Checkpoint, model_definition, name="Model") -> str:
"""Expose a serve endpoint.
Returns:
serve URL.
"""
serve.run(
PredictorDeployment.options(name=name).bind(
TensorflowPredictor,
checkpoint,
model_definition=model_definition,
http_adapter=pandas_read_json,
)
)
return f"http://localhost:8000/"
import ray
# Generally speaking, training and serving are done in totally different ray clusters.
# To simulate that, let's shutdown the old ray cluster in preparation for serving.
ray.shutdown()
endpoint_uri = serve_model(result.checkpoint, build_model)
Let's write a helper function to send requests to this endpoint and compare the results with labels.
import json
import requests
import pandas as pd
import numpy as np
NUM_SERVE_REQUESTS = 10
def send_requests(df: pd.DataFrame, label: np.array):
for i in range(NUM_SERVE_REQUESTS):
one_row = df.iloc[[i]].to_dict()
serve_result = requests.post(endpoint_uri, data=json.dumps(one_row), headers={"Content-Type": "application/json"}).json()
print(
f"request{i} prediction: {serve_result[0]['predictions']} "
f"- label: {str(label[i])}"
)
send_requests(test_df, test_label)