tensorflow/python/ops/numpy_ops/g3doc/TensorFlow_Numpy_Distributed_Image_Classification.ipynb
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TensorFlow implements a subset of the NumPy API, available as tf.experimental.numpy. This allows running NumPy code accelerated by TensorFlow, while also giving access to all of TensorFlow's APIs. Please see the TensorFlow NumPy guide to get started.
Here you will learn how to build a deep model for an image classification task using the TensorFlow NumPy APIs. To use the higher-level tf.keras APIs instead, see the corresponding tf.keras tutorial.
tf.experimental.numpy will be available in the stable branch starting from TensorFlow 2.4. For now, it is available in nightly.
!pip install --quiet --upgrade tf-nightly
!pip install --quiet --upgrade tensorflow-datasets
import collections
import functools
import matplotlib.pyplot as plt
import os
import tempfile
import tensorflow as tf
import tensorflow.experimental.numpy as tnp
import tensorflow_datasets as tfds
# Split the first physical accelerator (or, failing that, the first CPU) into
# two logical devices so replicated training can be demonstrated on one machine.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
  cpus = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
      cpus[0],
      [tf.config.LogicalDeviceConfiguration() for _ in range(2)])
  devices = tf.config.list_logical_devices('CPU')
else:
  # Two logical GPUs on top of the first physical GPU, each created with
  # `memory_limit=128`.
  tf.config.set_logical_device_configuration(
      gpus[0],
      [tf.config.LogicalDeviceConfiguration(memory_limit=128)
       for _ in range(2)])
  devices = tf.config.list_logical_devices('GPU')
print("Using following virtual devices", devices)
MNIST contains 28 × 28 grayscale images of handwritten digits from 0 to 9. The task is to classify each image into one of these 10 classes.
Below, load the dataset and examine a few samples.
NUM_CLASSES = 10  # One class per digit 0-9.
BATCH_SIZE = 64  # Batch size used when loading the training split.
INPUT_SIZE = 28 * 28  # Number of pixels in one flattened 28x28 image.
def process_data(data_dict):
  """Turns a raw MNIST batch into `(images, labels)` arrays for the model.

  Args:
    data_dict: Mapping with an `'image'` entry (pixel values) and a `'label'`
      entry (integer class ids) -- presumably as yielded by `tfds.load`.

  Returns:
    A pair `(images, labels)`:
      images: float32 array reshaped to `(-1, INPUT_SIZE)`, pixel values
        divided by 255.
      labels: float32 one-hot array with `NUM_CLASSES` columns.
  """
  pixels = tnp.asarray(data_dict['image']) / 255.0
  flat_images = pixels.reshape(-1, INPUT_SIZE).astype(tnp.float32)
  class_ids = tnp.asarray(data_dict['label'])
  # Row k of the identity matrix is the one-hot encoding of class k.
  one_hot = tnp.eye(NUM_CLASSES, dtype=tnp.float32)[class_ids]
  return flat_images, one_hot
# Input loading and preprocessing are pinned to the CPU (presumably to keep
# the input pipeline off the virtual training devices).
with tf.device("CPU:0"):
  train_dataset = tfds.load('mnist', split='train', shuffle_files=True,
                            batch_size=BATCH_SIZE).map(process_data)
  # batch_size=-1 is used to fetch the whole test split at once; it is then
  # preprocessed in a single `process_data` call.
  test_dataset = tfds.load('mnist', split='test', shuffle_files=True,
                           batch_size=-1)
  x_test, y_test = process_data(test_dataset)

  # Plots some examples.
  images, labels = next(iter(train_dataset.take(1)))
  # A single row of 8 square thumbnails only needs a short, wide figure; a
  # 96-inch-tall canvas would be almost entirely empty.
  _, axes = plt.subplots(1, 8, figsize=(12, 2))
  for i, ax in enumerate(axes):
    ax.imshow(images[i].reshape(28, 28), cmap='gray')
    ax.axis("off")
    ax.set_title("Label: %d" % int(tnp.argmax(labels[i])))
