(lightning_advanced_example)=
:::{note}
This is an advanced example for {class}`LightningTrainer <ray.train.lightning.LightningTrainer>`, which demonstrates how to use LightningTrainer with {ref}`Dataset <data>` and {ref}`Batch Predictor <air-predictors>`.

If you just want to quickly convert your existing PyTorch Lightning scripts into Ray AIR, you can refer to this starter example: {ref}`Train a Pytorch Lightning Image Classifier <lightning_mnist_example>`.
:::
In this demo, we will introduce how to fine-tune a text classifier on the CoLA (Corpus of Linguistic Acceptability) dataset with a pretrained BERT model. In particular, we will:
- Create Ray Datasets from the CoLA dataset splits.
- Define a preprocessor that tokenizes the input sentences.
- Fine-tune a BERT sentence classifier with LightningTrainer.
- Run distributed batch inference with BatchPredictor and evaluate the results.
# Set SMOKE_TEST to False to run the full training job.
SMOKE_TEST = True
Run the following line in order to install all the necessary dependencies:
!pip install numpy datasets "transformers>=4.19.1" "pytorch_lightning>=1.6.5"
Let's start by importing the needed libraries:
import ray
import torch
import pytorch_lightning as pl
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset, load_metric
import numpy as np
CoLA is a binary sentence classification task with 10.6K training examples. First, we download the dataset and metric using the HuggingFace API, and create a Ray Dataset for each split accordingly.
dataset = load_dataset("glue", "cola")
metric = load_metric("glue", "cola")
ray_datasets = ray.data.from_huggingface(dataset)
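Optionally, you can inspect the resulting dictionary of Ray Datasets before training. This is a quick sanity check, not part of the original workflow, and the exact repr and row counts depend on your Ray and datasets versions:

```python
# Optional: peek at the splits created from the HuggingFace DatasetDict.
for split, ds in ray_datasets.items():
    print(split, ds.count())          # e.g. train / validation / test row counts

print(ray_datasets["train"].take(1))  # a single raw row, e.g. sentence, label, idx
```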
Next, define a preprocessor that tokenizes the input sentences and pads the token ID sequences to length 128, using the bert-base-cased tokenizer. The preprocessor transforms all the datasets that we provide to the LightningTrainer later.
from ray.data.preprocessors import BatchMapper
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def tokenize_sentence(batch):
    encoded_sent = tokenizer(
        batch["sentence"].tolist(),
        max_length=128,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )
    batch["input_ids"] = encoded_sent["input_ids"].numpy()
    batch["attention_mask"] = encoded_sent["attention_mask"].numpy()
    batch["label"] = np.array(batch["label"])
    batch.pop("sentence")
    return batch
preprocessor = BatchMapper(tokenize_sentence, batch_format="numpy")
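As a quick sanity check, you can call the tokenization function directly on a small hand-crafted batch to see which columns it produces. This optional snippet uses made-up sentences and mirrors the "numpy" batch format that BatchMapper passes in:

```python
# Optional check: apply the tokenization function to a tiny, made-up batch.
sample_batch = {
    "sentence": np.array(["The book was written by John.", "Book the by John written was."]),
    "label": np.array([1, 0]),
}
out = tokenize_sentence(sample_batch)
print(list(out.keys()))        # ['label', 'input_ids', 'attention_mask']
print(out["input_ids"].shape)  # (2, 128) -- padded to max_length=128
```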
You don't need to make any changes to your LightningModule definition. Just copy and paste your code here:
class SentimentModel(pl.LightningModule):
    def __init__(self, lr=2e-5, eps=1e-8):
        super().__init__()
        self.lr = lr
        self.eps = eps
        self.num_classes = 2
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "bert-base-cased", num_labels=self.num_classes
        )
        self.metric = load_metric("glue", "cola")
        self.predictions = []
        self.references = []

    def forward(self, batch):
        input_ids, attention_mask = batch["input_ids"], batch["attention_mask"]
        outputs = self.model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        return logits

    def training_step(self, batch, batch_idx):
        labels = batch["label"]
        logits = self.forward(batch)
        loss = F.cross_entropy(logits.view(-1, self.num_classes), labels)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        labels = batch["label"]
        logits = self.forward(batch)
        preds = torch.argmax(logits, dim=1)
        self.predictions.append(preds)
        self.references.append(labels)

    def on_validation_epoch_end(self):
        predictions = torch.concat(self.predictions).view(-1)
        references = torch.concat(self.references).view(-1)
        matthews_correlation = self.metric.compute(
            predictions=predictions, references=references
        )
        # self.metric.compute() returns a dictionary:
        # e.g. {"matthews_correlation": 0.53}
        self.log_dict(matthews_correlation, sync_dist=True)
        self.predictions.clear()
        self.references.clear()

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.lr, eps=self.eps)
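Before launching a distributed run, it can help to verify the model definition locally with a single forward pass on random token IDs. This is an optional sanity check, not part of the original example (the fake batch below is made up):

```python
# Optional local check: one forward pass on a tiny random batch (CPU is fine).
model = SentimentModel()
fake_batch = {
    "input_ids": torch.randint(0, tokenizer.vocab_size, (2, 128)),
    "attention_mask": torch.ones(2, 128, dtype=torch.long),
}
with torch.no_grad():
    logits = model(fake_batch)
print(logits.shape)  # torch.Size([2, 2]) -- one logit per class
```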
Next, define a LightningTrainer with the necessary configurations, including hyperparameters, checkpointing, and compute-resource settings. You may find the {class}`LightningConfigBuilder <ray.train.lightning.LightningConfigBuilder>` API and the discussion {ref}`here <lightning-config-builder-intro>` useful.
from ray.train.lightning import LightningTrainer, LightningConfigBuilder
from ray.air.config import RunConfig, ScalingConfig, CheckpointConfig
# Define the configs for LightningTrainer
lightning_config = (
    LightningConfigBuilder()
    .module(cls=SentimentModel, lr=1e-5, eps=1e-8)
    .trainer(max_epochs=5, accelerator="gpu")
    .checkpointing(save_on_train_epoch_end=False)
    .build()
)
:::{note}
The `lightning_config` is created on the head node and will be passed to the worker nodes later. Be aware that the environment variables and hardware settings may differ between the head node and the worker nodes.
:::
:::{note}
{meth}`LightningConfigBuilder.checkpointing() <ray.train.lightning.LightningConfigBuilder.checkpointing>` creates a `ModelCheckpoint` callback. This callback defines the checkpoint frequency and saves checkpoint files in Lightning style. If you want to save AIR checkpoints for Batch Prediction, please also provide an AIR {class}`CheckpointConfig <ray.air.config.CheckpointConfig>`.
:::
# Save AIR checkpoints according to the performance on validation set
run_config = RunConfig(
    name="ptl-sent-classification",
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        checkpoint_score_attribute="matthews_correlation",
        checkpoint_score_order="max",
    ),
)
# Scale the DDP training workload across 4 GPUs
# You can change this config based on your compute resources.
scaling_config = ScalingConfig(
    num_workers=4, use_gpu=True, resources_per_worker={"CPU": 1, "GPU": 1}
)
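If your cluster has fewer GPUs, you can scale the job down accordingly. The snippet below is an illustrative alternative; the `smaller_scaling_config` name is hypothetical and is not used in the rest of this example:

```python
# Hypothetical alternative: 2 data-parallel workers, each with 1 CPU and 1 GPU.
smaller_scaling_config = ScalingConfig(
    num_workers=2, use_gpu=True, resources_per_worker={"CPU": 1, "GPU": 1}
)
```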
if SMOKE_TEST:
    # Run fewer epochs and subsample the dataset for a quick smoke test.
    lightning_config = (
        LightningConfigBuilder()
        .module(cls=SentimentModel, lr=1e-5, eps=1e-8)
        .trainer(max_epochs=2, accelerator="gpu")
        .checkpointing(save_on_train_epoch_end=False)
        .build()
    )
    for split, ds in ray_datasets.items():
        ray_datasets[split] = ds.random_sample(0.1)
Train the model with the configuration we specified above.
To feed data into LightningTrainer, we need to configure the following arguments:
datasets
: A dictionary of the input Ray Datasets, with special keys "train" and "val".

datasets_iter_config
: The argument list of {meth}`iter_torch_batches() <ray.data.Dataset.iter_torch_batches>`. It defines the way we iterate over dataset shards on each worker. A richer example config follows the note below.

preprocessor
: The preprocessor that will be applied to the input datasets.

:::{note}
Note that we are using Ray Data for data ingestion for faster preprocessing here, but you can also continue to use the native PyTorch `DataLoader` or `LightningDataModule`. See {ref}`this example <lightning_mnist_example>`.
:::
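As promised above, here is a slightly richer sketch of a `datasets_iter_config`. It is illustrative only and not used below; the argument names follow {meth}`iter_torch_batches() <ray.data.Dataset.iter_torch_batches>`, so double-check them against the API of your Ray version:

```python
# Illustrative alternative iteration config (not used in this example).
# Verify argument names against ray.data.Dataset.iter_torch_batches.
example_iter_config = {
    "batch_size": 16,                   # per-worker batch size
    "drop_last": True,                  # drop the last incomplete batch
    "local_shuffle_buffer_size": 1000,  # shuffle rows within each worker's shard
}
```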
Now, call `trainer.fit()` to initiate the training process.
trainer = LightningTrainer(
    lightning_config=lightning_config,
    run_config=run_config,
    scaling_config=scaling_config,
    datasets={"train": ray_datasets["train"], "val": ray_datasets["validation"]},
    datasets_iter_config={"batch_size": 16},
    preprocessor=preprocessor,
)
result = trainer.fit()
| Current time | Running for | Memory |
|---|---|---|
| 2023-04-24 10:42:50 | 00:06:26.94 | 23.8/186.6 GiB |

| Trial name | status | loc | iter | total time (s) | train_loss | matthews_correlation | epoch |
|---|---|---|---|---|---|---|---|
| LightningTrainer_87ecf_00000 | TERMINATED | 10.0.60.127:67819 | 5 | 376.028 | 0.0119807 | 0.589931 | 4 |
Trial name | _report_on | date | done | epoch | experiment_tag | hostname | iterations_since_restore | matthews_correlation | node_ip | pid | should_checkpoint | step | time_since_restore | time_this_iter_s | time_total_s | timestamp | train_loss | training_iteration | trial_id |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
LightningTrainer_87ecf_00000 | validation_end | 2023-04-24_10-42-46 | True | 4 | 0 | ip-10-0-60-127 | 5 | 0.589931 | 10.0.60.127 | 67819 | True | 670 | 376.028 | 70.6609 | 376.028 | 1682358165 | 0.0119807 | 5 | 87ecf_00000 |
2023-04-24 10:42:50,016 INFO tune.py:1010 -- Total run time: 387.00 seconds (386.94 seconds for the tuning loop).
result
Result(
  metrics={'_report_on': 'validation_end', 'train_loss': 0.011980690062046051, 'matthews_correlation': 0.5899314497879129, 'epoch': 4, 'step': 670, 'should_checkpoint': True, 'done': True, 'trial_id': '87ecf_00000', 'experiment_tag': '0'},
  path='/home/ray/ray_results/ptl-sent-classification/LightningTrainer_87ecf_00000_0_2023-04-24_10-36-23',
  checkpoint=LightningCheckpoint(local_path=/home/ray/ray_results/ptl-sent-classification/LightningTrainer_87ecf_00000_0_2023-04-24_10-36-23/checkpoint_000004)
)
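The returned `Result` object bundles the metrics reported during training together with the checkpoint of the trained model. Here is a minimal sketch of the two fields used in the rest of this example:

```python
# Access the final reported metrics and the AIR checkpoint.
print(result.metrics["matthews_correlation"])
print(result.checkpoint)  # a LightningCheckpoint, used for batch prediction below
```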
Now that we have fine-tuned the model, we can load the checkpoint into a BatchPredictor and perform fast inference with multiple GPUs. When you call `predict()`, the BatchPredictor distributes the inference workload across multiple workers and runs prediction on multiple shards of data in parallel. You can find more details in {ref}`Using Predictors for Inference <air-predictors>`.
from ray.train.batch_predictor import BatchPredictor
from ray.train.lightning import LightningCheckpoint, LightningPredictor
# Use in-memory checkpoint object
checkpoint = result.checkpoint
# You can also load a checkpoint from disk:
# YOUR_CHECKPOINT_DIR = result.checkpoint.path
# checkpoint = LightningCheckpoint.from_directory(YOUR_CHECKPOINT_DIR)
batch_predictor = BatchPredictor(
    checkpoint=checkpoint,
    predictor_cls=LightningPredictor,
    use_gpu=True,
    model_class=SentimentModel,
    preprocessor=preprocessor,
)

# Use 2 GPUs for batch inference
predictions = batch_predictor.predict(
    ray_datasets["validation"],
    feature_columns=["input_ids", "attention_mask", "label"],
    keep_columns=["label"],
    batch_size=16,
    min_scoring_workers=2,
    max_scoring_workers=2,
    num_gpus_per_worker=1,
)
`batch_predictor.predict()` returns a Ray Dataset containing the predictions. Now we can easily evaluate the results with just a few lines of code:
# Internally, BatchPredictor calls the forward() method of the LightningModule.
# Convert the logits tensor into labels with argmax.
def argmax(batch):
    batch["predictions"] = batch["predictions"].apply(lambda x: np.argmax(x))
    return batch
results = predictions.map_batches(argmax, batch_format="pandas").to_pandas()
matthews_corr = metric.compute(
    predictions=results["predictions"], references=results["label"]
)
print(results.head(10))
print(matthews_corr)
   predictions  label
0            1      1
1            1      1
2            0      1
3            1      1
4            0      0
5            1      0
6            1      0
7            1      1
8            1      1
9            1      1
{'matthews_correlation': 0.5899314497879129}
To learn more, check out the following examples:
- {ref}`Fine-tune a Large Language Model with LightningTrainer and FSDP <dolly_lightning_fsdp_finetuning>`
- {ref}`Hyperparameter searching with LightningTrainer + Ray Tune <tune-pytorch-lightning-ref>`
- {ref}`Experiment Tracking with Wandb, CometML, MLFlow, and Tensorboard in LightningTrainer <lightning_experiment_tracking>`