Back to Models

Video SSL Tutorial

official/projects/video_ssl/video_ssl.ipynb

2.20.017.6 KB
Original Source

Video SSL Tutorial

This tutorial trains a video_ssl model with 3D-ResNet-50 (R3D-50) as the backbone, from the TensorFlow Model Garden package (tensorflow-models).

Model Garden contains a collection of state-of-the-art models, implemented with TensorFlow's high-level APIs. The implementations demonstrate the best practices for modeling, letting users take full advantage of TensorFlow for their research and product development.

Dataset: UCF_101

  • A 101-label video classification dataset

This tutorial demonstrates how to:

  • Use models from the TensorFlow Models package.
  • Train/Fine-tune a pre-built video_ssl for Video Classification.
  • Export the trained/tuned video_ssl model

Install necessary libraries

!pip install -U -q "tensorflow" "tensorflow_addons" "immutabledict" "tensorflow_datasets"
!pip install -U -q remotezip tqdm opencv-python einops
!pip install -U -q git+https://github.com/tensorflow/docs

Clone models repository

!git clone https://github.com/tensorflow/models.git

Set models as current working dir

%cd ./models/

Import necessary libraries

import os
import tqdm
import random
import pathlib
import pprint
import itertools
import imageio
import collections

import cv2
import einops
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf

from IPython import display
from tensorflow_docs.vis import embed

pp = pprint.PrettyPrinter(indent=4) # Set Pretty Print Indentation
print(tf.__version__) # Check the version of tensorflow used

Import required modules from video_ssl for running the model

from official.core import task_factory
from official.core import train_lib
from official.core import train_utils
from official.modeling import performance
from official.vision.data import tfrecord_lib

from official.projects.video_ssl.configs import video_ssl as exp_cfg
from official.projects.video_ssl.modeling import video_ssl_model
from official.projects.video_ssl.tasks import linear_eval
from official.projects.video_ssl.tasks import pretrain
from official.vision import registry_imports
from official.vision.serving import export_saved_model_lib

Download pretrained weights

!wget https://storage.googleapis.com/tf_model_garden/vision/cvrl/r3d_1x_k600_800ep.tar.gz -P /content/
!tar -xvf /content/r3d_1x_k600_800ep.tar.gz -C ../
!rm ../r3d_1x_k600_800ep.tar.gz

Download Subdataset of UCF_101

# @title Load and preprocess video data
def list_files_per_class(zip_url):
  """
    List the files in each class of the dataset given the zip URL.

    Args:
      zip_url: URL from which the files can be unzipped.

    Return:
      files: List of files in each of the classes.
  """
  files = []
  # Bug fix: use the `zip_url` parameter instead of the module-level `URL`
  # global, so the function works for any archive passed in.
  with rz.RemoteZip(zip_url) as zip:
    for zip_info in zip.infolist():
      files.append(zip_info.filename)
  return files

def get_class(fname):
  """
    Retrieve the name of the class given a filename.

    Args:
      fname: Name of the file in the UCF101 dataset.

    Return:
      Class that the file belongs to.
  """
  # UCF101 filenames look like 'v_ClassName_gXX_cYY.avi': the class name
  # is always the third token from the end when split on underscores.
  tokens = fname.split('_')
  return tokens[-3]

def get_files_per_class(files):
  """
    Retrieve the files that belong to each class.

    Args:
      files: List of files in the dataset.

    Return:
      Dictionary of class names (key) and files (values).
  """
  # Group file names by the class encoded in their names.
  grouped = collections.defaultdict(list)
  for name in files:
    grouped[get_class(name)].append(name)
  return grouped

