Distributed TensorFlow vs MPI TensorFlow


Alex Sergeev

May 30, 2017, 5:02:13 PM
to dis...@tensorflow.org
We have recently tried the Baidu all-reduce technique: http://research.baidu.com/bringing-hpc-techniques-deep-learning/.

We were very impressed with the simplicity it offers from the user's perspective, and we want to talk about using it as the preferred method of distributed training for TensorFlow.

Compare these two code fragments.

1) Distributed TensorFlow:

import argparse
import sys

import tensorflow as tf

FLAGS = None


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # The StopAtStepHook handles stopping after running given steps.
    hooks = [tf.train.StopAtStepHook(last_step=1000000)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.
        mon_sess.run(train_op)


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

2) MPI:

import argparse
import sys

import tensorflow as tf
import tensorflow.contrib.mpi as mpi

FLAGS = None


def main(_):
  # Build model...
  loss = ...
  global_step = tf.contrib.framework.get_or_create_global_step()

  opt = tf.train.AdagradOptimizer(0.01)
  # Add MPI Distributed Optimizer
  opt = mpi.DistributedOptimizer(opt)
  train_op = opt.minimize(loss, global_step=global_step)

  # The StopAtStepHook handles stopping after running given steps.
  hooks = [tf.train.StopAtStepHook(last_step=1000000)]

  # The MonitoredTrainingSession takes care of session initialization,
  # restoring from a checkpoint, saving to a checkpoint, and closing when done
  # or an error occurs.
  with tf.train.MonitoredTrainingSession(checkpoint_dir="/tmp/train_logs",
                                         hooks=hooks,
                                         session_creator=mpi.SessionCreator()) as mon_sess:
    while not mon_sess.should_stop():
      # Perform synchronous training.
      mon_sess.run(train_op)


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  # Your other flags go here.
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

The code above looks exactly like what somebody would write for single-GPU training.  All the distribution is done by adding mpi.DistributedOptimizer() and mpi.SessionCreator().  The latter doesn't exist in the Baidu MPI patch because it predates MonitoredTrainingSession, but it could easily be added along the lines of:
class MPISessionCreator(SessionCreator):
  def create_session(self):
    gpu_to_use = mpi.local_rank().eval()
    return mpi.Session(gpu=gpu_to_use)

Additionally, NCCL 2.0 is around the corner, which should improve all-reduce performance even further.

I realize that it takes some effort to set up MPI, but once you make that investment on the platform side, the benefit to users is very significant.

How does the community feel about doing distributed training via the all-reduce route?

--
Thanks,
Alex

Jim Dowling

Jun 19, 2017, 6:16:30 PM
to Discuss
Hi Alex

TensorFlowOnSpark also supports InfiniBand/RDMA as a transport, although I believe it has no AllReduce.  Facebook scaled out Caffe2 to train ImageNet in 1 hour using 256 GPUs, and they included an AllReduce variant with two-level reduction of gradients (first within each host of 8 GPUs, then between the 32 servers).

Is your proposal motivated by MPI's support for the AllReduce algorithm, by the API, or by both?
TensorFlowOnSpark also has a very nice API for generating the ClusterSpec object using a cluster manager like YARN.

Jim

ahai...@hawk.iit.edu

Aug 9, 2017, 1:56:16 PM
to Discuss
Hi Jim, Alex, 

I have also been using the MPI_Allreduce primitive for running distributed TensorFlow. I find that this gives me a significant speedup (10x) when compared to using standard distributed TensorFlow. The all_reduce primitive seems to be the standard for doing synchronous training: https://research.fb.com/wp-content/uploads/2017/06/imagenet1kin1h5.pdf and http://dl.acm.org/citation.cfm?id=3018769&CFID=970492824&CFTOKEN=96794833
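For anyone curious about the general pattern, it looks roughly like this - a toy mpi4py sketch of gradient averaging, not my actual implementation (the dummy loss is only there to keep the snippet self-contained):

import numpy as np
import tensorflow as tf
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Dummy model so the snippet stands on its own; build your real model here.
w = tf.get_variable("w", [10], initializer=tf.random_normal_initializer())
loss = tf.reduce_sum(tf.square(w))

opt = tf.train.AdagradOptimizer(0.01)
grads_and_vars = opt.compute_gradients(loss)
# Feed averaged gradients back through placeholders.
grad_placeholders = [(tf.placeholder(g.dtype, g.get_shape()), v)
                     for g, v in grads_and_vars]
apply_op = opt.apply_gradients(grad_placeholders)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  for _ in range(100):
    local_grads = sess.run([g for g, _ in grads_and_vars])
    averaged = []
    for g in local_grads:
      buf = np.zeros_like(g)
      comm.Allreduce(g, buf, op=MPI.SUM)  # sum gradients across all ranks
      averaged.append(buf / comm.size)    # then average
    sess.run(apply_op,
             feed_dict={p: g for (p, _), g in zip(grad_placeholders, averaged)})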

As Alex mentioned, the code is much simpler and the performance is also much better.

Bairen YI

Aug 9, 2017, 3:20:12 PM
to Discuss
Hi,

All-reduce is great for sync training, and while we could discuss the parameter server versus all-reduce approach for async/sync data parallelism and scalability, the performance gap may not come solely from the communication strategy.

A recent gRPC upgrade in TF has significantly improved distributed runtime performance for large tensor transmission (around 3-5 times), due to a performance bug fix on the gRPC side (see #6116 for the relevant discussion). This is not available before the v1.3 release line.

On the other hand, if you do have InfiniBand/RoCE/iWARP networks, you could try some of the more specialized comm protocols in TF, such as grpc+gdr, grpc+verbs, or even grpc+mpi. By comparing those protocols to the all-reduce implementation, we should be able to isolate the performance difference that comes solely from the communication strategy.
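For reference, switching to one of those transports only means changing the protocol argument when creating the server in the standard setup; a minimal sketch with made-up host names (it assumes a TF build that includes the corresponding contrib transport):

import tensorflow as tf

# Hypothetical two-worker cluster; the point is only the `protocol` argument.
cluster = tf.train.ClusterSpec({"ps": ["ps0:2222"],
                                "worker": ["worker0:2222", "worker1:2222"]})
server = tf.train.Server(cluster,
                         job_name="worker",
                         task_index=0,
                         protocol="grpc+verbs")  # or "grpc+gdr" / "grpc+mpi"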

Declaration of interest: I'm the author of grpc+gdr. The patch #11392 has just been merged into current master. It reaches a 3.6 GB/s transmission rate for tensors larger than 100 MB, 2-4x faster than the grpc (0.8 GB/s) and grpc+verbs (1.5 GB/s) comm protocols. If you have any comments or feedback from using it, let me know!

Bairen Yi
M.Phil. Student in Computer Science and Engineering,
HKUST

Alex Sergeev

Aug 10, 2017, 3:13:03 PM
to ahai...@hawk.iit.edu, Discuss
Hi Adnan, Jim, and everyone,

Internally at Uber, we also found that all_reduce greatly improves (1) usability and (2) performance.

(1) We found the MPI version much easier to explain and use, compared to traditional distributed TensorFlow.  One of the main benefits is simplicity for the user - there are no more workers or parameter servers; the user can simply write single-GPU code and then sprinkle in a few operations to make training distributed.

(2) We also observed large performance improvements compared to regular distributed TensorFlow.  We will share full benchmarking results in the near future, but to give you some perspective: on 16 GPUs we get 15.6x scaling for Inception V3 and 13.9x for VGG-16 (which is notoriously hard to scale).  The parameter server method gets 11.9x for Inception V3 and just 2.4x for VGG-16.

Today I'm happy to announce that we have open-sourced a TensorFlow plugin called Horovod that turns a regular TensorFlow installation into an MPI-distributed one.  It is available on GitHub at https://github.com/uber/horovod, and we've come to rely on it internally.

This is a 0.9.0 release because we're actively seeking external feedback.
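To give a flavor of the API, here is a minimal sketch along the lines of the README (the dummy loss is just to keep the snippet self-contained; you launch the script with mpirun, one process per GPU):

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Pin each process to a single GPU (we run one process per GPU).
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Dummy model so the snippet stands on its own; build your real model here.
w = tf.get_variable("w", [10], initializer=tf.zeros_initializer())
loss = tf.reduce_sum(tf.square(w - 1.0))
global_step = tf.contrib.framework.get_or_create_global_step()

opt = tf.train.AdagradOptimizer(0.01)
opt = hvd.DistributedOptimizer(opt)            # wrap with allreduce
train_op = opt.minimize(loss, global_step=global_step)

hooks = [hvd.BroadcastGlobalVariablesHook(0),  # sync initial state from rank 0
         tf.train.StopAtStepHook(last_step=1000000)]

# Only rank 0 writes checkpoints, to avoid corruption.
checkpoint_dir = "/tmp/train_logs" if hvd.rank() == 0 else None
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       hooks=hooks,
                                       config=config) as mon_sess:
  while not mon_sess.should_stop():
    mon_sess.run(train_op)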

Thanks,
Alex


ahai...@hawk.iit.edu

Aug 11, 2017, 10:05:22 AM
to Discuss, ahai...@hawk.iit.edu
Hi Alex,

I looked through Horovod. Great library! My implementation is very similar to your approach - essentially a TensorFlow op that does the all_reduce call. I don't make use of NCCL, so that is a great addition! I think it would be great if this could get merged into TensorFlow. Is Uber planning on doing such a thing?

Alex Sergeev

Aug 11, 2017, 3:58:16 PM
to ahai...@hawk.iit.edu, Discuss
Thanks!  Yes, we can have a conversation about making Horovod part of the TensorFlow repo.

We should still keep it as a separate PyPI package, though, so that users don't have to build the whole of TensorFlow from source with Bazel.  There is precedent for this in TensorFlow Serving, which is distributed as a separate Debian + PyPI package, making it much more convenient to use.

Meanwhile, we encourage everyone to use the existing horovod PyPI package and send us feedback :-)

Thanks,
Alex

buvaneswa...@gmail.com

Aug 11, 2017, 7:09:01 PM
to Discuss, ahai...@hawk.iit.edu
Hello Alex, 

Very promising results. When you say it is version 0.9.0, do you mean version 0.9.0 of Horovod? I hope this is compatible with the latest TensorFlow version... please confirm.

-Buvana

Alex Sergeev

Aug 11, 2017, 7:17:50 PM
to buvaneswa...@gmail.com, Discuss, ahai...@hawk.iit.edu
Thanks Buvana,

Yes - 0.9.0 is the version of Horovod.

Horovod has been tested with TensorFlow 1.3.0rc2 and TensorFlow 1.2.1.  It should work with prior versions, too - if it doesn't, please raise an issue.

Thanks,
Alex


Jim Dowling

Aug 15, 2017, 5:47:14 PM
to Discuss
Hi Alex
This is wonderful! We have been staring at NCCL thinking we should do something like Facebook, and now you've done it, and open-sourced it!

Are you working on integrating Horovod with Spark, in a similar way to how Yahoo integrated TensorFlowOnSpark? Or with a cluster scheduler like YARN or Mesos? Or some other workflow engine like Airflow?

Jim

Jim Dowling

Aug 15, 2017, 5:57:10 PM
to Discuss
I noticed that it's using GPUDirect for the allgather and broadcast operations.
What's the fallback behaviour if you don't have a Tesla card? (There are lots of 1080 Ti clusters popping up.) Have you looked at doing per-host aggregation of gradients and overlapping I/O with backprop by broadcasting after every layer?

Jim 

Alex Sergeev

Aug 16, 2017, 9:23:33 AM
to Jim Dowling, Discuss
Hi Jim,

Thanks!  I'm glad that you're finding Horovod useful.  It would be great if you gave it a spin and shared your feedback.

To answer your questions:

1) We're focusing on running TensorFlow inside containers, e.g. on Mesos.  We use a cluster manager like Aurora to provision Mesos containers for us.  We find the containerized experience to be very similar to running standalone TensorFlow, so folks internally have a relatively easy time jumping from standalone to distributed.

2) We do indeed support GPUDirect.  NCCL uses it for allreduce if it can find it, but falls back to regular RDMA if it can't, which is still quite fast.  Allgather & broadcast use GPUDirect if MPI is built with CUDA support.  Those two ops usually carry less traffic (allgather is used for sparse tensors, like embeddings, and broadcast is used once per Session initialization), so not having MPI-GPUDirect is typically not a big deal.  Does your use case fall into this pattern, or do you heavily use allgather / broadcast?

3) We do overlap I/O with backprop by reducing layer by layer.  This is accomplished by having workers send rank 0 metadata about the tensors they're ready to reduce, and rank 0 broadcasting to all workers the order in which tensors are to be reduced.  We haven't looked into per-host aggregation yet.
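To make that negotiation concrete, it is conceptually along these lines - a toy mpi4py sketch of the idea, not the actual implementation inside Horovod:

from mpi4py import MPI

comm = MPI.COMM_WORLD

def negotiate_reduction_order(ready_tensor_names):
  # Every worker reports which tensors it has finished computing.
  all_ready = comm.gather(set(ready_tensor_names), root=0)
  if comm.rank == 0:
    # Rank 0 keeps only the tensors every worker is ready to reduce
    # and fixes a common order for them.
    order = sorted(set.intersection(*all_ready))
  else:
    order = None
  # Rank 0 broadcasts the agreed order; all workers then reduce in that order.
  return comm.bcast(order, root=0)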

Thanks,
Alex

Ronica Jethwa

May 24, 2018, 9:35:02 PM
to Discuss
Hello,
I noticed that the config parameter attaches a single process to a single GPU instance. Is there any way to have multiple processes running on a single GPU?
I have a case where a single process does not exhaust the GPU in terms of memory and utilization, and having every GPU in a cluster under-utilized is not very desirable.
I have tried to incorporate resource allocation by specifying the per-process GPU memory consumption, but to no avail.
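Roughly, this is the kind of configuration I mean (the memory fraction value is just illustrative):

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()
config = tf.ConfigProto()
# Today this pins one process to one GPU.
config.gpu_options.visible_device_list = str(hvd.local_rank())
# What I would like: let each process grab only part of the card,
# so that several processes could share one GPU.
config.gpu_options.per_process_gpu_memory_fraction = 0.45

with tf.train.MonitoredTrainingSession(config=config) as mon_sess:
  pass  # training loop would go here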

Looking for any guidance,
Ronica