Chapter 19 – Training and Deploying TensorFlow Models at Scale
This notebook contains all the sample code and solutions to the exercises in chapter 19.
This project requires Python 3.7 or above:
import sys
assert sys.version_info >= (3, 7)
And TensorFlow ≥ 2.8:
from packaging import version
import tensorflow as tf
assert version.parse(tf.__version__) >= version.parse("2.8.0")
If you are running on Colab or Kaggle, you need to install the Google Cloud AI Platform client library, which will be used later in this notebook. You can ignore any warnings about version incompatibilities.
import sys
if "google.colab" in sys.modules or "kaggle_secrets" in sys.modules:
%pip install -q -U google-cloud-aiplatform
This chapter discusses how to run or train a model on one or more GPUs, so let's make sure there's at least one, or else issue a warning:
if not tf.config.list_physical_devices('GPU'):
print("No GPU was detected. Neural nets can be very slow without a GPU.")
if "google.colab" in sys.modules:
print("Go to Runtime > Change runtime type and select a GPU hardware "
"accelerator.")
if "kaggle_secrets" in sys.modules:
print("Go to Settings > Accelerator and select GPU.")
Let's start by deploying a model using TF Serving, then we'll deploy to Google Vertex AI.
The first thing we need to do is to build and train a model, and export it to the SavedModel format.
Let's load the MNIST dataset and split it into training, validation, and test sets (scaling is handled by a Rescaling layer inside the model).
from pathlib import Path
import tensorflow as tf
# extra code – load and split the MNIST dataset
mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# extra code – build & train an MNIST model (also handles image preprocessing)
tf.random.set_seed(42)
tf.keras.backend.clear_session()
model = tf.keras.Sequential([
tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),
tf.keras.layers.Rescaling(scale=1 / 255),
tf.keras.layers.Dense(100, activation="relu"),
tf.keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))
model_name = "my_mnist_model"
model_version = "0001"
model_path = Path(model_name) / model_version
model.save(model_path, save_format="tf")
Epoch 1/10 1719/1719 [==============================] - 2s 1ms/step - loss: 0.7012 - accuracy: 0.8241 - val_loss: 0.3715 - val_accuracy: 0.9024 Epoch 2/10 1719/1719 [==============================] - 2s 943us/step - loss: 0.3536 - accuracy: 0.9020 - val_loss: 0.2990 - val_accuracy: 0.9144 Epoch 3/10 1719/1719 [==============================] - 2s 933us/step - loss: 0.3036 - accuracy: 0.9145 - val_loss: 0.2651 - val_accuracy: 0.9272 Epoch 4/10 1719/1719 [==============================] - 2s 965us/step - loss: 0.2736 - accuracy: 0.9231 - val_loss: 0.2436 - val_accuracy: 0.9334 Epoch 5/10 1719/1719 [==============================] - 2s 946us/step - loss: 0.2509 - accuracy: 0.9296 - val_loss: 0.2257 - val_accuracy: 0.9364 Epoch 6/10 1719/1719 [==============================] - 2s 974us/step - loss: 0.2322 - accuracy: 0.9350 - val_loss: 0.2121 - val_accuracy: 0.9396 Epoch 7/10 1719/1719 [==============================] - 2s 959us/step - loss: 0.2161 - accuracy: 0.9400 - val_loss: 0.1970 - val_accuracy: 0.9452 Epoch 8/10 1719/1719 [==============================] - 2s 944us/step - loss: 0.2021 - accuracy: 0.9432 - val_loss: 0.1880 - val_accuracy: 0.9476 Epoch 9/10 1719/1719 [==============================] - 2s 945us/step - loss: 0.1898 - accuracy: 0.9470 - val_loss: 0.1778 - val_accuracy: 0.9524 Epoch 10/10 1719/1719 [==============================] - 2s 940us/step - loss: 0.1793 - accuracy: 0.9494 - val_loss: 0.1685 - val_accuracy: 0.9544 INFO:tensorflow:Assets written to: my_mnist_model/0001/assets
Let's take a look at the file tree (we've discussed what each of these files is used for in chapter 10):
sorted([str(path) for path in model_path.parent.glob("**/*")]) # extra code
['my_mnist_model/0001', 'my_mnist_model/0001/assets', 'my_mnist_model/0001/keras_metadata.pb', 'my_mnist_model/0001/saved_model.pb', 'my_mnist_model/0001/variables', 'my_mnist_model/0001/variables/variables.data-00000-of-00001', 'my_mnist_model/0001/variables/variables.index']
Let's inspect the SavedModel:
!saved_model_cli show --dir '{model_path}'
The given SavedModel contains the following tag-sets: 'serve'
!saved_model_cli show --dir '{model_path}' --tag_set serve
The given SavedModel MetaGraphDef contains SignatureDefs with the following keys: SignatureDef key: "__saved_model_init_op" SignatureDef key: "serving_default"
!saved_model_cli show --dir '{model_path}' --tag_set serve \
--signature_def serving_default
The given SavedModel SignatureDef contains the following input(s): inputs['flatten_input'] tensor_info: dtype: DT_UINT8 shape: (-1, 28, 28) name: serving_default_flatten_input:0 The given SavedModel SignatureDef contains the following output(s): outputs['dense_1'] tensor_info: dtype: DT_FLOAT shape: (-1, 10) name: StatefulPartitionedCall:0 Method name is: tensorflow/serving/predict
For even more details, you can run the following command:
!saved_model_cli show --dir '{model_path}' --all
If you are running this notebook in Colab or Kaggle, the TensorFlow model server needs to be installed:
if "google.colab" in sys.modules or "kaggle_secrets" in sys.modules:
url = "https://storage.googleapis.com/tensorflow-serving-apt"
src = "stable tensorflow-model-server tensorflow-model-server-universal"
!echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list
!curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -
!apt update -q && apt-get install -y tensorflow-model-server
%pip install -q -U tensorflow-serving-api
If tensorflow_model_server is installed (e.g., if you are running this notebook in Colab), then the following two cells will start the server. If your OS is Windows, you may need to run the tensorflow_model_server command in a terminal, and replace ${MODEL_DIR} with the full path to the my_mnist_model directory.
import os
os.environ["MODEL_DIR"] = str(model_path.parent.absolute())
%%bash --bg
tensorflow_model_server \
--port=8500 \
--rest_api_port=8501 \
--model_name=my_mnist_model \
--model_base_path="${MODEL_DIR}" >my_server.log 2>&1
If you are running this notebook on your own machine, and you prefer to install TF Serving using Docker, first make sure Docker is installed, then run the following commands in a terminal. You must replace /path/to/my_mnist_model with the appropriate absolute path to the my_mnist_model directory, but do not modify the container path /models/my_mnist_model.
docker pull tensorflow/serving # downloads the latest TF Serving image
docker run -it --rm -v "/path/to/my_mnist_model:/models/my_mnist_model" \
-p 8500:8500 -p 8501:8501 -e MODEL_NAME=my_mnist_model tensorflow/serving
Next, let's send a REST query to TF Serving:
import json
X_new = X_test[:3] # pretend we have 3 new digit images to classify
request_json = json.dumps({
"signature_name": "serving_default",
"instances": X_new.tolist(),
})
request_json[:100] + "..." + request_json[-10:]
'{"signature_name": "serving_default", "instances": [[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0..., 0, 0]]]}'
Now let's use TensorFlow Serving's REST API to make predictions:
import requests
server_url = "http://localhost:8501/v1/models/my_mnist_model:predict"
response = requests.post(server_url, data=request_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()
import numpy as np
y_proba = np.array(response["predictions"])
y_proba.round(2)
array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ], [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ], [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])
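Each row of the response is one probability vector per instance; to turn it into class predictions, take the argmax of each row. A small NumPy sketch (not part of the original notebook), using hardcoded probabilities shaped like the output above:

```python
import numpy as np

# y_proba has shape (n_instances, 10): one probability per digit class
y_proba = np.array([[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0],
                    [0.0, 0.0, 0.99, 0.01, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]])
y_pred = y_proba.argmax(axis=1)     # most likely class for each instance
confidence = y_proba.max(axis=1)    # estimated probability of that class
print(y_pred)  # → [7 2]
```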
Now let's query TF Serving through its gRPC API, which is more compact and efficient than the REST API when transferring large amounts of data:
from tensorflow_serving.apis.predict_pb2 import PredictRequest
request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0] # == "flatten_input"
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc
channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)
Convert the response to a tensor:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)
array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ], [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ], [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]], dtype=float32)
If your client does not include the TensorFlow library, you can convert the response to a NumPy array like this:
# extra code – shows how to avoid using tf.make_ndarray()
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)
array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ], [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ], [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])
# extra code – build and train a new MNIST model version
np.random.seed(42)
tf.random.set_seed(42)
model = tf.keras.Sequential([
tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),
tf.keras.layers.Rescaling(scale=1 / 255),
tf.keras.layers.Dense(50, activation="relu"),
tf.keras.layers.Dense(50, activation="relu"),
tf.keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10,
validation_data=(X_valid, y_valid))
Epoch 1/10 1719/1719 [==============================] - 2s 931us/step - loss: 0.7039 - accuracy: 0.8056 - val_loss: 0.3418 - val_accuracy: 0.9042 Epoch 2/10 1719/1719 [==============================] - 1s 855us/step - loss: 0.3204 - accuracy: 0.9082 - val_loss: 0.2674 - val_accuracy: 0.9242 Epoch 3/10 1719/1719 [==============================] - 2s 883us/step - loss: 0.2650 - accuracy: 0.9235 - val_loss: 0.2227 - val_accuracy: 0.9368 Epoch 4/10 1719/1719 [==============================] - 1s 869us/step - loss: 0.2319 - accuracy: 0.9329 - val_loss: 0.2032 - val_accuracy: 0.9432 Epoch 5/10 1719/1719 [==============================] - 1s 870us/step - loss: 0.2089 - accuracy: 0.9399 - val_loss: 0.1833 - val_accuracy: 0.9482 Epoch 6/10 1719/1719 [==============================] - 1s 871us/step - loss: 0.1908 - accuracy: 0.9446 - val_loss: 0.1740 - val_accuracy: 0.9498 Epoch 7/10 1719/1719 [==============================] - 2s 873us/step - loss: 0.1756 - accuracy: 0.9490 - val_loss: 0.1605 - val_accuracy: 0.9540 Epoch 8/10 1719/1719 [==============================] - 2s 877us/step - loss: 0.1631 - accuracy: 0.9524 - val_loss: 0.1543 - val_accuracy: 0.9558 Epoch 9/10 1719/1719 [==============================] - 2s 879us/step - loss: 0.1517 - accuracy: 0.9567 - val_loss: 0.1460 - val_accuracy: 0.9570 Epoch 10/10 1719/1719 [==============================] - 1s 872us/step - loss: 0.1429 - accuracy: 0.9584 - val_loss: 0.1358 - val_accuracy: 0.9618
model_version = "0002"
model_path = Path(model_name) / model_version
model.save(model_path, save_format="tf")
INFO:tensorflow:Assets written to: my_mnist_model/0002/assets
Let's take a look at the file tree again:
sorted([str(path) for path in model_path.parent.glob("**/*")]) # extra code
['my_mnist_model/0001', 'my_mnist_model/0001/assets', 'my_mnist_model/0001/keras_metadata.pb', 'my_mnist_model/0001/saved_model.pb', 'my_mnist_model/0001/variables', 'my_mnist_model/0001/variables/variables.data-00000-of-00001', 'my_mnist_model/0001/variables/variables.index', 'my_mnist_model/0002', 'my_mnist_model/0002/assets', 'my_mnist_model/0002/keras_metadata.pb', 'my_mnist_model/0002/saved_model.pb', 'my_mnist_model/0002/variables', 'my_mnist_model/0002/variables/variables.data-00000-of-00001', 'my_mnist_model/0002/variables/variables.index']
Warning: You may need to wait a minute before the new model is loaded by TensorFlow Serving.
import requests
server_url = "http://localhost:8501/v1/models/my_mnist_model:predict"
response = requests.post(server_url, data=request_json)
response.raise_for_status()
response = response.json()
response.keys()
dict_keys(['predictions'])
y_proba = np.array(response["predictions"])
y_proba.round(2)
array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ], [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ], [0. , 0.99, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ]])
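TF Serving serves the highest version number by default, but its REST API also lets you target a specific version via a /versions/{version} path segment. A hedged sketch of building such a URL (versioned_predict_url is a hypothetical helper, and the server may normalize "0001" to the numeric version 1):

```python
def versioned_predict_url(host, model_name, version):
    """Build a TF Serving REST URL targeting one specific model version."""
    return f"http://{host}/v1/models/{model_name}/versions/{version}:predict"

url = versioned_predict_url("localhost:8501", "my_mnist_model", "0001")
print(url)
# → http://localhost:8501/v1/models/my_mnist_model/versions/0001:predict
```

You would then POST the same request_json as above to this URL instead of the default endpoint.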
Follow the instructions in the book to create a Google Cloud Platform account and activate the Vertex AI and Cloud Storage APIs. Then, if you're running this notebook in Colab, you can run the following cell to authenticate using the same Google account as you used with Google Cloud Platform, and authorize this Colab to access your data.
WARNING: only do this if you trust this notebook!
If you are not running this notebook in Colab, you must follow the instructions in the book to create a service account and generate a key for it, download it to this notebook's directory, and name it my_service_account_key.json (or make sure the GOOGLE_APPLICATION_CREDENTIALS environment variable points to your key).
project_id = "my_project" ##### CHANGE THIS TO YOUR PROJECT ID #####
if "google.colab" in sys.modules:
from google.colab import auth
auth.authenticate_user()
elif "kaggle_secrets" in sys.modules:
from kaggle_secrets import UserSecretsClient
UserSecretsClient().set_gcloud_credentials(project=project_id)
else:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_service_account_key.json"
from google.cloud import storage
bucket_name = "my_bucket" ##### CHANGE THIS TO A UNIQUE BUCKET NAME #####
location = "us-central1"
storage_client = storage.Client(project=project_id)
bucket = storage_client.create_bucket(bucket_name, location=location)
#bucket = storage_client.bucket(bucket_name) # to reuse a bucket instead
def upload_directory(bucket, dirpath):
dirpath = Path(dirpath)
for filepath in dirpath.glob("**/*"):
if filepath.is_file():
blob = bucket.blob(filepath.relative_to(dirpath.parent).as_posix())
blob.upload_from_filename(filepath)
upload_directory(bucket, "my_mnist_model")
# extra code – a much faster multithreaded implementation of upload_directory()
# which also accepts a prefix for the target path, and prints stuff
from concurrent import futures
def upload_file(bucket, filepath, blob_path):
blob = bucket.blob(blob_path)
blob.upload_from_filename(filepath)
def upload_directory(bucket, dirpath, prefix=None, max_workers=50):
dirpath = Path(dirpath)
prefix = prefix or dirpath.name
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_filepath = {
executor.submit(
upload_file,
bucket, filepath,
f"{prefix}/{filepath.relative_to(dirpath).as_posix()}"
): filepath
for filepath in sorted(dirpath.glob("**/*"))
if filepath.is_file()
}
for future in futures.as_completed(future_to_filepath):
filepath = future_to_filepath[future]
try:
result = future.result()
except Exception as ex:
print(f"Error uploading {filepath!s:60}: {ex}") # f!s is str(f)
else:
print(f"Uploaded {filepath!s:60}", end="\r")
print(f"Uploaded {dirpath!s:60}")
Alternatively, if you installed the Google Cloud CLI (it's preinstalled on Colab), then you can use the following gsutil command:
#!gsutil -m cp -r my_mnist_model gs://{bucket_name}/
from google.cloud import aiplatform
server_image = "gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest"
aiplatform.init(project=project_id, location=location)
mnist_model = aiplatform.Model.upload(
display_name="mnist",
artifact_uri=f"gs://{bucket_name}/my_mnist_model/0001",
serving_container_image_uri=server_image,
)
Creating Model Create Model backing LRO: projects/522977795627/locations/us-central1/models/4798114811986575360/operations/53403898236370944 Model created. Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360 To use this Model in another session: model = aiplatform.Model('projects/522977795627/locations/us-central1/models/4798114811986575360')
Warning: this cell may take several minutes to run, as it waits for Vertex AI to provision the compute nodes:
endpoint = aiplatform.Endpoint.create(display_name="mnist-endpoint")
endpoint.deploy(
mnist_model,
min_replica_count=1,
max_replica_count=5,
machine_type="n1-standard-4",
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=1
)
Creating Endpoint Create Endpoint backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/4135354010494304256 Endpoint created. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176 To use this Endpoint in another session: endpoint = aiplatform.Endpoint('projects/522977795627/locations/us-central1/endpoints/5133373499481522176') Deploying Model projects/522977795627/locations/us-central1/models/4798114811986575360 to Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176 Deploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/388359120522051584 Endpoint model deployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176
response = endpoint.predict(instances=X_new.tolist())
import numpy as np
np.round(response.predictions, 2)
array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ], [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ], [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])
endpoint.undeploy_all() # undeploy all models from the endpoint
endpoint.delete()
Undeploying Endpoint model: projects/522977795627/locations/us-central1/endpoints/5133373499481522176 Undeploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/3579722406467469312 Endpoint model undeployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176 Deleting Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176 Delete Endpoint backing LRO: projects/522977795627/locations/us-central1/operations/4738836360561950720 Endpoint deleted. . Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176
batch_path = Path("my_mnist_batch")
batch_path.mkdir(exist_ok=True)
with open(batch_path / "my_mnist_batch.jsonl", "w") as jsonl_file:
for image in X_test[:100].tolist():
jsonl_file.write(json.dumps(image))
jsonl_file.write("\n")
upload_directory(bucket, batch_path)
Uploaded my_mnist_batch
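Each line of the JSONL file must be a standalone JSON document (here, one 28×28 image as nested lists); a malformed line would make the batch job report errors. A small sanity-check sketch (check_jsonl is a hypothetical helper, not part of the original notebook):

```python
import json

def check_jsonl(path):
    """Verify each line parses as a 28x28 nested list; return the line count."""
    count = 0
    with open(path) as f:
        for line in f:
            image = json.loads(line)             # must be valid JSON per line
            assert len(image) == 28 and all(len(row) == 28 for row in image)
            count += 1
    return count

# e.g., check_jsonl(batch_path / "my_mnist_batch.jsonl") should return 100
```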
batch_prediction_job = mnist_model.batch_predict(
job_display_name="my_batch_prediction_job",
machine_type="n1-standard-4",
starting_replica_count=1,
max_replica_count=5,
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=1,
gcs_source=[f"gs://{bucket_name}/{batch_path.name}/my_mnist_batch.jsonl"],
gcs_destination_prefix=f"gs://{bucket_name}/my_mnist_predictions/",
sync=True # set to False if you don't want to wait for completion
)
Creating BatchPredictionJob BatchPredictionJob created. Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 To use this BatchPredictionJob in another session: bpj = aiplatform.BatchPredictionJob('projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544') View Batch Prediction Job: https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4346926367237996544?project=522977795627 BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_PENDING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_RUNNING BatchPredictionJob 
projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state: JobState.JOB_STATE_SUCCEEDED BatchPredictionJob run completed. Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544
batch_prediction_job.output_info # extra code – shows the output directory
gcs_output_directory: "gs://my_bucket/my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z"
y_probas = []
for blob in batch_prediction_job.iter_outputs():
print(blob.name) # extra code
if "prediction.results" in blob.name:
for line in blob.download_as_text().splitlines():
y_proba = json.loads(line)["prediction"]
y_probas.append(y_proba)
my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.errors_stats-00000-of-00001 my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00000-of-00002 my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00001-of-00002
y_pred = np.argmax(y_probas, axis=1)
accuracy = np.sum(y_pred == y_test[:100]) / 100
accuracy
0.98
mnist_model.delete()
Deleting Model : projects/522977795627/locations/us-central1/models/4798114811986575360 Delete Model backing LRO: projects/522977795627/locations/us-central1/operations/598902403101622272 Model deleted. . Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360
Let's delete all the directories we created on GCS (i.e., all the blobs with these prefixes):
for prefix in ["my_mnist_model/", "my_mnist_batch/", "my_mnist_predictions/"]:
blobs = bucket.list_blobs(prefix=prefix)
for blob in blobs:
blob.delete()
#bucket.delete() # uncomment and run if you want to delete the bucket itself
batch_prediction_job.delete()
Deleting BatchPredictionJob : projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 Delete BatchPredictionJob backing LRO: projects/522977795627/locations/us-central1/operations/6699028098374959104 BatchPredictionJob deleted. . Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544
converter = tf.lite.TFLiteConverter.from_saved_model(str(model_path))
tflite_model = converter.convert()
with open("my_converted_savedmodel.tflite", "wb") as f:
f.write(tflite_model)
2022-04-10 09:03:52.237094: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format. 2022-04-10 09:03:52.237108: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency. WARNING:absl:Buffer deduplication procedure will be skipped when flatbuffer library is not properly loaded 2022-04-10 09:03:52.237830: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: my_mnist_model/0001 2022-04-10 09:03:52.238869: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve } 2022-04-10 09:03:52.238881: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: my_mnist_model/0001 2022-04-10 09:03:52.242108: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle. 2022-04-10 09:03:52.263868: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: my_mnist_model/0001 2022-04-10 09:03:52.271298: I tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Took 33470 microseconds. 2022-04-10 09:03:52.281694: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:237] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
# extra code – shows how to convert a Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open("my_converted_keras_model.tflite", "wb") as f:
f.write(tflite_model)
INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets
INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets WARNING:absl:Buffer deduplication procedure will be skipped when flatbuffer library is not properly loaded 2022-04-10 09:26:30.319286: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format. 2022-04-10 09:26:30.319301: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency. 2022-04-10 09:26:30.319417: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs 2022-04-10 09:26:30.320420: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve } 2022-04-10 09:26:30.320431: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs 2022-04-10 09:26:30.323773: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle. 2022-04-10 09:26:30.345416: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs 2022-04-10 09:26:30.354270: I tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Took 34852 microseconds. 2022-04-10 09:26:30.392352: I tensorflow/lite/tools/optimize/quantize_weights.cc:225] Skipping quantization of tensor sequential/dense_1/MatMul because it has fewer than 1024 elements (1000).
Code examples for this section are hosted on glitch.com, a website that lets you create Web apps for free.
* https://homl.info/wpacode: this PWA's source code.
* https://www.tensorflow.org/js/demos: some fun demos.
Let's check that TensorFlow can see the GPU:
physical_gpus = tf.config.list_physical_devices("GPU")
physical_gpus
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
If you want your TensorFlow script to use only GPUs #0 and #1 (based on PCI order), then you can set the environment variables CUDA_DEVICE_ORDER=PCI_BUS_ID and CUDA_VISIBLE_DEVICES=0,1 before starting your script, or in the script itself before using TensorFlow.
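For example, these variables can be set from Python, as long as it runs before TensorFlow initializes the GPUs (ideally before importing TensorFlow at all); adjust the device indices to your machine:

```python
import os

# Must run before TensorFlow touches the GPUs
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   # number GPUs by PCI bus order
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"       # expose only GPUs #0 and #1
```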
To limit each GPU's memory usage to 2 GB:
#for gpu in physical_gpus:
# tf.config.set_logical_device_configuration(
# gpu,
# [tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
# )
To make TensorFlow grab memory as it needs it (only releasing it when the process shuts down):
#for gpu in physical_gpus:
# tf.config.experimental.set_memory_growth(gpu, True)
Equivalently, you can set the TF_FORCE_GPU_ALLOW_GROWTH environment variable to true before using TensorFlow.
To split a physical GPU into two logical GPUs:
#tf.config.set_logical_device_configuration(
# physical_gpus[0],
# [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
# tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
#)
logical_gpus = tf.config.list_logical_devices("GPU")
logical_gpus
[LogicalDevice(name='/device:GPU:0', device_type='GPU')]
To log every variable and operation placement (this must be run just after importing TensorFlow):
#tf.get_logger().setLevel("DEBUG") # log level is INFO by default
#tf.debugging.set_log_device_placement(True)
a = tf.Variable([1., 2., 3.]) # float32 variable goes to the GPU
a.device
'/job:localhost/replica:0/task:0/device:GPU:0'
b = tf.Variable([1, 2, 3]) # int32 variable goes to the CPU
b.device
'/job:localhost/replica:0/task:0/device:CPU:0'
You can place variables and operations manually on the desired device using a tf.device() context:
with tf.device("/cpu:0"):
c = tf.Variable([1., 2., 3.])
c.device
'/job:localhost/replica:0/task:0/device:CPU:0'
If you specify a device that does not exist, or for which there is no kernel, TensorFlow will silently fall back to the default placement:
# extra code
with tf.device("/gpu:1234"):
d = tf.Variable([1., 2., 3.])
d.device
'/job:localhost/replica:0/task:0/device:GPU:0'
If you want TensorFlow to throw an exception when you try to use a device that does not exist, instead of falling back to the default device:
tf.config.set_soft_device_placement(False)
# extra code
try:
with tf.device("/gpu:1000"):
d = tf.Variable([1., 2., 3.])
except tf.errors.InvalidArgumentError as ex:
print(ex)
tf.config.set_soft_device_placement(True) # extra code – back to soft placement
Could not satisfy device specification '/job:localhost/replica:0/task:0/device:GPU:1000'. enable_soft_placement=0. Supported device types [CPU]. All available devices [/job:localhost/replica:0/task:0/device:CPU:0].
If you want to set the number of inter-op or intra-op threads (this may be useful if you want to avoid saturating the CPU, or if you want to make TensorFlow single-threaded, to run a perfectly reproducible test case):
#tf.config.threading.set_inter_op_parallelism_threads(10)
#tf.config.threading.set_intra_op_parallelism_threads(10)
# extra code – creates a CNN model for MNIST using Keras
def create_model():
return tf.keras.Sequential([
tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],
dtype=tf.uint8),
tf.keras.layers.Rescaling(scale=1 / 255),
tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
padding="same"),
tf.keras.layers.MaxPooling2D(pool_size=2),
tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
padding="same"),
tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
padding="same"),
tf.keras.layers.MaxPooling2D(pool_size=2),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(units=64, activation="relu"),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(units=10, activation="softmax"),
])
tf.random.set_seed(42)
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = create_model() # create a Keras model normally
model.compile(loss="sparse_categorical_crossentropy",
optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
metrics=["accuracy"]) # compile the model normally
batch_size = 100 # preferably divisible by the number of replicas
model.fit(X_train, y_train, epochs=10,
validation_data=(X_valid, y_valid), batch_size=batch_size)
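The comment about the batch size matters because MirroredStrategy splits each batch evenly across replicas, so the global batch size should be a multiple of the replica count. A quick arithmetic sketch (the replica count is hardcoded here; in practice you would read it from strategy.num_replicas_in_sync):

```python
# Hypothetical replica count; in practice: strategy.num_replicas_in_sync
num_replicas = 4
per_replica_batch_size = 25
# This is the value you would pass as batch_size to model.fit()
global_batch_size = per_replica_batch_size * num_replicas
assert global_batch_size % num_replicas == 0  # each replica gets an equal share
print(global_batch_size)  # → 100
```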
type(model.weights[0])
tensorflow.python.distribute.values.MirroredVariable
model.predict(X_new).round(2) # extra code – the batch is split across all replicas
array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.], [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.], [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=float32)
# extra code – shows that saving a model does not preserve its distribution strategy
model.save("my_mirrored_model", save_format="tf")
model = tf.keras.models.load_model("my_mirrored_model")
type(model.weights[0])
INFO:tensorflow:Assets written to: my_mirrored_model/assets
tensorflow.python.ops.resource_variable_ops.ResourceVariable
with strategy.scope():
    model = tf.keras.models.load_model("my_mirrored_model")
type(model.weights[0])
tensorflow.python.distribute.values.MirroredVariable
If you want to specify the list of GPUs to use:
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
If you want to change the default all-reduce algorithm:
strategy = tf.distribute.MirroredStrategy(
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
If you want to use the CentralStorageStrategy:
strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:CPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:CPU:0'
# To train on a TPU in Google Colab:
#if "google.colab" in sys.modules and "COLAB_TPU_ADDR" in os.environ:
# tpu_address = "grpc://" + os.environ["COLAB_TPU_ADDR"]
#else:
# tpu_address = ""
#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)
#tf.config.experimental_connect_to_cluster(resolver)
#tf.tpu.experimental.initialize_tpu_system(resolver)
#strategy = tf.distribute.experimental.TPUStrategy(resolver)
A TensorFlow cluster is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, for example training or executing a neural network. Each TF process in the cluster is called a "task" (or a "TF server"). It has an IP address, a port, and a type (also called its role or its job). The type can be "worker", "chief", "ps" (parameter server), or "evaluator":
The set of tasks that share the same type is often called a "job". For example, the "worker" job is the set of all workers.
To start a TensorFlow cluster, you must first define it. This means specifying all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). It's a dictionary with one key per job, and the values are lists of task addresses:
cluster_spec = {
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222"   # /job:worker/task:1
    ],
    "ps": ["machine-a.example.com:2221"]  # /job:ps/task:0
}
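As the comments above indicate, each task gets a device name of the form /job:&lt;type&gt;/task:&lt;index&gt;. As a quick sketch (plain Python, no TensorFlow needed, and the helper name is just for illustration), you can enumerate the device names from a cluster spec:

```python
def task_device_names(cluster_spec):
    """List every task's TF device name, e.g. /job:worker/task:0."""
    names = []
    for job, addresses in cluster_spec.items():
        for index in range(len(addresses)):
            names.append(f"/job:{job}/task:{index}")
    return names

cluster_spec = {
    "worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"],
    "ps": ["machine-a.example.com:2221"],
}
print(task_device_names(cluster_spec))
# → ['/job:worker/task:0', '/job:worker/task:1', '/job:ps/task:0']
```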
Every task in the cluster may communicate with every other task in the cluster, so make sure to configure your firewall to authorize all communications between these machines on these ports (it's usually simpler if you use the same port on every machine).
When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the TF_CONFIG environment variable before starting the program. It must be a JSON-encoded dictionary containing the cluster specification (under the "cluster" key), and the type and index of the task to start (under the "task" key). For example, the following TF_CONFIG environment variable defines the same cluster as above, with 2 workers and 1 parameter server, and specifies that the task to start is worker #0:
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": cluster_spec,
    "task": {"type": "worker", "index": 0}
})
Some platforms (e.g., Google Vertex AI) automatically set this environment variable for you.
TensorFlow's TFConfigClusterResolver class reads the cluster configuration from this environment variable:
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()
ClusterSpec({'ps': ['machine-a.example.com:2221'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})
resolver.task_type
'worker'
resolver.task_id
0
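Under the hood, the resolver is essentially parsing the JSON in TF_CONFIG. A self-contained sketch of that parsing, using only the standard library (it repeats the cluster spec from above so it can run on its own):

```python
import json
import os

# Define the same cluster as above, with this process as worker #0
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"],
        "ps": ["machine-a.example.com:2221"],
    },
    "task": {"type": "worker", "index": 0},
})

tf_config = json.loads(os.environ["TF_CONFIG"])
cluster_spec = tf_config["cluster"]    # dict: job name → list of addresses
task_type = tf_config["task"]["type"]  # e.g., 'worker'
task_id = tf_config["task"]["index"]   # e.g., 0
print(task_type, task_id)  # → worker 0
```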
Now let's run a simpler cluster with just two worker tasks, both running on the local machine. We will use the MultiWorkerMirroredStrategy to train a model across these two tasks.
The first step is to write the training code. As this code will be used to run both workers, each in its own process, we write this code to a separate Python file, my_mnist_multiworker_task.py. The code is relatively straightforward, but note one important thing: we create the MultiWorkerMirroredStrategy before doing anything else with TensorFlow.

%%writefile my_mnist_multiworker_task.py
import tempfile
import tensorflow as tf
strategy = tf.distribute.MultiWorkerMirroredStrategy() # at the start!
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print(f"Starting task {resolver.task_type} #{resolver.task_id}")
# extra code – Load and split the MNIST dataset
mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],
                                dtype=tf.uint8),
        tf.keras.layers.Rescaling(scale=1 / 255),
        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                               padding="same", input_shape=[28, 28, 1]),
        tf.keras.layers.MaxPooling2D(pool_size=2),
        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                               padding="same"),
        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                               padding="same"),
        tf.keras.layers.MaxPooling2D(pool_size=2),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(units=64, activation="relu"),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(units=10, activation="softmax"),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10)

if resolver.task_id == 0:  # the chief saves the model to the right location
    model.save("my_mnist_multiworker_model", save_format="tf")
else:
    tmpdir = tempfile.mkdtemp()  # other workers save to a temporary directory
    model.save(tmpdir, save_format="tf")
    tf.io.gfile.rmtree(tmpdir)  # and we can delete this directory at the end!
Writing my_mnist_multiworker_task.py
In a real world application, there would typically be a single worker per machine, but in this example we're running both workers on the same machine, so they will both try to use all the available GPU RAM (if this machine has a GPU), and this will likely lead to an Out-Of-Memory (OOM) error. To avoid this, we could use the CUDA_VISIBLE_DEVICES environment variable to assign a different GPU to each worker. Alternatively, we can simply disable GPU support, by setting CUDA_VISIBLE_DEVICES to an empty string.
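If you prefer to launch the two workers from Python rather than from the bash cells below, here is a sketch using only the standard library (the script name, ports, and log file names match the bash cells; nothing about this launcher is TensorFlow-specific):

```python
import json
import os
import subprocess
import sys

CLUSTER = {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]}

def worker_env(index):
    """Build one worker's environment: no GPU, plus its TF_CONFIG."""
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = ""  # hide the GPUs from this worker
    env["TF_CONFIG"] = json.dumps({"cluster": CLUSTER,
                                   "task": {"type": "worker", "index": index}})
    return env

processes = []
for index in range(2):
    log_file = open(f"my_worker_{index}.log", "w")
    processes.append(subprocess.Popen(
        [sys.executable, "my_mnist_multiworker_task.py"],
        env=worker_env(index), stdout=log_file, stderr=subprocess.STDOUT))
```

You can later call `p.wait()` on each process to block until training finishes.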
We are now ready to start both workers, each in its own process. Notice that we change the task index:
%%bash --bg
export CUDA_VISIBLE_DEVICES=''
export TF_CONFIG='{"cluster": {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]},
"task": {"type": "worker", "index": 0}}'
python my_mnist_multiworker_task.py > my_worker_0.log 2>&1
%%bash --bg
export CUDA_VISIBLE_DEVICES=''
export TF_CONFIG='{"cluster": {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]},
"task": {"type": "worker", "index": 1}}'
python my_mnist_multiworker_task.py > my_worker_1.log 2>&1
Note: if you get warnings about AutoShardPolicy, you can safely ignore them. See TF issue #42146 for more details.
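Alternatively, you can silence these warnings by telling tf.data explicitly how to shard the dataset across workers. A minimal configuration sketch (this only applies when you feed a tf.data.Dataset to fit(); the `dataset` variable here is an assumed placeholder for your own dataset):

```python
options = tf.data.Options()
# Shard by elements rather than by files (DATA policy); OFF disables sharding
options.experimental_distribute.auto_shard_policy = (
    tf.data.experimental.AutoShardPolicy.DATA)
dataset = dataset.with_options(options)  # `dataset` is your tf.data.Dataset
```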
That's it! Our TensorFlow cluster is now running, but we can't see it in this notebook because it's running in separate processes (but you can see the progress in my_worker_*.log).
Since the chief (worker #0) is writing to TensorBoard, we use TensorBoard to view the training progress. Run the following cell, then click on the settings button (i.e., the gear icon) in the TensorBoard interface and check the "Reload data" box to make TensorBoard automatically refresh every 30s. Once the first epoch of training is finished (which may take a few minutes), and once TensorBoard refreshes, the SCALARS tab will appear. Click on this tab to view the progress of the model's training and validation accuracy.
%load_ext tensorboard
%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006
# strategy = tf.distribute.MultiWorkerMirroredStrategy(
# communication_options=tf.distribute.experimental.CommunicationOptions(
# implementation=tf.distribute.experimental.CollectiveCommunication.NCCL))
Let's copy the training script, but add import os and change the save path to the GCS path that the AIP_MODEL_DIR environment variable will point to:
%%writefile my_vertex_ai_training_task.py
import os
from pathlib import Path
import tempfile
import tensorflow as tf
strategy = tf.distribute.MultiWorkerMirroredStrategy() # at the start!
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if resolver.task_type == "chief":
    model_dir = os.getenv("AIP_MODEL_DIR")  # paths provided by Vertex AI
    tensorboard_log_dir = os.getenv("AIP_TENSORBOARD_LOG_DIR")
    checkpoint_dir = os.getenv("AIP_CHECKPOINT_DIR")
else:
    tmp_dir = Path(tempfile.mkdtemp())  # other workers use a temporary dir
    model_dir = tmp_dir / "model"
    tensorboard_log_dir = tmp_dir / "logs"
    checkpoint_dir = tmp_dir / "ckpt"

callbacks = [tf.keras.callbacks.TensorBoard(tensorboard_log_dir),
             tf.keras.callbacks.ModelCheckpoint(checkpoint_dir)]
# extra code – Load and prepare the MNIST dataset
mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# extra code – build and compile the Keras model using the distribution strategy
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],
                                dtype=tf.uint8),
        tf.keras.layers.Lambda(lambda X: X / 255),
        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                               padding="same", input_shape=[28, 28, 1]),
        tf.keras.layers.MaxPooling2D(pool_size=2),
        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                               padding="same"),
        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                               padding="same"),
        tf.keras.layers.MaxPooling2D(pool_size=2),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(units=64, activation="relu"),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(units=10, activation="softmax"),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10,
          callbacks=callbacks)
model.save(model_dir, save_format="tf")
Writing my_vertex_ai_training_task.py
custom_training_job = aiplatform.CustomTrainingJob(
display_name="my_custom_training_job",
script_path="my_vertex_ai_training_task.py",
container_uri="gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest",
model_serving_container_image_uri=server_image,
requirements=["gcsfs==2022.3.0"], # not needed, this is just an example
staging_bucket=f"gs://{bucket_name}/staging"
)
mnist_model2 = custom_training_job.run(
machine_type="n1-standard-4",
replica_count=2,
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=2,
)
Training script copied to: gs://my_bucket/aiplatform-2022-04-14-10:08:24.124-aiplatform_custom_trainer_script-0.1.tar.gz. Training Output directory: gs://my_bucket/aiplatform-custom-training-2022-04-14-10:08:25.226 View Training: https://console.cloud.google.com/ai/platform/locations/us-central1/training/5407999068506947584?project=522977795627 CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_PENDING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING View backing custom job: https://console.cloud.google.com/ai/platform/locations/us-central1/training/6685701948726837248?project=522977795627 CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob run completed. Resource name: projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 Model available at projects/522977795627/locations/us-central1/models/9094548856498028544
Let's clean up:
mnist_model2.delete()
custom_training_job.delete()
blobs = bucket.list_blobs(prefix="staging/")  # prefix is relative to the bucket
for blob in blobs:
    blob.delete()
%%writefile my_vertex_ai_trial.py
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--n_hidden", type=int, default=2)
parser.add_argument("--n_neurons", type=int, default=256)
parser.add_argument("--learning_rate", type=float, default=1e-2)
parser.add_argument("--optimizer", default="adam")
args = parser.parse_args()
import tensorflow as tf
def build_model(args):
    with tf.distribute.MirroredStrategy().scope():
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
        for _ in range(args.n_hidden):
            model.add(tf.keras.layers.Dense(args.n_neurons, activation="relu"))
        model.add(tf.keras.layers.Dense(10, activation="softmax"))
        opt = tf.keras.optimizers.get(args.optimizer)
        opt.learning_rate = args.learning_rate
        model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,
                      metrics=["accuracy"])
        return model
# extra code – loads and splits the dataset
mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
# extra code – use the AIP_* environment variable and create the callbacks
import os
model_dir = os.getenv("AIP_MODEL_DIR")
tensorboard_log_dir = os.getenv("AIP_TENSORBOARD_LOG_DIR")
checkpoint_dir = os.getenv("AIP_CHECKPOINT_DIR")
trial_id = os.getenv("CLOUD_ML_TRIAL_ID")
tensorboard_cb = tf.keras.callbacks.TensorBoard(tensorboard_log_dir)
early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=5)
callbacks = [tensorboard_cb, early_stopping_cb]
model = build_model(args)
history = model.fit(X_train, y_train, validation_data=(X_valid, y_valid),
epochs=10, callbacks=callbacks)
model.save(model_dir, save_format="tf") # extra code
import hypertune
hypertune = hypertune.HyperTune()
hypertune.report_hyperparameter_tuning_metric(
hyperparameter_metric_tag="accuracy", # name of the reported metric
metric_value=max(history.history["val_accuracy"]), # max accuracy value
global_step=model.optimizer.iterations.numpy(),
)
Writing my_vertex_ai_trial.py
trial_job = aiplatform.CustomJob.from_local_script(
display_name="my_search_trial_job",
script_path="my_vertex_ai_trial.py", # path to your training script
container_uri="gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest",
staging_bucket=f"gs://{bucket_name}/staging",
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=2, # in this example, each trial will have 2 GPUs
)
Training script copied to: gs://homl3-mybucket5/staging/aiplatform-2022-04-18-18:14:02.860-aiplatform_custom_trainer_script-0.1.tar.gz.
from google.cloud.aiplatform import hyperparameter_tuning as hpt
hp_job = aiplatform.HyperparameterTuningJob(
display_name="my_hp_search_job",
custom_job=trial_job,
metric_spec={"accuracy": "maximize"},
parameter_spec={
"learning_rate": hpt.DoubleParameterSpec(min=1e-3, max=10, scale="log"),
"n_neurons": hpt.IntegerParameterSpec(min=1, max=300, scale="linear"),
"n_hidden": hpt.IntegerParameterSpec(min=1, max=10, scale="linear"),
"optimizer": hpt.CategoricalParameterSpec(["sgd", "adam"]),
},
max_trial_count=100,
parallel_trial_count=20,
)
hp_job.run()
Creating HyperparameterTuningJob HyperparameterTuningJob created. Resource name: projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 To use this HyperparameterTuningJob in another session: hpt_job = aiplatform.HyperparameterTuningJob.get('projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568') View HyperparameterTuningJob: https://console.cloud.google.com/ai/platform/locations/us-central1/training/5825136187899117568?project=522977795627 HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_RUNNING HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state: JobState.JOB_STATE_SUCCEEDED HyperparameterTuningJob run completed. 
Resource name: projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568
def get_final_metric(trial, metric_id):
    for metric in trial.final_measurement.metrics:
        if metric.metric_id == metric_id:
            return metric.value
trials = hp_job.trials
trial_accuracies = [get_final_metric(trial, "accuracy") for trial in trials]
best_trial = trials[np.argmax(trial_accuracies)]
max(trial_accuracies)
0.977400004863739
best_trial.id
'98'
best_trial.parameters
[parameter_id: "learning_rate" value { number_value: 0.001 } , parameter_id: "n_hidden" value { number_value: 8.0 } , parameter_id: "n_neurons" value { number_value: 216.0 } , parameter_id: "optimizer" value { string_value: "adam" } ]
Instead of using Vertex AI's hyperparameter tuning service, you can use Keras Tuner (introduced in Chapter 10) and run it on Vertex AI VMs. Keras Tuner provides a simple way to scale hyperparameter search by distributing it across multiple machines: it only requires setting three environment variables on each machine, then running your regular Keras Tuner code on each machine. You can use the exact same script on all machines. One of the machines acts as the chief (the oracle), and the others act as workers. Each worker asks the chief which hyperparameter values to try, then trains the model using these values, and finally reports the model's performance back to the chief, which decides which hyperparameter values the worker should try next.
The three environment variables you need to set on each machine are:
- KERASTUNER_TUNER_ID: equal to "chief" on the chief machine, or a unique identifier on each worker machine, such as "worker0", "worker1", etc.
- KERASTUNER_ORACLE_IP: the IP address or hostname of the chief machine. The chief itself should generally use "0.0.0.0" to listen on every IP address on the machine.
- KERASTUNER_ORACLE_PORT: the TCP port that the chief will be listening on.

You can use distributed Keras Tuner on any set of machines. If you want to run it on Vertex AI machines, then you can spawn a regular training job, and just modify the training script to set the three environment variables properly before using Keras Tuner.
For example, the script below starts by parsing the TF_CONFIG environment variable, which will be automatically set by Vertex AI, just like earlier. It finds the address of the task of type "chief", and extracts the IP address or hostname and the TCP port. It then defines the tuner ID as the task type followed by the task index, for example "worker0". If the tuner ID is "chief0", it changes it to "chief" and sets the IP to "0.0.0.0", which makes it listen on all IPv4 addresses on its machine. Then it defines the environment variables for Keras Tuner. Next, the script creates a tuner, just like in Chapter 10, then it runs the search, and finally it saves the best model to the location given by Vertex AI:
%%writefile my_keras_tuner_search.py
import json
import os
tf_config = json.loads(os.environ["TF_CONFIG"])
chief_ip, chief_port = tf_config["cluster"]["chief"][0].rsplit(":", 1)
tuner_id = f'{tf_config["task"]["type"]}{tf_config["task"]["index"]}'
if tuner_id == "chief0":
    tuner_id = "chief"
    chief_ip = "0.0.0.0"
# extra code – since the chief doesn't work much, you can optimize compute
# resources by running a worker on the same machine. To do this, you can
# just make the chief start another process, after tweaking the TF_CONFIG
# environment variable to set the task type to "worker" and the task index
# to a unique value. Uncomment the next few lines to give this a try:
# import subprocess
# import sys
# tf_config["task"]["type"] = "workerX" # the worker on the chief's machine
# os.environ["TF_CONFIG"] = json.dumps(tf_config)
# subprocess.Popen([sys.executable] + sys.argv,
# stdout=sys.stdout, stderr=sys.stderr)
os.environ["KERASTUNER_TUNER_ID"] = tuner_id
os.environ["KERASTUNER_ORACLE_IP"] = chief_ip
os.environ["KERASTUNER_ORACLE_PORT"] = chief_port
from pathlib import Path
import keras_tuner as kt
import tensorflow as tf
gcs_path = "/gcs/my_bucket/my_hp_search" # replace with your bucket's name
def build_model(hp):
    n_hidden = hp.Int("n_hidden", min_value=0, max_value=8, default=2)
    n_neurons = hp.Int("n_neurons", min_value=16, max_value=256)
    learning_rate = hp.Float("learning_rate", min_value=1e-4, max_value=1e-2,
                             sampling="log")
    optimizer = hp.Choice("optimizer", values=["sgd", "adam"])
    if optimizer == "sgd":
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)
    else:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
    for _ in range(n_hidden):
        model.add(tf.keras.layers.Dense(n_neurons, activation="relu"))
    model.add(tf.keras.layers.Dense(10, activation="softmax"))
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=optimizer,
                  metrics=["accuracy"])
    return model
hyperband_tuner = kt.Hyperband(
build_model, objective="val_accuracy", seed=42,
max_epochs=10, factor=3, hyperband_iterations=2,
distribution_strategy=tf.distribute.MirroredStrategy(),
directory=gcs_path, project_name="mnist")
# extra code – Load and split the MNIST dataset
mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
tensorboard_log_dir = os.environ["AIP_TENSORBOARD_LOG_DIR"] + "/" + tuner_id
tensorboard_cb = tf.keras.callbacks.TensorBoard(tensorboard_log_dir)
early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=5)
hyperband_tuner.search(X_train, y_train, epochs=10,
validation_data=(X_valid, y_valid),
callbacks=[tensorboard_cb, early_stopping_cb])
if tuner_id == "chief":
    best_hp = hyperband_tuner.get_best_hyperparameters()[0]
    best_model = hyperband_tuner.hypermodel.build(best_hp)
    best_model.save(os.getenv("AIP_MODEL_DIR"), save_format="tf")
Writing my_keras_tuner_search.py
Note that Vertex AI automatically mounts the /gcs directory to GCS, using the open source GCS Fuse adapter. This gives us a shared directory across the workers and the chief, which is required by Keras Tuner. Also note that we set the distribution strategy to a MirroredStrategy. This will allow each worker to use all the GPUs on its machine, if there's more than one.

Replace /gcs/my_bucket/ with /gcs/{bucket_name}/:
with open("my_keras_tuner_search.py") as f:
    script = f.read()
with open("my_keras_tuner_search.py", "w") as f:
    f.write(script.replace("/gcs/my_bucket/", f"/gcs/{bucket_name}/"))
Now all we need to do is start a custom training job based on this script, exactly like in the previous section. Don't forget to add keras-tuner to the list of requirements:
hp_search_job = aiplatform.CustomTrainingJob(
display_name="my_hp_search_job",
script_path="my_keras_tuner_search.py",
container_uri="gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest",
model_serving_container_image_uri=server_image,
requirements=["keras-tuner~=1.1.2"],
staging_bucket=f"gs://{bucket_name}/staging",
)
mnist_model3 = hp_search_job.run(
machine_type="n1-standard-4",
replica_count=3,
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=2,
)
Training script copied to: gs://my_bucket/staging/aiplatform-2022-04-15-13:34:32.591-aiplatform_custom_trainer_script-0.1.tar.gz. Training Output directory: gs://my_bucket/staging/aiplatform-custom-training-2022-04-15-13:34:34.453 View Training: https://console.cloud.google.com/ai/platform/locations/us-central1/training/8601543785521872896?project=522977795627 View backing custom job: https://console.cloud.google.com/ai/platform/locations/us-central1/training/5022607048831926272?project=522977795627 CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: 
PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state: PipelineState.PIPELINE_STATE_RUNNING CustomTrainingJob run completed. 
Resource name: projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 Model available at projects/522977795627/locations/us-central1/models/8176544832480168612
And we have a model!
Let's clean up:
mnist_model3.delete()
hp_search_job.delete()
blobs = bucket.list_blobs(prefix="staging/")  # prefix is relative to the bucket
for blob in blobs:
    blob.delete()
Let's start by exporting the MNIST dataset to PNG images, and preparing an import.csv file pointing to each image, and indicating the split (training, validation, or test) and the label:
import matplotlib.pyplot as plt
import numpy as np

mnist_path = Path("datasets/mnist")
mnist_path.mkdir(parents=True, exist_ok=True)
idx = 0
with open(mnist_path / "import.csv", "w") as import_csv:
    for split, X, y in zip(("training", "validation", "test"),
                           (X_train, X_valid, X_test),
                           (y_train, y_valid, y_test)):
        for image, label in zip(X, y):
            print(f"\r{idx + 1}/70000", end="")
            filename = f"{idx:05d}.png"
            # imsave expects an RGB image, so tile the grayscale image 3 times
            plt.imsave(mnist_path / filename, np.tile(image[:, :, np.newaxis], 3))
            line = f"{split},gs://{bucket_name}/mnist/{filename},{label}\n"
            import_csv.write(line)
            idx += 1
70000/70000
Let's upload this dataset to GCS:
upload_directory(bucket, mnist_path)
Uploaded datasets/mnist
Now let's create a managed image dataset on Vertex AI:
from google.cloud.aiplatform.schema.dataset.ioformat.image import single_label_classification
mnist_dataset = aiplatform.ImageDataset.create(
display_name="mnist-dataset",
gcs_source=[f"gs://{bucket_name}/mnist/import.csv"],
project=project_id,
import_schema_uri=single_label_classification,
sync=True,
)
Creating ImageDataset Create ImageDataset backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3812233931370004480 ImageDataset created. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032 To use this ImageDataset in another session: ds = aiplatform.ImageDataset('projects/522977795627/locations/us-central1/datasets/7532459492777132032') Importing ImageDataset data: projects/522977795627/locations/us-central1/datasets/7532459492777132032 Import ImageDataset data backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3010593197698056192 ImageDataset data imported. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032
Create an AutoML training job on this dataset:
TODO
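The notebook leaves this step as a TODO. Here is a hedged sketch of what launching an AutoML image classification job could look like, assuming the google-cloud-aiplatform client library is installed and authenticated; the display names and node-hour budget are illustrative, and running this incurs real GCP charges:

```python
from google.cloud import aiplatform

# Illustrative sketch: launch an AutoML image classification training job
# on the managed dataset created above. This call blocks until training
# finishes (sync=True) and can take hours.
job = aiplatform.AutoMLImageTrainingJob(
    display_name="mnist-automl-job",          # illustrative name
    prediction_type="classification",
    multi_label=False,
)
automl_model = job.run(
    dataset=mnist_dataset,
    model_display_name="mnist-automl-model",  # illustrative name
    budget_milli_node_hours=8000,             # 8 node hours
    sync=True,
)
```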
saved_model_cli, or just load it using tf.saved_model.load() and inspect it in Python.

MultiWorkerMirroredStrategy performs mirrored data parallelism. The model is replicated across all available servers and devices, and each replica gets a different batch of data at each training iteration and computes its own gradients. The mean of the gradients is computed and shared across all replicas using a distributed AllReduce implementation (NCCL by default), and all replicas perform the same Gradient Descent step. This strategy is the simplest to use since all servers and devices are treated in exactly the same way, and it performs fairly well. In general, you should use this strategy. Its main limitation is that it requires the model to fit in RAM on every replica.

ParameterServerStrategy performs asynchronous data parallelism. The model is replicated across all devices on all workers, and the parameters are sharded across all parameter servers. Each worker has its own training loop, running asynchronously with the other workers; at each training iteration, each worker gets its own batch of data and fetches the latest version of the model parameters from the parameter servers, then it computes the gradients of the loss with regard to these parameters, and it sends them to the parameter servers. Lastly, the parameter servers perform a Gradient Descent step using these gradients. This strategy is generally slower than the previous strategy, and a bit harder to deploy, since it requires managing parameter servers. However, it can be useful in some situations, especially when you can take advantage of the asynchronous updates, for example to reduce I/O bottlenecks. This depends on many factors, including hardware, network topology, number of servers, model size, and more, so your mileage may vary.

Exercise: Train a model (any model you like) and deploy it to TF Serving or Google Vertex AI. Write the client code to query it using the REST API or the gRPC API. Update the model and deploy the new version. Your client code will now query the new version. Roll back to the first version.
Please follow the steps in the Deploying TensorFlow models to TensorFlow Serving section above.
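As a quick sanity check before deploying, you can save a model, reload it with tf.saved_model.load(), and inspect its serving signatures (the same information saved_model_cli show prints). A minimal self-contained sketch, using a tiny stand-in tf.Module and an illustrative path:

```python
import tensorflow as tf

# A tiny stand-in model: a module that doubles its float input.
class Doubler(tf.Module):
    @tf.function(input_signature=[tf.TensorSpec([None], tf.float32)])
    def __call__(self, x):
        return x * 2.0

module = Doubler()
# Export to the SavedModel format, registering __call__ as the serving signature.
tf.saved_model.save(module, "doubler_demo",  # illustrative path
                    signatures=module.__call__.get_concrete_function())

# Reload and inspect: the signature keys are what TF Serving will expose.
loaded = tf.saved_model.load("doubler_demo")
signatures = list(loaded.signatures.keys())
print(signatures)  # ['serving_default']
result = loaded(tf.constant([1.0, 2.0]))
print(result.numpy())  # [2. 4.]
```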
Exercise: Train any model across multiple GPUs on the same machine using the MirroredStrategy (if you do not have access to GPUs, you can use Colaboratory with a GPU Runtime and create two virtual GPUs). Train the model again using the CentralStorageStrategy and compare the training time.
Please follow the steps in the Distributed Training section above.
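As a starting point for this exercise, here is a minimal sketch: if a physical GPU is present it is split into two virtual GPUs so MirroredStrategy has two replicas; otherwise the strategy falls back to a single CPU replica. The memory limits and the tiny model are illustrative:

```python
import numpy as np
import tensorflow as tf

# Split the first physical GPU (if any) into two logical GPUs. This must be
# done before the GPUs are initialized, i.e. early in the process.
physical_gpus = tf.config.list_physical_devices("GPU")
if physical_gpus:
    tf.config.set_logical_device_configuration(
        physical_gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
         tf.config.LogicalDeviceConfiguration(memory_limit=2048)])

strategy = tf.distribute.MirroredStrategy()
print("Replicas:", strategy.num_replicas_in_sync)

with strategy.scope():  # variables created here are mirrored on every replica
    model = tf.keras.Sequential([
        tf.keras.Input(shape=[8]),
        tf.keras.layers.Dense(1),
    ])
    model.compile(loss="mse", optimizer="sgd")

# Train on random data just to exercise the strategy; time this call to
# compare against CentralStorageStrategy.
X = np.random.rand(64, 8).astype(np.float32)
y = np.random.rand(64, 1).astype(np.float32)
model.fit(X, y, epochs=1, verbose=0)
```

Swapping `tf.distribute.MirroredStrategy()` for `tf.distribute.experimental.CentralStorageStrategy()` and timing the two `fit()` calls completes the comparison.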
Exercise: Train a small model on Google Vertex AI, using TensorFlow Cloud Tuner for hyperparameter tuning.
Please follow the instructions in the Hyperparameter Tuning using TensorFlow Cloud Tuner section in the book.
You've reached the end of the book! I hope you found it useful. 😊