#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*A Component-by-Component Introduction to TensorFlow Extended (TFX)*
Note: We recommend running this tutorial in a Colab notebook, with no setup required! Just click "Run in Google Colab".
This Colab-based tutorial will interactively walk through each built-in component of TensorFlow Extended (TFX).
It covers every step in an end-to-end machine learning pipeline, from data ingestion to pushing a model to serving.
When you're done, the contents of this notebook can be automatically exported as TFX pipeline source code, which you can orchestrate with Apache Airflow and Apache Beam.
Note: This notebook demonstrates the use of native Keras models in TFX pipelines. TFX only supports the TensorFlow 2 version of Keras.
This notebook demonstrates how to use TFX in a Jupyter/Colab environment. Here, we walk through the Chicago Taxi example in an interactive notebook.
Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It's also useful as a lightweight development environment when developing your own pipelines, but you should be aware that interactive notebooks differ from production deployments in how they are orchestrated and in how they access metadata artifacts.
In a production deployment of TFX, you will use an orchestrator such as Apache Airflow, Kubeflow Pipelines, or Apache Beam to orchestrate a pre-defined pipeline graph of TFX components. In an interactive notebook, the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.
In a production deployment of TFX, you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL or SQLite, and stores the metadata payloads in a persistent store such as on your filesystem. In an interactive notebook, both properties and payloads are stored in an ephemeral SQLite database in the /tmp directory on the Jupyter notebook or Colab server.
First, we install and import the necessary packages, set up paths, and download data.
To avoid upgrading Pip in a system when running locally, check to make sure that we're running in Colab. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip
Note: In Google Colab, because of package updates, the first time you run this cell you must restart the runtime (Runtime > Restart runtime ...).
!pip install -U tfx
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.
We import necessary packages, including standard TFX component classes.
import os
import pprint
import tempfile
import urllib.request  # urlretrieve lives in the urllib.request submodule
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
Let's check the library versions.
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
# This is the root directory for your TFX pip package installation.
_tfx_root = tfx.__path__[0]
# This is the directory containing the TFX Chicago Taxi Pipeline example.
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')
# This is the path where your model will be pushed for serving.
_serving_model_dir = os.path.join(
    tempfile.mkdtemp(), 'serving_model/taxi_simple')
# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)
We download the example dataset for use in our TFX pipeline.
The dataset we're using is the Taxi Trips dataset released by the City of Chicago. The columns in this dataset are:
pickup_community_area | fare | trip_start_month |
trip_start_hour | trip_start_day | trip_start_timestamp |
pickup_latitude | pickup_longitude | dropoff_latitude |
dropoff_longitude | trip_miles | pickup_census_tract |
dropoff_census_tract | payment_type | company |
trip_seconds | dropoff_community_area | tips |
With this dataset, we will build a model that predicts the tips of a trip.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
Take a quick look at the CSV file.
!head {_data_filepath}
Disclaimer: This site provides applications using data that has been modified for use from its original source, www.cityofchicago.org, the official website of the City of Chicago. The City of Chicago makes no claims as to the content, accuracy, timeliness, or completeness of any of the data provided at this site. The data provided at this site is subject to change at any time. It is understood that the data provided at this site is being used at one’s own risk.
Last, we create an InteractiveContext, which will allow us to run TFX components interactively in this notebook.
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
context = InteractiveContext()
In the cells that follow, we create TFX components one-by-one, run each of them, and visualize their output artifacts.
The ExampleGen component is usually at the start of a TFX pipeline. It will:
1. Split data into training and evaluation sets (by default, 2/3 training and 1/3 evaluation)
2. Convert data into the tf.Example format (learn more here)
3. Copy data into the _tfx_root directory for other components to access
ExampleGen takes as input the path to your data source. In our case, this is the _data_root path that contains the downloaded CSV.
Note: In this notebook, we can instantiate components one-by-one and run them with InteractiveContext.run(). By contrast, in a production setting, we would specify all the components upfront in a Pipeline to pass to the orchestrator (see the Building a TFX Pipeline Guide).
When using the InteractiveContext in a notebook to develop a pipeline, you can control when individual components cache their outputs. Set enable_cache to True when you want to reuse the previous output artifacts that the component generated. Set enable_cache to False when you want to recompute the output artifacts for a component, for example if you are changing its code.
example_gen = tfx.components.CsvExampleGen(input_base=_data_root)
context.run(example_gen, enable_cache=True)
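The effect of enable_cache can be pictured with a small, purely illustrative sketch. The ToyComponentCache class, its key scheme, and the count_rows stand-in are hypothetical and are not how TFX or MLMD implement caching; they only show the idea of reusing an output when the component and its inputs are unchanged.

```python
import hashlib
import json


class ToyComponentCache:
    """Hypothetical cache: maps (component name, inputs) to a stored output."""

    def __init__(self):
        self._outputs = {}
        self.executions = 0

    def run(self, name, inputs, fn, enable_cache=True):
        # Fingerprint the component name together with its inputs.
        key = hashlib.sha256(
            json.dumps([name, inputs], sort_keys=True).encode("utf-8")
        ).hexdigest()
        if enable_cache and key in self._outputs:
            return self._outputs[key]  # Reuse the previous output.
        self.executions += 1
        self._outputs[key] = fn(**inputs)
        return self._outputs[key]


def count_rows(path):
    # Stand-in for real component work.
    return {"source": path, "rows": 3}


cache = ToyComponentCache()
first = cache.run("example_gen", {"path": "data.csv"}, count_rows)
second = cache.run("example_gen", {"path": "data.csv"}, count_rows)
forced = cache.run("example_gen", {"path": "data.csv"}, count_rows,
                   enable_cache=False)
print(cache.executions)
```

The second call is served from the cache, while the third call recomputes because caching is disabled.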
Let's examine the output artifacts of ExampleGen. This component produces an examples artifact with two splits, training examples and evaluation examples:
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
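One common way to produce stable train and eval splits is to hash each record into a fixed number of buckets. The sketch below illustrates that idea only; the assign_split helper, the SHA-256 hash, and the 2:1 bucket ratio are assumptions for illustration, not ExampleGen's actual implementation.

```python
import hashlib


def assign_split(record_key: str, train_buckets: int = 2,
                 eval_buckets: int = 1) -> str:
    """Deterministically map a record to 'train' or 'eval' by hashing its key."""
    total = train_buckets + eval_buckets
    digest = hashlib.sha256(record_key.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % total
    return "train" if bucket < train_buckets else "eval"


# Hypothetical record keys standing in for rows of the CSV.
rows = [f"trip-{i}" for i in range(3000)]
splits = [assign_split(row) for row in rows]
train_count = splits.count("train")
eval_count = splits.count("eval")
print(train_count, eval_count)
```

Because the assignment depends only on the record key, rerunning the split sends each record to the same side every time.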
We can also take a look at the first three training examples:
# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
Now that ExampleGen has finished ingesting the data, the next step is data analysis.
The StatisticsGen component computes statistics over your dataset for data analysis, as well as for use in downstream components. It uses the TensorFlow Data Validation library.
StatisticsGen takes as input the dataset we just ingested using ExampleGen.
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen, enable_cache=True)
After StatisticsGen finishes running, we can visualize the resulting statistics. Try playing with the different plots!
context.show(statistics_gen.outputs['statistics'])
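To give a feel for the kind of per-feature summaries shown in the plots, here is a minimal pure-Python sketch. The summarize helper and the three sample rows are made up for illustration; TensorFlow Data Validation computes a much richer set of statistics at scale.

```python
from statistics import mean

# Hypothetical rows; None marks a missing value.
rows = [
    {"trip_miles": 1.2, "fare": 6.5, "payment_type": "Cash"},
    {"trip_miles": 0.0, "fare": 3.25, "payment_type": "Credit Card"},
    {"trip_miles": None, "fare": 12.0, "payment_type": "Cash"},
]


def summarize(rows, feature):
    """Return simple summary statistics for one feature."""
    values = [r[feature] for r in rows if r[feature] is not None]
    summary = {"count": len(values), "missing": len(rows) - len(values)}
    if values and all(isinstance(v, (int, float)) for v in values):
        summary.update(min=min(values), max=max(values), mean=mean(values))
    else:
        summary["unique"] = len(set(values))
    return summary


miles_summary = summarize(rows, "trip_miles")
payment_summary = summarize(rows, "payment_type")
print(miles_summary)
print(payment_summary)
```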
The SchemaGen component generates a schema based on your data statistics. (A schema defines the expected bounds, types, and properties of the features in your dataset.) It also uses the TensorFlow Data Validation library.
Note: The generated schema is best-effort and only tries to infer basic properties of the data. It is expected that you review and modify it as needed.
SchemaGen will take as input the statistics that we generated with StatisticsGen, looking at the training split by default.
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen, enable_cache=True)
After SchemaGen finishes running, we can visualize the generated schema as a table.
context.show(schema_gen.outputs['schema'])
Each feature in your dataset shows up as a row in the schema table, alongside its properties. The schema also captures all the values that a categorical feature takes on, denoted as its domain.
To learn more about schemas, see the SchemaGen documentation.
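As a rough illustration of what "inferring a schema" means, the sketch below derives a type, a presence flag, and a domain for string features from a handful of rows. The infer_schema helper, the type names, and the sample rows are assumptions for this example; the real inference logic in TensorFlow Data Validation is more involved.

```python
def infer_schema(rows, max_domain_size=10):
    """Infer a toy schema: type, required flag, and domain for string features."""
    schema = {}
    for name in rows[0]:
        values = [r[name] for r in rows if r[name] is not None]
        if all(isinstance(v, str) for v in values):
            entry = {"type": "BYTES"}
            domain = sorted(set(values))
            # Only keep a domain when the set of values is small.
            if len(domain) <= max_domain_size:
                entry["domain"] = domain
        elif all(isinstance(v, int) for v in values):
            entry = {"type": "INT"}
        else:
            entry = {"type": "FLOAT"}
        entry["required"] = len(values) == len(rows)
        schema[name] = entry
    return schema


# Hypothetical rows; None marks a missing value.
rows = [
    {"payment_type": "Cash", "trip_start_hour": 8, "fare": 6.5},
    {"payment_type": "Credit Card", "trip_start_hour": 20, "fare": None},
    {"payment_type": "Cash", "trip_start_hour": 8, "fare": 12.0},
]
schema = infer_schema(rows)
print(schema)
```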
The ExampleValidator component detects anomalies in your data, based on the expectations defined by the schema. It also uses the TensorFlow Data Validation library.
ExampleValidator will take as input the statistics from StatisticsGen, and the schema from SchemaGen.
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator, enable_cache=True)
After ExampleValidator finishes running, we can visualize the anomalies as a table.
context.show(example_validator.outputs['anomalies'])
In the anomalies table, we can see that there are no anomalies. This is what we'd expect, since this is the first dataset that we've analyzed and the schema is tailored to it. You should review this schema -- anything unexpected means an anomaly in the data. Once reviewed, the schema can be used to guard future data, and anomalies produced here can be used to debug model performance, understand how your data evolves over time, and identify data errors.
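Guarding future data with a reviewed schema can be sketched as follows. The schema dictionary, the find_anomalies helper, and the sample rows are hypothetical and only illustrate the idea; ExampleValidator works on statistics and a schema proto rather than on raw rows.

```python
# A hypothetical reviewed schema: expected Python type and optional domain.
schema = {
    "payment_type": {"type": str, "domain": {"Cash", "Credit Card"}},
    "trip_miles": {"type": float},
}


def find_anomalies(rows, schema):
    """Return a mapping from feature name to a set of anomaly descriptions."""
    anomalies = {}
    for row in rows:
        for name, value in row.items():
            if name not in schema:
                anomalies.setdefault(name, set()).add("feature not in schema")
                continue
            if value is None:
                continue
            spec = schema[name]
            if not isinstance(value, spec["type"]):
                anomalies.setdefault(name, set()).add("unexpected type")
            elif "domain" in spec and value not in spec["domain"]:
                anomalies.setdefault(name, set()).add(
                    f"unexpected value: {value}")
    return anomalies


clean = find_anomalies([{"payment_type": "Cash", "trip_miles": 1.0}], schema)
drifted = find_anomalies([{"payment_type": "Bitcoin", "trip_miles": 2.5}],
                         schema)
print(clean, drifted)
```

Data that matches the schema yields an empty result, while a new payment type shows up as an anomaly that you can either fix in the data or accept by updating the schema.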
The Transform component performs feature engineering for both training and serving. It uses the TensorFlow Transform library.
Transform will take as input the data from ExampleGen, the schema from SchemaGen, as well as a module that contains user-defined Transform code.
Let's see an example of user-defined Transform code below (for an introduction to the TensorFlow Transform APIs, see the tutorial). First, we define a few constants for feature engineering:
Note: The %%writefile cell magic will save the contents of the cell as a .py file on disk. This allows the Transform component to load your code as a module.
_taxi_constants_module_file = 'taxi_constants.py'
%%writefile {_taxi_constants_module_file}
NUMERICAL_FEATURES = ['trip_miles', 'fare', 'trip_seconds']
BUCKET_FEATURES = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]
# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10
CATEGORICAL_NUMERICAL_FEATURES = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]
CATEGORICAL_STRING_FEATURES = [
    'payment_type',
    'company',
]
# Number of vocabulary terms used for encoding categorical features.
VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized categorical values are hashed.
OOV_SIZE = 10
# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'
def t_name(key):
  """
  Rename the feature keys so that they don't clash with the raw keys when
  running the Evaluator component.
  Args:
    key: The original feature key
  Returns:
    key with '_xf' appended
  """
  return key + '_xf'
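The VOCAB_SIZE and OOV_SIZE constants control how categorical values are turned into integers: the most frequent terms get their own index, and anything else is hashed into a small number of out-of-vocabulary buckets. The sketch below mimics that idea in pure Python; the helper names, the CRC32 hash, and the tie-breaking order are assumptions for illustration and do not reproduce TensorFlow Transform's exact behavior.

```python
from collections import Counter
import zlib


def build_vocabulary(values, top_k):
    """Map the top_k most frequent terms to indices 0..top_k-1."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically for a stable order.
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return {term: index for index, (term, _) in enumerate(ranked[:top_k])}


def encode(value, vocabulary, num_oov_buckets):
    """Return the vocabulary index, or a hashed out-of-vocabulary bucket."""
    if value in vocabulary:
        return vocabulary[value]
    oov_bucket = zlib.crc32(value.encode("utf-8")) % num_oov_buckets
    return len(vocabulary) + oov_bucket


def one_hot(index, depth):
    return [1.0 if i == index else 0.0 for i in range(depth)]


# Hypothetical company names.
companies = ["A", "A", "B", "C", "A", "B"]
vocabulary = build_vocabulary(companies, top_k=2)
depth = len(vocabulary) + 3  # Vocabulary size plus 3 OOV buckets.
known = encode("A", vocabulary, num_oov_buckets=3)
unknown = encode("C", vocabulary, num_oov_buckets=3)
print(vocabulary, known, unknown, one_hot(known, depth))
```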
Next, we write a preprocessing_fn that takes in raw data as input, and returns transformed features that our model can train on:
_taxi_transform_module_file = 'taxi_transform.py'
%%writefile {_taxi_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules:  # Testing to see if we're doing development
  import importlib
  importlib.reload(taxi_constants)
_NUMERICAL_FEATURES = taxi_constants.NUMERICAL_FEATURES
_BUCKET_FEATURES = taxi_constants.BUCKET_FEATURES
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_CATEGORICAL_NUMERICAL_FEATURES = taxi_constants.CATEGORICAL_NUMERICAL_FEATURES
_CATEGORICAL_STRING_FEATURES = taxi_constants.CATEGORICAL_STRING_FEATURES
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
def _make_one_hot(x, key):
  """Make a one-hot tensor to encode categorical features.
  Args:
    x: A dense tensor
    key: A string key for the feature in the input
  Returns:
    A dense one-hot tensor as a float list
  """
  integerized = tft.compute_and_apply_vocabulary(x,
                                                 top_k=_VOCAB_SIZE,
                                                 num_oov_buckets=_OOV_SIZE,
                                                 vocab_filename=key, name=key)
  depth = (
      tft.experimental.get_vocabulary_size_by_name(key) + _OOV_SIZE)
  one_hot_encoded = tf.one_hot(
      integerized,
      depth=tf.cast(depth, tf.int32),
      on_value=1.0,
      off_value=0.0)
  return tf.reshape(one_hot_encoded, [-1, depth])
def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  if not isinstance(x, tf.sparse.SparseTensor):
    return x
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _NUMERICAL_FEATURES:
    # If sparse make it dense, setting nan's to 0 or '', and apply zscore.
    outputs[taxi_constants.t_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]), name=key)
  for key in _BUCKET_FEATURES:
    outputs[taxi_constants.t_name(key)] = tf.cast(tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT, name=key),
        dtype=tf.float32)
  for key in _CATEGORICAL_STRING_FEATURES:
    outputs[taxi_constants.t_name(key)] = _make_one_hot(_fill_in_missing(inputs[key]), key)
  for key in _CATEGORICAL_NUMERICAL_FEATURES:
    outputs[taxi_constants.t_name(key)] = _make_one_hot(tf.strings.strip(
        tf.strings.as_string(_fill_in_missing(inputs[key]))), key)
  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_LABEL_KEY] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
  return outputs
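The numeric transforms in preprocessing_fn can be illustrated without TensorFlow. The sketch below is a pure-Python approximation, assuming a population standard deviation for the z-score and simple rank-based quantile boundaries for bucketizing; the helper names are hypothetical and the results will not match TensorFlow Transform's approximate quantiles exactly.

```python
import bisect
import math
from statistics import mean, pstdev


def z_scores(values):
    """Scale values to zero mean and unit (population) standard deviation."""
    mu, sigma = mean(values), pstdev(values)
    return [(v - mu) / sigma if sigma else 0.0 for v in values]


def quantile_boundaries(values, num_buckets):
    """Pick num_buckets - 1 boundaries so buckets hold roughly equal counts."""
    ordered = sorted(values)
    return [ordered[len(ordered) * i // num_buckets]
            for i in range(1, num_buckets)]


def bucketize(value, boundaries):
    """Return the index of the bucket that value falls into."""
    return bisect.bisect_right(boundaries, value)


def big_tipper(fare, tips):
    """Label is 1 when the tip exceeds 20% of the fare, 0 otherwise."""
    if math.isnan(fare):
        return 0
    return int(tips > 0.2 * fare)


scaled = z_scores([1.0, 2.0, 3.0])
boundaries = quantile_boundaries(list(range(10)), num_buckets=5)
print(scaled, boundaries)
print(bucketize(0, boundaries), bucketize(9, boundaries))
print(big_tipper(10.0, 2.5), big_tipper(10.0, 1.0))
```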
Now, we pass in this feature engineering code to the Transform component and run it to transform your data.
transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))
context.run(transform, enable_cache=True)
Let's examine the output artifacts of Transform. This component produces two types of outputs:
- transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
- transformed_examples represents the preprocessed training and evaluation data.
transform.outputs
Take a peek at the transform_graph artifact. It points to a directory containing three subdirectories.
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
The transformed_metadata subdirectory contains the schema of the preprocessed data. The transform_fn subdirectory contains the actual preprocessing graph. The metadata subdirectory contains the schema of the original data.
We can also take a look at the first three transformed examples:
# Get the URI of the output artifact representing the transformed examples, which is a directory
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
After the Transform component has transformed your data into features, the next step is to train a model.
The Trainer component will train a model that you define in TensorFlow. The default Trainer supports the Estimator API; to use the Keras API, you need to specify the generic trainer by setting custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor) in Trainer's constructor.
Trainer takes as input the schema from SchemaGen, the transformed data and graph from Transform, training parameters, as well as a module that contains user-defined model code.
Let's see an example of user-defined model code below (for an introduction to the TensorFlow Keras APIs, see the tutorial):
_taxi_trainer_module_file = 'taxi_trainer.py'
%%writefile {_taxi_trainer_module_file}
from typing import Dict, List, Text
import os
import glob
from absl import logging
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput
# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules:  # Testing to see if we're doing development
  import importlib
  importlib.reload(taxi_constants)
_LABEL_KEY = taxi_constants.LABEL_KEY
_BATCH_SIZE = 40
def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.
  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch
  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      tf_transform_output.transformed_metadata.schema)
def _get_tf_examples_serving_signature(model, tf_transform_output):
  """Returns a serving signature that accepts `tensorflow.Example`."""
  # We need to track the layers in the model in order to save it.
  # TODO(b/162357359): Revise once the bug is resolved.
  model.tft_layer_inference = tf_transform_output.transform_features_layer()
  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_example):
    """Returns the output to be used in the serving signature."""
    raw_feature_spec = tf_transform_output.raw_feature_spec()
    # Remove label feature since these will not be present at serving time.
    raw_feature_spec.pop(_LABEL_KEY)
    raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
    transformed_features = model.tft_layer_inference(raw_features)
    logging.info('serve_transformed_features = %s', transformed_features)
    outputs = model(transformed_features)
    # TODO(b/154085620): Convert the predicted labels from the model using a
    # reverse-lookup (opposite of transform.py).
    return {'outputs': outputs}
  return serve_tf_examples_fn
def _get_transform_features_signature(model, tf_transform_output):
  """Returns a serving signature that applies tf.Transform to features."""
  # We need to track the layers in the model in order to save it.
  # TODO(b/162357359): Revise once the bug is resolved.
  model.tft_layer_eval = tf_transform_output.transform_features_layer()
  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def transform_features_fn(serialized_tf_example):
    """Returns the transformed_features to be fed as input to evaluator."""
    raw_feature_spec = tf_transform_output.raw_feature_spec()
    raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
    transformed_features = model.tft_layer_eval(raw_features)
    logging.info('eval_transformed_features = %s', transformed_features)
    return transformed_features
  return transform_features_fn
def export_serving_model(tf_transform_output, model, output_dir):
  """Exports a keras model for serving.
  Args:
    tf_transform_output: Wrapper around output of tf.Transform.
    model: A keras model to export for serving.
    output_dir: A directory where the model will be exported to.
  """
  # The layer has to be saved to the model for keras tracking purposes.
  model.tft_layer = tf_transform_output.transform_features_layer()
  signatures = {
      'serving_default':
          _get_tf_examples_serving_signature(model, tf_transform_output),
      'transform_features':
          _get_transform_features_signature(model, tf_transform_output),
  }
  model.save(output_dir, save_format='tf', signatures=signatures)
def _build_keras_model(tf_transform_output: TFTransformOutput
                       ) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.
  Args:
    tf_transform_output: [TFTransformOutput], the outputs from Transform
  Returns:
    A keras Model.
  """
  feature_spec = tf_transform_output.transformed_feature_spec().copy()
  feature_spec.pop(_LABEL_KEY)
  inputs = {}
  for key, spec in feature_spec.items():
    if isinstance(spec, tf.io.VarLenFeature):
      inputs[key] = tf.keras.layers.Input(
          shape=[None], name=key, dtype=spec.dtype, sparse=True)
    elif isinstance(spec, tf.io.FixedLenFeature):
      # TODO(b/208879020): Move into schema such that spec.shape is [1] and not
      # [] for scalars.
      inputs[key] = tf.keras.layers.Input(
          shape=spec.shape or [1], name=key, dtype=spec.dtype)
    else:
      raise ValueError('Spec type is not supported: ', key, spec)
  output = tf.keras.layers.Concatenate()(tf.nest.flatten(inputs))
  output = tf.keras.layers.Dense(100, activation='relu')(output)
  output = tf.keras.layers.Dense(70, activation='relu')(output)
  output = tf.keras.layers.Dense(50, activation='relu')(output)
  output = tf.keras.layers.Dense(20, activation='relu')(output)
  output = tf.keras.layers.Dense(1)(output)
  return tf.keras.Model(inputs=inputs, outputs=output)
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.
  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
                            tf_transform_output, _BATCH_SIZE)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                           tf_transform_output, _BATCH_SIZE)
  model = _build_keras_model(tf_transform_output)
  model.compile(
      loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])
  # Export the model.
  export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)
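The model above ends in a Dense(1) layer with no activation, so it outputs a raw logit, and the loss is configured with from_logits=True. The following pure-Python sketch shows what binary cross-entropy on a logit computes, using a standard numerically stable formulation; the function names are illustrative and this is not Keras's implementation.

```python
import math


def sigmoid(logit):
    """Numerically stable logistic function."""
    if logit >= 0:
        return 1.0 / (1.0 + math.exp(-logit))
    z = math.exp(logit)
    return z / (1.0 + z)


def bce_from_logits(label, logit):
    """Binary cross-entropy for one example, computed directly on the logit.

    Uses the stable identity: max(x, 0) - x * y + log(1 + exp(-|x|)).
    """
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


# A logit of 0 means probability 0.5, so the loss is log(2) for either label.
loss_uncertain = bce_from_logits(1, 0.0)
# A confident, correct prediction has a small loss.
loss_confident = bce_from_logits(1, 2.0)
# The same value computed the naive way, via the sigmoid probability.
loss_naive = -math.log(sigmoid(2.0))
print(loss_uncertain, loss_confident, loss_naive)
```

Working on logits avoids taking the logarithm of a probability that has already been rounded to 0 or 1, which is one reason to leave the final layer without a sigmoid.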
Now, we pass in this model code to the Trainer component and run it to train the model.
trainer = tfx.components.Trainer(
    module_file=os.path.abspath(_taxi_trainer_module_file),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=10000),
    eval_args=tfx.proto.EvalArgs(num_steps=5000))
context.run(trainer, enable_cache=True)
Take a peek at the trainer artifact. It points to a directory containing the model subdirectories.
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'Format-Serving')
pp.pprint(os.listdir(model_dir))
Optionally, we can connect TensorBoard to the Trainer to analyze our model's training curves.
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri
%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}
The Evaluator component computes model performance metrics over the evaluation set. It uses the TensorFlow Model Analysis library. The Evaluator can also optionally validate that a newly trained model is better than the previous model. This is useful in a production pipeline setting where you may automatically train and validate a model every day. In this notebook, we only train one model, so the Evaluator will automatically label the model as "good".
Evaluator will take as input the data from ExampleGen, the trained model from Trainer, and slicing configuration. The slicing configuration allows you to slice your metrics on feature values (e.g. how does your model perform on taxi trips that start at 8am versus 8pm?). See an example of this configuration below:
# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules:  # Testing to see if we're doing development
  import importlib
  importlib.reload(taxi_constants)
eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and
        # remove the label_key.
        tfma.ModelSpec(
            signature_name='serving_default',
            label_key=taxi_constants.LABEL_KEY,
            preprocessing_function_names=['transform_features'],
            )
        ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='BinaryAccuracy',
                  threshold=tfma.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0.5}),
                      # Change threshold will be ignored if there is no
                      # baseline model resolved from MLMD (first run).
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': -1e-10})))
            ]
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(
            feature_keys=['trip_start_hour'])
    ])
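Slicing means computing the same metric separately for each value of a feature, in addition to the overall value. A minimal pure-Python sketch of that idea follows; the sliced_accuracy helper and the example records are hypothetical and only illustrate the concept, not how TensorFlow Model Analysis is implemented.

```python
from collections import defaultdict


def sliced_accuracy(examples, slice_key):
    """Compute accuracy separately for each value of slice_key."""
    totals = defaultdict(lambda: [0, 0])  # slice value -> [correct, count]
    for example in examples:
        bucket = totals[example[slice_key]]
        bucket[0] += int(example["label"] == example["prediction"])
        bucket[1] += 1
    return {value: correct / count
            for value, (correct, count) in totals.items()}


# Hypothetical evaluation records with labels and model predictions.
examples = [
    {"trip_start_hour": 8, "label": 1, "prediction": 1},
    {"trip_start_hour": 8, "label": 0, "prediction": 1},
    {"trip_start_hour": 20, "label": 0, "prediction": 0},
]
per_hour = sliced_accuracy(examples, "trip_start_hour")
overall = sum(e["label"] == e["prediction"] for e in examples) / len(examples)
print(per_hour, overall)
```

The overall accuracy can hide a weak slice: here the model is right two times out of three overall, but only half the time for trips starting at 8.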
Next, we give this configuration to Evaluator and run it.
# Use TFMA to compute evaluation statistics over features of a model and
# validate them against a baseline.
# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(
        type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
            'latest_blessed_model_resolver')
context.run(model_resolver, enable_cache=True)
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)
context.run(evaluator, enable_cache=True)
Now let's examine the output artifacts of Evaluator.
evaluator.outputs
Using the evaluation output we can show the default visualization of global metrics on the entire evaluation set.
context.show(evaluator.outputs['evaluation'])
To see the visualization for sliced evaluation metrics, we can directly call the TensorFlow Model Analysis library.
import tensorflow_model_analysis as tfma
# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)
# Show data sliced along feature column trip_start_hour.
tfma.view.render_slicing_metrics(
    tfma_result, slicing_column='trip_start_hour')
This visualization shows the same metrics, but computed at every feature value of trip_start_hour instead of on the entire evaluation set.
TensorFlow Model Analysis supports many other visualizations, such as Fairness Indicators and plotting a time series of model performance. To learn more, see the tutorial.
Since we added thresholds to our config, validation output is also available. The presence of a blessing artifact indicates that our model passed validation. Since this is the first validation being performed, the candidate is automatically blessed.
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}
We can also verify the success by loading the validation result record:
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(PATH_TO_RESULT))
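The two thresholds configured for BinaryAccuracy can be read as a simple decision rule: the metric must reach the lower bound of 0.5, and, when a baseline model exists, it must not fall below the baseline by more than the allowed absolute change of -1e-10. The sketch below expresses that reading in plain Python. The passes_thresholds function is hypothetical, and the inclusive comparisons are an assumption; TensorFlow Model Analysis may treat boundary cases differently.

```python
def passes_thresholds(candidate, baseline=None, lower_bound=0.5,
                      min_absolute_change=-1e-10):
    """Return True when a higher-is-better metric satisfies both thresholds."""
    # Value threshold: the metric itself must reach the lower bound.
    if candidate < lower_bound:
        return False
    # Change threshold: ignored when there is no baseline (first run).
    if baseline is None:
        return True
    return (candidate - baseline) >= min_absolute_change


first_run = passes_thresholds(0.8)
too_low = passes_thresholds(0.4)
regressed = passes_thresholds(0.8, baseline=0.9)
improved = passes_thresholds(0.9, baseline=0.8)
print(first_run, too_low, regressed, improved)
```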
The Pusher component is usually at the end of a TFX pipeline. It checks whether a model has passed validation, and if so, exports the model to _serving_model_dir.
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher, enable_cache=True)
Let's examine the output artifacts of Pusher.
pusher.outputs
In particular, the Pusher will export your model in the SavedModel format, which looks like this:
push_uri = pusher.outputs['pushed_model'].get()[0].uri
model = tf.saved_model.load(push_uri)
for item in model.signatures.items():
  pp.pprint(item)
We've finished our tour of built-in TFX components!