def download_from_zip(zip_url, to_dir, file_names):
  """
    Download the contents of the zip file from the zip URL.

    Args:
      zip_url: Zip URL containing data.
      to_dir: Directory to download data to.
      file_names: Names of files to download.
  """
  with rz.RemoteZip(zip_url) as archive:
    for member in tqdm.tqdm(file_names):
      class_dir = to_dir / get_class(member)
      archive.extract(member, str(class_dir))
      # `extract` recreates the archive's internal directory layout; flatten
      # it by renaming the file to sit directly under the class directory.
      extracted_path = class_dir / member
      flat_name = pathlib.Path(member).parts[-1]
      extracted_path.rename(class_dir / flat_name)

def split_class_lists(files_for_class, count):
  """
    Returns the list of files belonging to a subset of data as well as the remainder of
    files that need to be downloaded.

    Args:
      files_for_class: Files belonging to a particular class of data.
      count: Number of files to download.

    Return:
      split_files: Files belonging to the subset of data.
      remainder: Dictionary of the remainder of files that need to be downloaded.
  """
  taken = []
  leftover = {}
  # Take the first `count` files of every class; everything after that
  # stays in the per-class remainder for later splits.
  for label, names in files_for_class.items():
    taken.extend(names[:count])
    leftover[label] = names[count:]
  return taken, leftover

def download_ufc_101_subset(zip_url, num_classes, splits, download_dir):
  """
    Download a subset of the UCF101 dataset and split them into various parts, such as
    training, validation, and test.

    Args:
      zip_url: Zip URL containing data.
      num_classes: Number of labels.
      splits: Dictionary specifying the training, validation, test, etc. (key) division of data
              (value is number of files per split).
      download_dir: Directory to download data to.

    Return:
      dir: Posix path of the resulting directories containing the splits of data.
  """
  files = list_files_per_class(zip_url)
  # Bug fix: the original removed elements from `files` while iterating
  # over it, which silently skips entries. Filter instead: archive entries
  # with two or fewer path tokens have no filename (they are directories).
  files = [f for f in files if len(f.split('/')) > 2]

  files_for_class = get_files_per_class(files)

  classes = list(files_for_class.keys())[:num_classes]

  # Shuffle within each kept class so the splits are a random sample.
  for cls in classes:
    random.shuffle(files_for_class[cls])

  # Only use the number of classes you want in the dictionary
  files_for_class = {x: files_for_class[x] for x in classes}

  dirs = {}
  for split_name, split_count in splits.items():
    print(split_name, ":")
    split_dir = download_dir / split_name
    # Take `split_count` files per class for this split; the remainder
    # stays available for the next split.
    split_files, files_for_class = split_class_lists(files_for_class, split_count)
    download_from_zip(zip_url, split_dir, split_files)
    dirs[split_name] = split_dir

  return dirs

def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  # Normalize the dtype first, then letterbox to the target size
  # (resize_with_pad preserves aspect ratio and pads with zeros).
  converted = tf.image.convert_image_dtype(frame, tf.uint8)
  return tf.image.resize_with_pad(converted, *output_size)

def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.
      frame_step: Number of source frames to advance between sampled frames.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  # Total span of source frames needed to sample n_frames with this stride.
  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    # Bug fix: random.randint is inclusive on both ends, so the original
    # `randint(0, max_start + 1)` could start one frame too late; it also
    # passed floats (CAP_PROP_FRAME_COUNT returns a float), which newer
    # Python versions reject.
    max_start = int(video_length - need_length)
    start = random.randint(0, max_start)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  # Bug fix: guard against a failed first read (corrupt or empty video)
  # instead of passing None into format_frames.
  if not ret:
    src.release()
    return np.zeros((n_frames, *output_size, 3))
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    # Skip ahead frame_step frames between samples.
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      # Pad with black frames when the video runs out early.
      result.append(np.zeros_like(result[0]))
  src.release()
  # OpenCV decodes frames as BGR; reorder the last axis to RGB.
  result = np.array(result)[..., [2, 1, 0]]

  return result

def to_gif(images):
  """Write `images` to ./animation.gif at 10 fps and return an embeddable view."""
  gif_path = './animation.gif'
  imageio.mimsave(gif_path, images, fps=10)
  return embed.embed_file(gif_path)

