A quick guide to distributed training with TensorFlow and Horovod on Amazon SageMaker

Learn how distributed training works and how Amazon SageMaker makes it as easy as training on your laptop

Distribute training across multiple GPUs using Horovod and Amazon SageMaker for faster training and increased productivity

In deep learning, more is better. More data, more layers, and more compute power usually lead to higher accuracy and more robust trained models.

I may not be able to help you collect more data, but I can show you how to distribute training across a large number of machines so you can train faster, run more experiments, and increase your productivity.

In this blog post, I’m going to cover how you can run distributed training without managing infrastructure: no instances to launch, no clusters to set up, no storage volumes to manage, and no containers to build. Bring your training scripts, specify the number of GPUs, and let Amazon SageMaker handle the rest.

In the first part of this guide, I’ll provide step-by-step instructions for updating your training scripts to use the Horovod library. For distributed training to work, training processes on different GPUs need to communicate. Horovod enables this seamless communication and provides a convenient API to prepare your training scripts for distributed training. The changes you make are agnostic to the number of GPUs, so it’s a one-time effort.

In the second part of this guide, I’ll show how you can take your updated training scripts and run them at scale using Amazon SageMaker on as many GPUs as you want, or as few as you need, just by changing a single line of code.

Want to follow along and run the examples as you read? The Jupyter notebook and training scripts are available here:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker

Horovod and the ring all-reduce approach

Horovod is a distributed deep learning framework that supports popular deep learning frameworks such as TensorFlow, Keras, PyTorch, and Apache MXNet. The example in this guide uses TensorFlow and Keras. If you’re a PyTorch or MXNet user, updating your scripts follows a very similar process to the one described here. The Horovod documentation page also includes plenty of examples for other frameworks.

During distributed training, multiple processes need to communicate with each other. To enable communication between training processes, Horovod uses a communication protocol called Message Passing Interface (MPI). And to average gradients and update all copies of the models, it uses an approach called ring-allreduce (we’ll come back to this). These approaches are not new and have been used for many years by scientists, researchers and engineers working in High-Performance Computing (HPC) to solve problems in computational fluid dynamics, molecular dynamics, computer graphics and others.

MPI itself defines basic operations for sending and receiving information between multiple processes in a cluster, such as allreduce, allgather, and broadcast. As you may have deduced from their names: allgather gathers data (in the case of deep learning, gradients) from all processes; broadcast sends data (gradients) from one process to every other process; and allreduce, conceptually speaking, combines the two, gathering data from all processes, performing a reduction operation (for example, averaging gradients), and then broadcasting the result.
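
To make these collectives concrete, here’s a toy, single-process NumPy illustration of what each one computes. A real MPI or Horovod implementation performs the same operations across separate processes, usually on separate machines.

import numpy as np

# Three "processes", each holding its own gradient vector
grads = [np.array([1., 2.]), np.array([3., 4.]), np.array([5., 6.])]

gathered = np.stack(grads)                    # allgather: every process ends up with every vector
averaged = gathered.mean(axis=0)              # reduce: average the gathered gradients
result = [averaged.copy() for _ in grads]     # broadcast: every process receives the averaged result
# allreduce combines all three steps into a single collective operation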

Illustration of ring allreduce with 6 processes on 3 machines with 2 GPUs each. Rank is a globally unique ID and local rank is a unique ID within each machine

As the number of processes that need to communicate grows, this approach leads to increasing communication overhead.

Ring all-reduce improves upon this process by making communication cost independent of the number of processes in the system.

It does this by arranging processes in a logical ring where each process only receives data from its “left” neighbor and sends data to its “right” neighbor, as illustrated in the accompanying figure.

The ring-allreduce process for deep learning is described in the Horovod blog post and in further depth in the Horovod paper. To use the Horovod library, you don’t really need to know how ring-allreduce works, but it always helps to have an intuition about how the algorithms and libraries you use work.
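
If you’d like to build that intuition, here’s a small single-process NumPy simulation of the ring-allreduce algorithm (summing gradients; divide by the number of processes to average). It’s purely illustrative; Horovod’s implementation runs across real processes and overlaps communication with computation.

import numpy as np

def ring_allreduce(grads):
    # grads[p] is the gradient vector held by "process" p
    n = len(grads)
    chunks = [np.array_split(g.astype(float), n) for g in grads]

    # Phase 1, scatter-reduce: at step s, process p sends chunk (p - s) % n
    # to its right neighbor, which adds it to its own copy. After n - 1 steps,
    # process p holds the fully reduced chunk (p + 1) % n.
    for s in range(n - 1):
        for p in range(n):
            idx = (p - s) % n
            chunks[(p + 1) % n][idx] += chunks[p][idx]

    # Phase 2, allgather: at step s, process p passes the fully reduced chunk
    # (p + 1 - s) % n to its right neighbor, which overwrites its own copy.
    for s in range(n - 1):
        for p in range(n):
            idx = (p + 1 - s) % n
            chunks[(p + 1) % n][idx] = chunks[p][idx]

    return [np.concatenate(c) for c in chunks]

# Three "processes", each with its own gradient vector of length 6
grads = [np.arange(6.) * (p + 1) for p in range(3)]
print(ring_allreduce(grads))  # all three results are the same element-wise sum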

To update your training script to use the Horovod library, you primarily need to know the following key concepts (the short script after this list shows how to query them):

  • Size: the total number of processes/GPUs, i.e. the number of compute instances in your cluster multiplied by the number of GPUs per instance. For example, with 2 p3.16xlarge EC2 instances, size would be 2 (instances) x 8 (GPUs) = 16.
  • Rank: a globally unique process ID, ranging from 0 to size - 1. Each process knows its own rank.
  • Local rank: a unique process ID within a machine. For example, the local ranks of the 8 GPUs in a p3.16xlarge EC2 instance range from 0 to 7.
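
For example, a minimal script like the one below (check_ranks.py is a name I’m making up for illustration) prints all three values for each process when launched with, say, horovodrun -np 4 python check_ranks.py on a single 4-GPU machine:

import horovod.tensorflow.keras as hvd

hvd.init()
print(f'size: {hvd.size()}, rank: {hvd.rank()}, local rank: {hvd.local_rank()}')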

What happens during distributed training?

For the purpose of illustration, let’s take the example of a distributed training job on 2 GPUs; these could be on the same or different systems, it doesn’t matter. Here’s what happens behind the scenes:

Illustration of what happens during distributed training with 2 training processes

Step 1: During the forward pass, it’s business as usual. Each copy of the model does a forward pass with a batch_size of data that it receives.

Step 2: A backward pass is then performed to compute the gradients. But the gradients are NOT used to update the weights yet.

Step 3: Horovod now performs an allreduce operation (averaging the gradients and then broadcasting the result) across all processes, in this example both GPUs.

Step 4: The allreduced (averaged) gradients are now used to update each copy of the model.

By allowing each GPU to train on different batches of data, and allreducing the gradients, you’re effectively training on a larger batch and therefore speeding up training.
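
To connect these steps to code: conceptually, the distributed optimizer you’ll add in Change 5 does something like the following simplified sketch, written here with Horovod’s low-level allreduce op and TF 1.x style optimizer calls (the real implementation also handles sparse gradients and tensor fusion):

import horovod.tensorflow as hvd

# Assumes `opt` is a tf.train optimizer and `loss` is the model's loss tensor
grads_and_vars = opt.compute_gradients(loss)              # steps 1 and 2: forward and backward pass
averaged_grads_and_vars = [(hvd.allreduce(grad), var)     # step 3: ring-allreduce averages each gradient
                           for grad, var in grads_and_vars if grad is not None]
train_op = opt.apply_gradients(averaged_grads_and_vars)   # step 4: update weights with averaged gradients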

Updating your training script to use Horovod API

For this demo, I’ll use the CIFAR-10 dataset, which consists of 60,000 32×32 images belonging to 10 different classes (6,000 images per class). The training script is available in the blog post’s GitHub repository along with a Jupyter notebook to run the full example:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker

This section describes the changes made to the training script and model_def.py (both in the repository) to prepare them for distributed training.

To make it easier for you to follow along, I’ve included the same section headings as comments in the scripts. Look for “Change 1”, “Change 2”, and so on.

Change 1: Import horovod and keras backend

Put these at the top of your training script to import Horovod and the Keras backend.

import horovod.tensorflow.keras as hvd
import tensorflow.keras.backend as K

Change 2: Initialize horovod and get the size of the cluster

Initialize Horovod and get the total number of GPUs in your cluster. If you’re running on CPU instances only, this will equal the total number of training processes (typically one per instance).

hvd.init()
size = hvd.size()

Change 3: Pin GPU to local process (one GPU per process)

Pin a GPU to the current process using its local rank, so that each training process drives exactly one GPU.

import tensorflow as tf

config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

Change 4: Scale the learning rate by the size of the cluster (total number of workers)

Scale the learning rate by the number of GPUs, because the effective batch size during distributed training is batch_size times hvd.size(). This change is in model_def.py.

By increasing the learning rate, you compensate for the effective increase in batch size.

opt = SGD(lr=lr * size, decay=weight_decay, momentum=momentum)

Change 5: Wrap your Keras optimizer using Horovod to make it a distributed optimizer

The distributed optimizer does the magic of averaging the gradients using allreduce (or allgather) and broadcasting the result, then updating the weights with the averaged gradients. This change is in model_def.py.

opt = hvd.DistributedOptimizer(opt)

Change 6: Add callbacks to sync the initial state and save checkpoints only on the first worker (rank 0)
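
A minimal sketch of this change using Horovod’s built-in Keras callbacks (the checkpoint directory and filename here are placeholders; see the repository scripts for the exact callbacks used):

callbacks = [
    # Broadcast initial variable states from rank 0 to all other processes,
    # so every worker starts training from the same weights
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    # Average metrics across workers at the end of every epoch
    hvd.callbacks.MetricAverageCallback(),
]

# Save checkpoints only on the first worker to prevent multiple workers
# from writing to the same file
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('checkpoints/checkpoint-{epoch}.h5'))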

Change 7: Update the number of steps/epoch

You’ll need to divide the number of steps per epoch (total number of images divided by the batch size) by the number of workers, since each worker now processes only its share of the data in each epoch.
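
For example (the sample counts and batch size below are illustrative values, not taken from the repository scripts):

# Illustrative values: 50,000 training and 10,000 validation images
num_train_samples, num_val_samples, batch_size = 50000, 10000, 256

steps_per_epoch = (num_train_samples // batch_size) // hvd.size()
validation_steps = (num_val_samples // batch_size) // hvd.size()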

Change 8: Update script to accept hyperparameters as command line arguments

Amazon SageMaker will pass these values to the script when it launches a distributed training job.
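
A sketch of what this typically looks like (the hyperparameter names and defaults below are illustrative; SM_CHANNEL_TRAIN and SM_MODEL_DIR are environment variables SageMaker sets inside the training container):

import argparse, os

parser = argparse.ArgumentParser()
# Hyperparameters are passed by SageMaker as command line arguments
parser.add_argument('--epochs',        type=int,   default=30)
parser.add_argument('--learning-rate', type=float, default=0.001)
parser.add_argument('--batch-size',    type=int,   default=256)
parser.add_argument('--momentum',      type=float, default=0.9)
parser.add_argument('--weight-decay',  type=float, default=2e-4)
# Data and model locations inside the training container
parser.add_argument('--train',     type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
args, _ = parser.parse_known_args()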

Run distributed training on Amazon SageMaker

You’re now done with the hard part — modifying your training script to make it distributed ready.

The rest of the process — distributed training — is relatively straightforward using Amazon SageMaker.

To run a distributed training job using Amazon SageMaker, download and install the SageMaker Python SDK. For a more convenient experience, you can also launch an Amazon SageMaker notebook instance, which comes with a Jupyter notebook server, the SageMaker Python SDK, and popular deep learning frameworks pre-installed.

Running a SageMaker training job involves only two key steps which I’ll highlight below:

  1. Creating a SageMaker TensorFlow Estimator
  2. Calling the fit() function

The following code excerpts are from the Jupyter notebook in the blog post’s repository:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker/blob/master/cifar10-sagemaker-distributed.ipynb

Using the SDK, you need to specify the following details so that Amazon SageMaker can provision the requested resources and prepare for training:

  • Your training script
  • Directory with training script dependencies
  • Location to save trained models
  • Type of CPU or GPU instance you want to train on
  • Number of GPUs per instance
  • TensorFlow version
  • Distribution type — MPI (used by Horovod) or parameter server (an alternative approach to distributed training)

There are many more options you can specify in the SageMaker TensorFlow estimator, and you can find a full list in the documentation: https://sagemaker.readthedocs.io/en/stable/index.html

The implementation is as follows:
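
Below is a sketch of the estimator definition; the entry point script name, source directory, instance settings, hyperparameter values, and S3 output path are placeholders to adapt (the exact code is in the notebook linked above), and parameter names vary slightly between versions of the SageMaker Python SDK (this sketch uses the v2 names):

import sagemaker
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point='cifar10-tf-horovod-sagemaker.py',    # training script (name assumed; see the repository)
    source_dir='code',                                 # directory with the script and its dependencies (assumed)
    role=sagemaker.get_execution_role(),               # IAM role SageMaker uses to access your AWS resources
    framework_version='1.15.2',                        # TensorFlow version
    py_version='py3',
    instance_count=2,                                  # number of instances in the training cluster
    instance_type='ml.p3.16xlarge',                    # 8 GPUs per instance
    distribution={'mpi': {'enabled': True, 'processes_per_host': 8}},  # MPI, used by Horovod
    hyperparameters={'epochs': 30, 'batch-size': 256, 'learning-rate': 0.001},
    output_path='s3://<your-bucket>/model-output')     # where the trained model is saved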

After defining the estimator, you’ll need to specify the Amazon S3 paths to your training, validation and test datasets, and pass them to the estimator’s fit() function.
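
For example (the channel names and S3 paths below are placeholders; they need to match what your training script expects):

estimator.fit({
    'train':      's3://<your-bucket>/cifar10/train',
    'validation': 's3://<your-bucket>/cifar10/validation',
    'eval':       's3://<your-bucket>/cifar10/eval'})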

And you’re done! Sit back and wait for the distributed training job to complete.

You can (and should) monitor progress, which I’ll cover in the next section, but first, let’s take a closer look at what’s happening behind the scenes.