# Transformers installation ! pip install transformers datasets # To install from source instead of the last release, comment the command above and uncomment the following one. # ! pip install git+https://github.com/huggingface/transformers.git from huggingface_hub import notebook_login notebook_login() from huggingface_hub import hf_hub_download hf_dataset_identifier = "sayakpaul/ucf101-subset" filename = "UCF101_subset.tar.gz" file_path = hf_hub_download(repo_id=hf_dataset_identifier, filename=filename, repo_type="dataset") import tarfile with tarfile.open(file_path) as t: t.extractall(".") class_labels = sorted({str(path).split("/")[2] for path in all_video_file_paths}) label2id = {label: i for i, label in enumerate(class_labels)} id2label = {i: label for label, i in label2id.items()} print(f"Unique classes: {list(label2id.keys())}.") from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification model_ckpt = "MCG-NJU/videomae-base" image_processor = VideoMAEImageProcessor.from_pretrained(model_ckpt) model = VideoMAEForVideoClassification.from_pretrained( model_ckpt, label2id=label2id, id2label=id2label, ignore_mismatched_sizes=True, # provide this in case you're planning to fine-tune an already fine-tuned checkpoint ) import pytorchvideo.data from pytorchvideo.transforms import ( ApplyTransformToKey, Normalize, RandomShortSideScale, RemoveKey, ShortSideScale, UniformTemporalSubsample, ) from torchvision.transforms import ( Compose, Lambda, RandomCrop, RandomHorizontalFlip, Resize, ) mean = image_processor.image_mean std = image_processor.image_std if "shortest_edge" in image_processor.size: height = width = image_processor.size["shortest_edge"] else: height = image_processor.size["height"] width = image_processor.size["width"] resize_to = (height, width) num_frames_to_sample = model.config.num_frames sample_rate = 4 fps = 30 clip_duration = num_frames_to_sample * sample_rate / fps train_transform = Compose( [ ApplyTransformToKey( key="video", transform=Compose( [ UniformTemporalSubsample(num_frames_to_sample), Lambda(lambda x: x / 255.0), Normalize(mean, std), RandomShortSideScale(min_size=256, max_size=320), RandomCrop(resize_to), RandomHorizontalFlip(p=0.5), ] ), ), ] ) train_dataset = pytorchvideo.data.Ucf101( data_path=os.path.join(dataset_root_path, "train"), clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration), decode_audio=False, transform=train_transform, ) val_transform = Compose( [ ApplyTransformToKey( key="video", transform=Compose( [ UniformTemporalSubsample(num_frames_to_sample), Lambda(lambda x: x / 255.0), Normalize(mean, std), Resize(resize_to), ] ), ), ] ) val_dataset = pytorchvideo.data.Ucf101( data_path=os.path.join(dataset_root_path, "val"), clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration), decode_audio=False, transform=val_transform, ) test_dataset = pytorchvideo.data.Ucf101( data_path=os.path.join(dataset_root_path, "test"), clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration), decode_audio=False, transform=val_transform, ) print(train_dataset.num_videos, val_dataset.num_videos, test_dataset.num_videos) import imageio import numpy as np from IPython.display import Image def unnormalize_img(img): """Un-normalizes the image pixels.""" img = (img * std) + mean img = (img * 255).astype("uint8") return img.clip(0, 255) def create_gif(video_tensor, filename="sample.gif"): """Prepares a GIF from a video tensor. The video tensor is expected to have the following shape: (num_frames, num_channels, height, width). """ frames = [] for video_frame in video_tensor: frame_unnormalized = unnormalize_img(video_frame.permute(1, 2, 0).numpy()) frames.append(frame_unnormalized) kargs = {"duration": 0.25} imageio.mimsave(filename, frames, "GIF", **kargs) return filename def display_gif(video_tensor, gif_name="sample.gif"): """Prepares and displays a GIF from a video tensor.""" video_tensor = video_tensor.permute(1, 0, 2, 3) gif_filename = create_gif(video_tensor, gif_name) return Image(filename=gif_filename) sample_video = next(iter(train_dataset)) video_tensor = sample_video["video"] display_gif(video_tensor) from transformers import TrainingArguments, Trainer model_name = model_ckpt.split("/")[-1] new_model_name = f"{model_name}-finetuned-ucf101-subset" num_epochs = 4 args = TrainingArguments( new_model_name, remove_unused_columns=False, evaluation_strategy="epoch", save_strategy="epoch", learning_rate=5e-5, per_device_train_batch_size=batch_size, per_device_eval_batch_size=batch_size, warmup_ratio=0.1, logging_steps=10, load_best_model_at_end=True, metric_for_best_model="accuracy", push_to_hub=True, max_steps=(train_dataset.num_videos // batch_size) * num_epochs, ) import evaluate metric = evaluate.load("accuracy") def compute_metrics(eval_pred): predictions = np.argmax(eval_pred.predictions, axis=1) return metric.compute(predictions=predictions, references=eval_pred.label_ids) def collate_fn(examples): # permute to (num_frames, num_channels, height, width) pixel_values = torch.stack( [example["video"].permute(1, 0, 2, 3) for example in examples] ) labels = torch.tensor([example["label"] for example in examples]) return {"pixel_values": pixel_values, "labels": labels} trainer = Trainer( model, args, train_dataset=train_dataset, eval_dataset=val_dataset, tokenizer=image_processor, compute_metrics=compute_metrics, data_collator=collate_fn, ) train_results = trainer.train() trainer.push_to_hub() sample_test_video = next(iter(test_dataset)) from transformers import pipeline video_cls = pipeline(model="my_awesome_video_cls_model") video_cls("https://huggingface.co/datasets/sayakpaul/ucf101-subset/resolve/main/v_BasketballDunk_g14_c06.avi") def run_inference(model, video): # (num_frames, num_channels, height, width) perumuted_sample_test_video = video.permute(1, 0, 2, 3) inputs = { "pixel_values": perumuted_sample_test_video.unsqueeze(0), "labels": torch.tensor( [sample_test_video["label"]] ), # this can be skipped if you don't have labels available. } device = torch.device("cuda" if torch.cuda.is_available() else "cpu") inputs = {k: v.to(device) for k, v in inputs.items()} model = model.to(device) # forward pass with torch.no_grad(): outputs = model(**inputs) logits = outputs.logits return logits predicted_class_idx = logits.argmax(-1).item() print("Predicted class:", model.config.id2label[predicted_class_idx])