class FrameGenerator:
  def __init__(self, path, n_frames, training = False):
    """ Returns a set of frames with their associated label.

      Args:
        path: Video file paths.
        n_frames: Number of frames.
        training: Boolean to determine if training dataset is being created.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    # Class names are the sorted subdirectory names directly under `path`.
    subdirs = (entry.name for entry in self.path.iterdir() if entry.is_dir())
    self.class_names = sorted(set(subdirs))
    self.class_ids_for_name = {
        name: idx for idx, name in enumerate(self.class_names)
    }

  def get_files_and_class_names(self):
    """Collect every .avi under `path` plus the class (parent dir) of each."""
    video_paths = list(self.path.glob('*/*.avi'))
    classes = [p.parent.name for p in video_paths]
    return video_paths, classes

  def __call__(self):
    """Yield (frames, integer label) pairs; order is shuffled when training."""
    video_paths, classes = self.get_files_and_class_names()
    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for video_path, class_name in pairs:
      video_frames = frames_from_video_file(video_path, self.n_frames)
      yield video_frames, self.class_ids_for_name[class_name]
# Helper functions below used are taken from following tutorials
# https://www.tensorflow.org/tutorials/video/video_classification
# https://www.tensorflow.org/tutorials/video/transfer_learning_with_movinet

# Source archive for the UCF101 videos (hosted for the THUMOS challenge).
URL = 'https://storage.googleapis.com/thumos14_files/UCF101_videos.zip'
download_dir = pathlib.Path('../UCF101_subset/')
# Download a 10-class subset: 40 train / 10 val / 10 test videos per class.
subset_paths = download_ufc_101_subset(URL,
                        num_classes = 10,
                        splits = {"train": 40, "val": 10, "test": 10},
                        download_dir = download_dir)

Prepare train, valid and test dataset

n_frames = 10  # Frames sampled per video clip.
# Label names, sorted to match the ids assigned by FrameGenerator.
CLASSES = sorted(os.listdir('../UCF101_subset/train/'))

# Generator output spec: uint8 frames of unknown spatial size + scalar int16 label.
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.uint8, name='image'),
                    tf.TensorSpec(shape = (), dtype = tf.int16, name='label'))

# Build a tf.data pipeline from the FrameGenerator for each split.
train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['train'], n_frames, training=True),
                                          output_signature = output_signature)

val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], n_frames),
                                        output_signature = output_signature)

test_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['test'], n_frames),
                                         output_signature = output_signature)

Write data as TFRecords

Helper function to convert data as TF Sequence Example

def process_record(record):
  """
    Convert training samples to SequenceExample format. For more detailed
    explanation about SequenceExample, please check here
    https://www.tensorflow.org/api_docs/python/tf/train/SequenceExample

    Args:
      record: training example with image frames and corresponding label.

    Return:
      Return a SequenceExample which represents a
      sequence of features and some context.
  """
  seq_example = tf.train.SequenceExample()
  frames, label = record[0], record[1]
  # Each frame is JPEG-encoded and appended to the 'image/encoded' feature list.
  image_list = seq_example.feature_lists.feature_list.get_or_create(
      'image/encoded')
  for frame in frames:
    encoded = tf.io.encode_jpeg(frame).numpy()
    image_list.feature.add().bytes_list.value[:] = [encoded]
  # The clip-level label is stored once in the context.
  seq_example.context.feature[
      'clip/label/index'].int64_list.value[:] = [label.numpy()]
  return seq_example
output_dir = '../ucf101_tfrecords/'  # Destination directory for the TFRecord shards.
LOG_EVERY = 100  # Progress-logging interval, in records.
# Create the output directory if it does not exist yet.
if not os.path.exists(output_dir):
  os.mkdir(output_dir)


def write_tfrecords(dataset, output_path, num_shards=1):
  """
      Convert training samples to tfrecords.

      Args:
        dataset: Dataset as a iterator in (tfds format).
        output_path: Directory to store the tfrecords.
        num_shards: Split the tfrecords to specific number of shards.
  """

  writers = [
        tf.io.TFRecordWriter(
            output_path + '-%05d-of-%05d.tfrecord' % (i, num_shards))
        for i in range(num_shards)
    ]
  try:
    for idx, record in enumerate(dataset):
      if idx % LOG_EVERY == 0:
        # Bug fix: the original `print('On image %d', idx)` never applied the
        # %-format and printed the literal placeholder followed by the index.
        print('On image %d' % idx)
      seq_example = process_record(record)
      # Distribute records across shards round-robin.
      writers[idx % num_shards].write(seq_example.SerializeToString())
  finally:
    # Bug fix: close the writers so buffered records are flushed to disk.
    for writer in writers:
      writer.close()

Write training data as TFRecords

# Shard the training split across 10 TFRecord files.
output_train_tfrecs = output_dir + 'train'
write_tfrecords(train_ds, output_train_tfrecs, num_shards=10)

Write validation data as TFRecords

# Shard the validation split across 5 TFRecord files.
output_val_tfrecs = output_dir + 'valid'
write_tfrecords(val_ds, output_val_tfrecs, num_shards=5)

Write test data as TFRecords

# Shard the test split across 5 TFRecord files.
output_test_tfrecs = output_dir + 'test'
write_tfrecords(test_ds, output_test_tfrecs, num_shards=5)

Experiment Configuration

Load the existing Configuration

import yaml

# Load the linear-eval experiment overrides shipped with the video_ssl project.
with open('./official/projects/video_ssl/configs/experiments/cvrl_linear_eval_k600.yaml', 'r') as file:
  override_params = yaml.full_load(file)


# Start from the registered Kinetics-600 linear-eval experiment configuration.
exp_config = exp_cfg.exp_factory.get_exp_config('video_ssl_linear_eval_kinetics600')

Override the configuration parameters

# Apply the YAML overrides on top of the base experiment config.
exp_config.override(override_params, is_strict=False)

WIDTH, HEIGHT = 224, 224  # Spatial resolution fed to the model.

# Runtime configuration
exp_config.runtime.distribution_strategy = "mirrored"

# Task configuration
exp_config.task.freeze_backbone = True  # Linear eval: train only the head.
exp_config.task.init_checkpoint = "../r3d_1x_k600_800ep/r3d_1x_k600_800ep_backbone-1"
exp_config.task.init_checkpoint_modules = "backbone"

# Model configuration
exp_config.task.model.projection_dim = 10  # Matches the 10 UCF101 classes used here.

# Training data configuration
exp_config.task.train_data.input_path = '../ucf101_tfrecords/train*'
exp_config.task.train_data.num_classes=10
exp_config.task.train_data.global_batch_size = 2
exp_config.task.train_data.min_image_size = WIDTH
exp_config.task.train_data.num_examples = 400  # 10 classes x 40 train videos.
exp_config.task.train_data.feature_shape = (n_frames, HEIGHT, WIDTH, 3)

# Validation data configuration
exp_config.task.validation_data.num_classes=10
exp_config.task.validation_data.input_path = '../ucf101_tfrecords/valid*'
exp_config.task.validation_data.global_batch_size = 2
exp_config.task.validation_data.min_image_size = WIDTH
exp_config.task.validation_data.num_examples = 100  # 10 classes x 10 val videos.
exp_config.task.validation_data.feature_shape = (n_frames, HEIGHT, WIDTH, 3)

# Trainer configuration

exp_config.trainer.train_steps = 2000
exp_config.trainer.checkpoint_interval = 200
exp_config.trainer.steps_per_loop = 200
exp_config.trainer.summary_interval = 200
exp_config.trainer.validation_interval = 200
exp_config.trainer.validation_steps = 200
exp_config.trainer.optimizer_config.learning_rate.cosine.decay_steps = 2000
exp_config.trainer.optimizer_config.learning_rate.cosine.initial_learning_rate = 0.008
exp_config.trainer.optimizer_config.warmup.linear.warmup_learning_rate = 0.007
exp_config.trainer.optimizer_config.warmup.linear.warmup_steps = 200

Set up the distribution strategy

# Detect hardware
try:
  tpu_resolver = tf.distribute.cluster_resolver.TPUClusterResolver() # TPU detection
except ValueError:
  # No TPU available; fall back to enumerating logical GPUs.
  tpu_resolver = None
  gpus = tf.config.experimental.list_logical_devices("GPU")

# Select appropriate distribution strategy
if tpu_resolver:
  tf.config.experimental_connect_to_cluster(tpu_resolver)
  tf.tpu.experimental.initialize_tpu_system(tpu_resolver)
  distribution_strategy = tf.distribute.experimental.TPUStrategy(tpu_resolver)
  print('Running on TPU ', tpu_resolver.cluster_spec().as_dict()['worker'])
elif len(gpus) > 1:
  # Synchronous data parallelism across all visible GPUs.
  distribution_strategy = tf.distribute.MirroredStrategy([gpu.name for gpu in gpus])
  print('Running on multiple GPUs ', [gpu.name for gpu in gpus])
elif len(gpus) == 1:
  distribution_strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
  print('Running on single GPU ', gpus[0].name)
else:
  distribution_strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
  print('Running on CPU')

print("Number of accelerators: ", distribution_strategy.num_replicas_in_sync)

Display the final configuration

# Pretty-print the fully-resolved experiment configuration for inspection.
pp.pprint(exp_config.as_dict())
display.Javascript('google.colab.output.setIframeHeight("500px");')

Create the Task object (tfm.core.base_task.Task) from the config_definitions.TaskConfig.

The Task object has all the methods necessary for building the dataset, building the model, and running training & evaluation. These methods are driven by tfm.core.train_lib.run_experiment.

model_dir = '../trained_model/'  # Checkpoints and logs are written here.

# Build the Task under the strategy scope so its variables are replicated.
with distribution_strategy.scope():
  task = task_factory.get_task(exp_config.task, logging_dir=model_dir)

Visualization of Train Data

# Pull one (frames, label) example from the training pipeline and render it.
frames, label = list(train_ds.take(1))[0]
print(CLASSES[label])
to_gif(frames)

Train and evaluate

# Run the full train-and-eval loop; returns the trained model and eval metrics.
model, eval_logs = train_lib.run_experiment(
    distribution_strategy=distribution_strategy,
    task=task,
    mode='train_and_eval',
    params=exp_config,
    model_dir=model_dir)

Load logs in tensorboard

%load_ext tensorboard
%tensorboard --logdir '../trained_model'

Saving and exporting the trained model

export_dir = '../exported_model/'  # SavedModel output location.

# Export an inference graph from the latest training checkpoint.
export_saved_model_lib.export_inference_graph(
    input_type='image_tensor',
    batch_size=1,
    input_image_size=[n_frames, HEIGHT, WIDTH],
    params=exp_config,
    checkpoint_path=tf.train.latest_checkpoint(model_dir),
    export_dir=export_dir)

Importing SavedModel

# Reload the exported SavedModel and grab its default serving signature.
imported = tf.saved_model.load(export_dir)
model_fn = imported.signatures['serving_default']

Visualize predictions

# Sample one random test clip, run inference, and compare against the label.
frames, label = list(test_ds.shuffle(buffer_size=90).take(1))[0]
frames = tf.expand_dims(frames, axis=0)  # Add the batch dimension the signature expects.
result = model_fn(frames)
predicted_label = tf.argmax(result['probs'][0])
print(f"Actual: {CLASSES[label]}")
print(f"Predicted: {CLASSES[predicted_label]}")
to_gif(frames[0])