Here, you will implement a multi-layer perceptron model that trains on the MNIST data. First, define a `Dense` class that applies a linear transform followed by an optional ReLU non-linearity (enabled by default).
class Dense(tf.Module):
  """Fully connected layer computing `inputs @ wt + bias`, optionally ReLU'd.

  The `wt` and `bias` variables are created lazily on the first call, once
  the size of the input's last dimension is known.
  """

  def __init__(self, units, use_relu=True):
    """Creates an unbuilt layer.

    Args:
      units: Number of output features.
      use_relu: If True (the default), apply `tnp.maximum(x, 0.)` to the
        affine output; otherwise return it unchanged.
    """
    # Initialize `tf.Module` so that `name` and `name_scope` are set.
    super().__init__()
    self.wt = None
    self.bias = None
    self._use_relu = use_relu
    self._built = False
    self._units = units

  def __call__(self, inputs):
    """Applies the layer, creating its variables on the first call."""
    if not self._built:
      self._build(inputs.shape)
    x = tnp.add(tnp.matmul(inputs, self.wt), self.bias)
    if self._use_relu:
      return tnp.maximum(x, 0.)
    return x

  @property
  def params(self):
    """`[wt, bias]`; only valid after the layer has been called once."""
    assert self._built
    return [self.wt, self.bias]

  def _build(self, input_shape):
    """Creates `wt` of shape `[input_shape[-1], units]` and `bias` of `[units]`."""
    # The last axis is the feature axis. `tnp.matmul` follows NumPy matmul
    # broadcasting, so this also accepts inputs with extra leading axes.
    size = input_shape[-1]
    # Compute the init scale as a plain Python float: `tnp.sqrt` of an int can
    # produce a float64 value, which `truncated_normal` will not convert to
    # the float32 `stddev` it needs.
    stddev = 1.0 / size ** 0.5
    # Note that model parameters are `tf.Variable` since they require
    # mutation, which is currently unsupported by TensorFlow NumPy.
    # Also note interoperation with TensorFlow APIs below.
    self.wt = tf.Variable(
        tf.random.truncated_normal(
            [size, self._units], stddev=stddev, dtype=tf.float32))
    self.bias = tf.Variable(tf.zeros([self._units], dtype=tf.float32))
    self._built = True
Next, create a Model object that applies two non-linear Dense transforms,
followed by a linear transform.
class Model(tf.Module):
  """A three layer neural network: Dense(128) -> Dense(32) -> linear logits."""

  def __init__(self):
    # Initialize `tf.Module` so that `name` and `name_scope` are set.
    super().__init__()
    self.layer1 = Dense(128)
    self.layer2 = Dense(32)
    # No ReLU on the final layer: its output is used as unnormalized logits.
    self.layer3 = Dense(NUM_CLASSES, use_relu=False)

  def __call__(self, inputs):
    """Returns logits of shape `(batch, NUM_CLASSES)` for `inputs`."""
    x = self.layer1(inputs)
    x = self.layer2(x)
    return self.layer3(x)

  @property
  def params(self):
    """All trainable variables, layer by layer (valid after the first call)."""
    return self.layer1.params + self.layer2.params + self.layer3.params
Check out the following functions for performing training and evaluation.
def forward(model, inputs, labels):
  """Runs `model` on `inputs` and scores the result against `labels`.

  Returns:
    A pair `(logits, mean_loss)` where `mean_loss` is the softmax
    cross-entropy averaged over the batch.
  """
  logits = model(inputs)
  # TensorFlow's fused loss op has a numerically stable forward pass and
  # gradient, so it is preferred over a hand-written NumPy version.
  per_example_loss = tf.nn.softmax_cross_entropy_with_logits(
      labels=labels, logits=logits)
  return logits, tnp.mean(per_example_loss)
def compute_gradients(model, inputs, labels):
  """Computes gradients of the loss on `(inputs, labels)` w.r.t. `model.params`.

  Returns:
    A list of gradients in the same order as `model.params`.
  """
  with tf.GradientTape() as tape:
    # No explicit `tape.watch` is needed: `model.params` are trainable
    # `tf.Variable`s, which the tape watches automatically, and no gradient
    # with respect to `inputs` is requested.
    _, loss = forward(model, inputs, labels)
  return tape.gradient(loss, model.params)
def compute_sgd_updates(gradients, learning_rate):
  """Returns the plain-SGD deltas `-learning_rate * g` for each gradient `g`."""
  step = -learning_rate
  return [step * g for g in gradients]
def apply_updates(model, updates):
  """Adds each delta in `updates` in place to the matching `model.params` entry.

  Args:
    model: Object whose `params` entries support `assign_add`.
    updates: Deltas in the same order as `model.params`. Entries are paired
      with `zip`, so any surplus on either side is ignored.
  """
  for variable, delta in zip(model.params, updates):
    variable.assign_add(delta)
