Welcome to our distributed end-to-end binary text classification example. In this demo, we will use the Hugging Face transformers and datasets libraries together with a custom Amazon sagemaker-sdk extension to fine-tune a pre-trained transformer on binary text classification. In particular, the pre-trained model will be fine-tuned using the imdb dataset. To speed up training, we are going to use the SageMaker distributed data parallel library to run our training across multiple GPUs. To get started, we need to set up the environment with a few prerequisite steps for permissions, configurations, and so on.
NOTE: You can run this demo in SageMaker Studio, on your local machine, or on SageMaker Notebook Instances.
Note: we only install the required libraries from Hugging Face and AWS. You also need PyTorch or TensorFlow, if you haven't installed it already.
!pip install "sagemaker>=2.48.0" --upgrade
Upgrade ipywidgets for the datasets library and restart the kernel. This is only needed when preprocessing is done in the notebook.
%%capture
import IPython
!conda install -c conda-forge ipywidgets -y
IPython.Application.instance().kernel.do_shutdown(True) # has to restart kernel so changes are used
import sagemaker.huggingface
If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker. You can find more about it here.
import sagemaker
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
In this example the preprocessing will be done in train.py when the script is executed. You could also move the preprocessing outside of the script, upload the data to S3, and pass it into the training job.
In order to create a SageMaker training job we need a HuggingFace Estimator. The Estimator handles end-to-end Amazon SageMaker training and deployment tasks. In an Estimator we define which fine-tuning script should be used as entry_point, which instance_type should be used, which hyperparameters are passed in, and so on.
huggingface_estimator = HuggingFace(
    entry_point='train.py',
    source_dir='./scripts',
    base_job_name='huggingface-sdk-extension',
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    transformers_version='4.4',
    pytorch_version='1.6',
    py_version='py37',
    role=role,
    hyperparameters={
        'epochs': 1,
        'train_batch_size': 32,
        'model_name': 'distilbert-base-uncased',
    },
)
When we create a SageMaker training job, SageMaker takes care of starting and managing all the required EC2 instances for us with the huggingface container, uploads the provided fine-tuning script train.py, and downloads the data from our sagemaker_session_bucket into the container at /opt/ml/input/data. Then, it starts the training job by running:
/opt/conda/bin/python train.py --epochs 1 --model_name distilbert-base-uncased --train_batch_size 32
The hyperparameters you define in the HuggingFace estimator are passed in as named arguments.
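To make the hand-off concrete, here is a minimal sketch of how a hyperparameter dictionary could end up as command-line flags and be read back with argparse. The helper names `to_cli_args` and `parse_hyperparameters` are hypothetical and only illustrate the idea; they are not part of the SageMaker SDK, and the real container may build the command differently.

```python
import argparse


def to_cli_args(hyperparameters):
    """Hypothetical helper: flatten a hyperparameter dict into CLI tokens."""
    args = []
    for name in sorted(hyperparameters):
        args += [f"--{name}", str(hyperparameters[name])]
    return args


def parse_hyperparameters(argv):
    """Parse the flags the way a training script could, ignoring unknown ones."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train_batch_size", type=int, default=16)
    parser.add_argument("--model_name", type=str)
    args, _unknown = parser.parse_known_args(argv)
    return args


hyperparameters = {"epochs": 1, "train_batch_size": 32, "model_name": "distilbert-base-uncased"}
cli_args = to_cli_args(hyperparameters)
args = parse_hyperparameters(cli_args)
print(" ".join(cli_args))
print(args.epochs, args.train_batch_size, args.model_name)
```

One detail worth checking in your own script: argparse treats `--train_batch_size` and `--train-batch-size` as different flags. With `parse_known_args`, a flag that does not match exactly is silently ignored and the default value is used instead.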
SageMaker provides useful properties about the training environment through various environment variables, including the following:
SM_MODEL_DIR: A string that represents the path where the training job writes the model artifacts. After training, artifacts in this directory are uploaded to S3 for model hosting.
SM_NUM_GPUS: An integer representing the number of GPUs available to the host.
SM_CHANNEL_XXXX: A string that represents the path to the directory that contains the input data for the specified channel. For example, if you specify two input channels in the HuggingFace estimator's fit call, named train and test, the environment variables SM_CHANNEL_TRAIN and SM_CHANNEL_TEST are set.
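As an illustration, a training script could collect these variables in one place. The sketch below assumes a hypothetical helper `read_sagemaker_env`; the fallback values and the example paths are assumptions for running outside SageMaker, not values the service guarantees.

```python
import os


def read_sagemaker_env(environ=os.environ):
    """Collect the SM_* settings described above, with fallbacks for local runs."""
    prefix = "SM_CHANNEL_"
    # Map each SM_CHANNEL_<NAME> variable to a lower-case channel name
    channels = {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }
    return {
        "model_dir": environ.get("SM_MODEL_DIR", "./model"),  # fallback is an assumption
        "num_gpus": int(environ.get("SM_NUM_GPUS", "0")),      # fallback is an assumption
        "channels": channels,
    }


# Illustrative environment, similar to what a job with two channels might see
example_env = {
    "SM_MODEL_DIR": "/opt/ml/model",
    "SM_NUM_GPUS": "8",
    "SM_CHANNEL_TRAIN": "/opt/ml/input/data/train",
    "SM_CHANNEL_TEST": "/opt/ml/input/data/test",
}
settings = read_sagemaker_env(example_env)
print(settings)
```

Using `environ.get` with a default, rather than indexing `os.environ` directly, lets the same script start on a laptop where the variables are not set.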
To run your training job locally you can define instance_type='local' or instance_type='local_gpu' for GPU usage. Note: this does not work within SageMaker Studio.
!pygmentize ./scripts/train.py
import argparse
import logging
import os
import sys

import tensorflow as tf
from datasets import load_dataset
from tqdm import tqdm

from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from transformers.file_utils import is_sagemaker_distributed_available


if os.environ.get("SDP_ENABLED") or is_sagemaker_distributed_available():
    SDP_ENABLED = True
    os.environ["SAGEMAKER_INSTANCE_TYPE"] = "p3dn.24xlarge"
    import smdistributed.dataparallel.tensorflow as sdp
else:
    SDP_ENABLED = False


def fit(model, loss, opt, train_dataset, epochs, train_batch_size, max_steps=None):
    pbar = tqdm(train_dataset)
    for i, batch in enumerate(pbar):
        with tf.GradientTape() as tape:
            inputs, targets = batch
            outputs = model(batch)
            loss_value = loss(targets, outputs.logits)

        if SDP_ENABLED:
            tape = sdp.DistributedGradientTape(tape, sparse_as_dense=True)

        grads = tape.gradient(loss_value, model.trainable_variables)
        opt.apply_gradients(zip(grads, model.trainable_variables))

        pbar.set_description(f"Loss: {loss_value:.4f}")

        if SDP_ENABLED:
            if i == 0:
                sdp.broadcast_variables(model.variables, root_rank=0)
                sdp.broadcast_variables(opt.variables(), root_rank=0)
                first_batch = False

        if max_steps and i >= max_steps:
            break

    train_results = {"loss": loss_value.numpy()}
    return train_results


def get_datasets():
    # Load dataset
    train_dataset, test_dataset = load_dataset("imdb", split=["train", "test"])

    # Preprocess train dataset
    train_dataset = train_dataset.map(
        lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
    )
    train_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])

    train_features = {
        x: train_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
        for x in ["input_ids", "attention_mask"]
    }
    tf_train_dataset = tf.data.Dataset.from_tensor_slices((train_features, train_dataset["label"]))

    # Preprocess test dataset
    test_dataset = test_dataset.map(
        lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
    )
    test_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])

    test_features = {
        x: test_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
        for x in ["input_ids", "attention_mask"]
    }
    tf_test_dataset = tf.data.Dataset.from_tensor_slices((test_features, test_dataset["label"]))

    if SDP_ENABLED:
        tf_train_dataset = tf_train_dataset.shard(sdp.size(), sdp.rank())
        tf_test_dataset = tf_test_dataset.shard(sdp.size(), sdp.rank())
    tf_train_dataset = tf_train_dataset.batch(args.train_batch_size, drop_remainder=True)
    tf_test_dataset = tf_test_dataset.batch(args.eval_batch_size, drop_remainder=True)

    return tf_train_dataset, tf_test_dataset


if __name__ == "__main__":

    parser = argparse.ArgumentParser()

    # Hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train-batch-size", type=int, default=16)
    parser.add_argument("--eval-batch-size", type=int, default=8)
    parser.add_argument("--model_name", type=str)
    parser.add_argument("--learning_rate", type=str, default=5e-5)
    parser.add_argument("--do_train", type=bool, default=True)
    parser.add_argument("--do_eval", type=bool, default=True)

    # Data, model, and output directories
    parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])

    args, _ = parser.parse_known_args()

    # Set up logging
    logger = logging.getLogger(__name__)

    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    if SDP_ENABLED:
        sdp.init()

        gpus = tf.config.experimental.list_physical_devices("GPU")
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        if gpus:
            tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], "GPU")

    # Load model and tokenizer
    model = TFAutoModelForSequenceClassification.from_pretrained(args.model_name)
    tokenizer = AutoTokenizer.from_pretrained(args.model_name)

    # get datasets
    tf_train_dataset, tf_test_dataset = get_datasets()

    # fine optimizer and loss
    optimizer = tf.keras.optimizers.Adam(learning_rate=args.learning_rate)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    # Training
    if args.do_train:

        # train_results = model.fit(tf_train_dataset, epochs=args.epochs, batch_size=args.train_batch_size)
        train_results = fit(
            model, loss, optimizer, tf_train_dataset, args.epochs, args.train_batch_size, max_steps=None
        )
        logger.info("*** Train ***")

        output_eval_file = os.path.join(args.output_data_dir, "train_results.txt")

        if not SDP_ENABLED or sdp.rank() == 0:
            with open(output_eval_file, "w") as writer:
                logger.info("***** Train results *****")
                logger.info(train_results)
                for key, value in train_results.items():
                    logger.info(" %s = %s", key, value)
                    writer.write("%s = %s\n" % (key, value))

    # Evaluation
    if args.do_eval and (not SDP_ENABLED or sdp.rank() == 0):

        result = model.evaluate(tf_test_dataset, batch_size=args.eval_batch_size, return_dict=True)
        logger.info("*** Evaluate ***")

        output_eval_file = os.path.join(args.output_data_dir, "eval_results.txt")

        with open(output_eval_file, "w") as writer:
            logger.info("***** Eval results *****")
            logger.info(result)
            for key, value in result.items():
                logger.info(" %s = %s", key, value)
                writer.write("%s = %s\n" % (key, value))

    # Save result
    if SDP_ENABLED:
        if sdp.rank() == 0:
            model.save_pretrained(args.model_dir)
            tokenizer.save_pretrained(args.model_dir)
    else:
        model.save_pretrained(args.model_dir)
        tokenizer.save_pretrained(args.model_dir)
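The data-parallel part of the script relies on each worker reading a different slice of the data: the dataset is sharded by worker rank and then batched with drop_remainder=True. The following pure-Python sketch mimics that behaviour on a toy list so it can be run without TensorFlow or SageMaker. The functions `shard` and `batch` are simplified stand-ins written for this illustration, not the tf.data implementations, and the worker count is an assumed value.

```python
def shard(examples, num_shards, index):
    """Keep every num_shards-th example, starting at position index."""
    return [x for i, x in enumerate(examples) if i % num_shards == index]


def batch(examples, batch_size):
    """Group examples into full batches, dropping an incomplete final batch."""
    full = len(examples) - len(examples) % batch_size
    return [examples[i:i + batch_size] for i in range(0, full, batch_size)]


examples = list(range(10))  # stand-in for 10 tokenized reviews
world_size = 2              # assumed number of workers, i.e. what sdp.size() would return

per_worker = {
    rank: batch(shard(examples, world_size, rank), batch_size=2)
    for rank in range(world_size)
}
for rank, batches in per_worker.items():
    print(f"worker {rank}: {batches}")
```

Each worker sees a disjoint slice, and the last example of each slice is dropped because it does not fill a complete batch. With real data this means the per-step global batch size is the per-worker batch size multiplied by the number of workers.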
from sagemaker.huggingface import HuggingFace
# hyperparameters, which are passed into the training job
hyperparameters={
    'epochs': 1,
    'train_batch_size': 16,
    'model_name':'distilbert-base-uncased',
}
# configuration for running training on smdistributed Data Parallel
distribution = {'smdistributed':{'dataparallel':{ 'enabled': True }}}
# instance configurations
instance_type='ml.p3dn.24xlarge'
instance_count=2
volume_size=200
huggingface_estimator = HuggingFace(
    entry_point='train.py',
    source_dir='./scripts',
    instance_type=instance_type,
    instance_count=instance_count,
    role=role,
    transformers_version='4.6',
    tensorflow_version='2.4',
    py_version='py37',
    distribution=distribution,
    hyperparameters=hyperparameters,
    debugger_hook_config=False, # currently needed
)
huggingface_estimator.fit()
To deploy our endpoint, we call deploy() on our HuggingFace estimator object, passing in our desired number of instances and instance type.
predictor = huggingface_estimator.deploy(1,"ml.g4dn.xlarge")
Then, we use the returned predictor object to call the endpoint.
sentiment_input= {"inputs":"I love using the new Inference DLC."}
predictor.predict(sentiment_input)
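The prediction can then be mapped to a readable sentiment. The sketch below assumes the endpoint returns a list of dictionaries with "label" and "score" keys, and that a model saved without custom label names reports LABEL_0 and LABEL_1; both are assumptions about the response, so check the actual output of your endpoint. The sample response is made up for illustration, and `readable_prediction` is a hypothetical helper.

```python
# Assumed mapping: in the imdb dataset, label 0 is negative and label 1 is positive.
LABEL_NAMES = {"LABEL_0": "negative", "LABEL_1": "positive"}


def readable_prediction(response):
    """Turn the first prediction in the response into a (name, score) pair."""
    top = response[0]
    # Fall back to the raw label if it is not one of the assumed names
    return LABEL_NAMES.get(top["label"], top["label"]), top["score"]


# Hypothetical response, not actual output from the endpoint
sample_response = [{"label": "LABEL_1", "score": 0.98}]
print(readable_prediction(sample_response))
```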
Finally, we delete the endpoint again.
predictor.delete_endpoint()
# container image used for training job
print(f"container image used for training job: \n{huggingface_estimator.image_uri}\n")
# s3 uri where the trained model is located
print(f"s3 uri where the trained model is located: \n{huggingface_estimator.model_data}\n")
# latest training job name for this estimator
print(f"latest training job name for this estimator: \n{huggingface_estimator.latest_training_job.name}\n")
container image used for training job:
763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-tensorflow-training:2.4.1-transformers4.5.0-gpu-py37-cu110-ubuntu18.04
s3 uri where the trained model is located:
s3://sagemaker-us-east-1-558105141721/huggingface-training-2021-02-16-11-57-47-658/output/model.tar.gz
latest training job name for this estimator:
huggingface-training-2021-02-16-11-57-47-658
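If you want to fetch the model artifact yourself, the S3 URI printed above first has to be split into a bucket name and an object key. Here is a small sketch using only the standard library; `split_s3_uri` is a hypothetical helper, not a SageMaker SDK function.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into its bucket and key parts."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


# URI taken from the cell output above
model_data = (
    "s3://sagemaker-us-east-1-558105141721/"
    "huggingface-training-2021-02-16-11-57-47-658/output/model.tar.gz"
)
bucket, key = split_s3_uri(model_data)
print(bucket)
print(key)
```

The resulting pair is the form an S3 client typically expects, for example as the bucket and key arguments of a boto3 download_file call.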
# access the logs of the training job
huggingface_estimator.sagemaker_session.logs_for_job(huggingface_estimator.latest_training_job.name)
In SageMaker you can attach an old training job to an estimator to continue training, get results, etc.
from sagemaker.estimator import Estimator
# job which is going to be attached to the estimator
old_training_job_name=''
# attach old training job
huggingface_estimator_loaded = Estimator.attach(old_training_job_name)
# get model output s3 from training job
huggingface_estimator_loaded.model_data