def evaluate(model, images, labels):
  """Returns the fraction of `images` whose argmax prediction matches `labels`.

  `labels` are compared via their argmax along the last axis, so one-hot
  labels are expected.
  """
  predicted = tnp.argmax(model(images), axis=-1)
  expected = tnp.argmax(labels, axis=-1)
  hits = predicted == expected
  return float(tnp.mean(hits))
NUM_EPOCHS = 10


@tf.function
def train_step(model, input, labels, learning_rate):  # pylint: disable=redefined-builtin
  """Runs one SGD step of `model` on a single `(input, labels)` batch."""
  grads = compute_gradients(model, input, labels)
  apply_updates(model, compute_sgd_updates(grads, learning_rate))


# Create the model; its layers create their variables on the first call.
model = Model()
accuracies = []
for _ in range(NUM_EPOCHS):
  for inputs, labels in train_dataset:
    train_step(model, inputs, labels, learning_rate=0.1)
  # Record test-set accuracy once per epoch.
  accuracies.append(evaluate(model, x_test, y_test))
def plot_accuracies(accuracies):
  """Draws the per-epoch evaluation accuracy curve on the current figure."""
  plt.plot(accuracies)
  plt.title("Eval accuracy vs epoch")
  plt.xlabel("epoch")
  plt.ylabel("accuracy")


plot_accuracies(accuracies)
class SaveableModel(Model):
  """`Model` whose forward pass is a `tf.function`, so SavedModel can export it."""

  @tf.function
  def __call__(self, inputs):
    return super().__call__(inputs)


saveable_model = SaveableModel()
# The first call traces the function for `x_test`'s signature (this is the
# concrete function that gets saved) and builds the lazily created variables.
saveable_model(x_test)
# Copy the weights trained above into the wrapper, so that the trained model,
# rather than a freshly initialized one, is what gets saved.
for saved_param, trained_param in zip(saveable_model.params, model.params):
  saved_param.assign(trained_param)
outputs = saveable_model(x_test)

# The temporary directory (named `save_dir` to avoid shadowing the `dir`
# builtin) is deleted when the `with` block exits.
with tempfile.TemporaryDirectory() as save_dir:
  # This saves the model to disk.
  tf.saved_model.save(saveable_model, save_dir)
  loaded = tf.saved_model.load(save_dir)
  outputs_loaded = loaded(x_test)

# Ensure that the loaded model preserves the weights
# of the saved model.
assert tnp.allclose(outputs, outputs_loaded)
Next, run mirrored training on multiple devices. Note that the devices used here are virtual: they map to the same physical GPU, or to the same physical CPU when no GPU is available.
First, define a few utilities to run replicated computation and reductions.
Check out the primitives below for function replication and distributed reduction.
import threading
import queue
# Note that this code currently relies on dispatching operations from python
# threads.
class ReplicatedFunction(object):
  """Creates a callable that will run `fn` on each device in `devices`.

  One worker thread is started per device. Calling the object with positional
  arguments, each a sequence holding one entry per device, sends entry `i` of
  every argument to device `i`'s worker. The call blocks until every worker
  has finished and returns the per-device results in device order.

  Args:
    fn: Callable invoked as `fn(*replica_inputs, **kw_args)` inside
      `tf.device(device)` on its worker thread.
    devices: Sequence of devices; one worker thread per entry.
    **kw_args: Extra keyword arguments forwarded to every `fn` call.
  """

  # Sentinel placed on an input queue to tell its worker thread to exit.
  _STOP = object()

  def __init__(self, fn, devices, **kw_args):
    self._shutdown = False
    stop = self._STOP

    # The worker deliberately does not reference `self`. A running thread that
    # held `self` would keep this object alive forever, so `__del__` could
    # never run and the threads could never be stopped.
    def _replica_fn(device, input_queue, output_queue):
      while True:
        inputs = input_queue.get()
        if inputs is stop:
          return
        try:
          with tf.device(device):
            result = fn(*inputs, **kw_args)
        except Exception as e:  # pylint: disable=broad-except
          # Hand the error to the caller. Letting the thread die would leave
          # `__call__` blocked on `output_queue.get()` forever.
          output_queue.put((False, e))
        else:
          output_queue.put((True, result))

    self.threads = []
    self.input_queues = [queue.Queue() for _ in devices]
    self.output_queues = [queue.Queue() for _ in devices]
    for device, input_queue, output_queue in zip(
        devices, self.input_queues, self.output_queues):
      # Daemon threads, so a forgotten instance cannot block interpreter exit.
      thread = threading.Thread(
          target=_replica_fn,
          args=(device, input_queue, output_queue),
          daemon=True)
      thread.start()
      self.threads.append(thread)

  def __call__(self, *inputs):
    """Runs `fn` once per device and returns the list of per-device results.

    Raises:
      RuntimeError: If `close()` has already been called.
      ValueError: If the number of per-device argument tuples differs from the
        number of devices.
      Exception: The first exception raised by `fn` on any replica.
    """
    if self._shutdown:
      raise RuntimeError("ReplicatedFunction has been closed.")
    all_inputs = list(zip(*inputs))
    if len(all_inputs) != len(self.input_queues):
      raise ValueError(
          "Expected inputs for %d devices, got %d." %
          (len(self.input_queues), len(all_inputs)))
    for input_queue, replica_input in zip(self.input_queues, all_inputs):
      input_queue.put(replica_input)
    # Drain every output queue, even when a replica failed, so that a later
    # call does not pick up stale results.
    results = [q.get() for q in self.output_queues]
    for ok, value in results:
      if not ok:
        raise value
    return [value for _, value in results]

  def close(self, timeout=3):
    """Stops the worker threads. Safe to call more than once.

    A worker that is still inside `fn` only exits after `fn` returns. Each
    join waits at most `timeout` seconds.
    """
    if getattr(self, '_shutdown', True):
      return
    self._shutdown = True
    for input_queue in getattr(self, 'input_queues', ()):
      input_queue.put(self._STOP)
    current = threading.current_thread()
    for t in getattr(self, 'threads', None) or ():
      if t is not current:
        t.join(timeout)
    self.threads = None

  def __del__(self):
    self.close()
def collective_mean(inputs, num_devices):
  """Performs a collective mean reduction of each tensor in `inputs`.

  Tensor `k` is reduced in collective group 0 with instance key `k`. Values
  are combined with merge_op 'Add' and final_op 'Div' across a group of
  `num_devices` participants. NOTE(review): presumably every participating
  replica must issue the same sequence of calls so that instance keys line
  up -- confirm against the CollectiveReduce docs.

  Returns:
    A list of reduced values (as `tnp` arrays), in the order of `inputs`.
  """
  return [
      tnp.asarray(
          tf.raw_ops.CollectiveReduce(
              input=tensor,
              group_size=num_devices,
              group_key=0,
              instance_key=key,
              merge_op='Add',
              final_op='Div',
              subdiv_offsets=[]))
      for key, tensor in enumerate(inputs)
  ]
# Same as `train_step`, except that gradients are averaged across replicas with
# a collective reduction before the update is applied.
@tf.function
def replica_step(model, inputs, labels,
                 learning_rate=None, num_devices=None):
  """Runs one data-parallel SGD step for a single replica's shard of a batch."""
  local_grads = compute_gradients(model, inputs, labels)
  # Every replica computes the same mean gradients, so replicas that start from
  # identical parameters apply identical updates.
  mean_grads = collective_mean(local_grads, num_devices)
  apply_updates(model, compute_sgd_updates(mean_grads, learning_rate))
# One model replica per device.
models = [Model() for _ in devices]


# The code below builds all the model objects and copies model parameters from
# the first model to all the replicas.
def init_model(model):
  """Builds `model`'s variables and syncs replicas to `models[0]`.

  Calling the model on a dummy batch creates its lazily built variables.
  Every model other than `models[0]` then takes `models[0]`'s values, so all
  replicas start from identical parameters.
  """
  model(tnp.zeros((1, INPUT_SIZE), dtype=tnp.float32))
  # Identity check: `models[0]` is the source of truth and is not copied.
  if model is not models[0]:
    # Copy the first model's weights into this replica.
    for p1, p2 in zip(model.params, models[0].params):
      p1.assign(p2)


with tf.device(devices[0]):
  init_model(models[0])
# Replicate and run the parameter initialization.
ReplicatedFunction(init_model, devices[1:])(models[1:])

# Replicate the training step
replicated_step = ReplicatedFunction(
    replica_step, devices, learning_rate=0.1, num_devices=len(devices))

accuracies = []
print("Running distributed training on devices: %s" % devices)
for _ in range(NUM_EPOCHS):
  for inputs, labels in train_dataset:
    # Each device receives an equal slice of the batch.
    replicated_step(models,
                    tnp.split(inputs, len(devices)),
                    tnp.split(labels, len(devices)))
  # Replicas stay in sync, so evaluating the first one is enough.
  accuracies.append(evaluate(models[0], x_test, y_test))
plot_accuracies(accuracies)