Possible RFCs for a thread-safe graph and flexible concurrency

Gabriel Perdue

Sep 28, 2018, 2:26:21 PM
to TensorFlow Developers
Hello,

I'd like to start a discussion on a pair of possible RFCs on behalf of the High Energy Physics community. I work at Fermilab and am here representing the interests of several experiments.

The short version: in particle physics we do a lot of very large-scale batch processing (several hundred thousand concurrent processes). Our goal is generally "high throughput computing" - we want to process an enormous number of data chunks as fast as possible, but we don't care at all about the order in which they are processed. For maximum efficiency in a batch environment, we would like to be able to share the computation graph more easily, which means we need a thread-safe graph. We would also like more hooks to manage threads - something like a plug-in mechanism for concurrency engines. So we'd like to start discussion around two possible RFCs: one for a thread-safe graph and one for more flexibility in the concurrency engine.

In more detail, the CMS experiment, for example, and other High Energy Physics (HEP) experiments in general, run executables that process a large number of independent data chunks (an individual process might handle from a thousand to a hundred million chunks). Each chunk needs to be processed by a large number of tasks (from a few tens to a few thousand), where each task performs a transformation on the chunk. Our processes use multiple threads, so we can process multiple chunks in parallel as well as run multiple independent tasks that each transform part of the same chunk. One of the major limiting factors for our processes is memory: we need to bound both the memory shared by all threads and the memory used by any given thread. Another constraint is that our processes must restrict the number of processing threads they use to a number assigned to them at process start. This is needed because we use computing resources shared by multiple users, with each user assigned a certain number of computing cores they are allowed to use. (NOTE: a small number of primarily 'waiting' threads is permissible, as long as the processing time used by those threads is negligible.)

Some of our processing tasks run TensorFlow inference. This can mean we use the same graph description to process different data chunks concurrently, or we might have different graph descriptions processing the same (or a different) data chunk concurrently.

We face two challenges with our use of TensorFlow: memory and computing-core scheduling. On the memory side, we must currently replicate each graph description so that each simultaneous call evaluating that graph has its own copy. This limits the size of graph we can use, which limits the accuracy of the inference, which in turn limits our scientific results. We would much prefer to share as much of the inference graph description between threads as possible and limit the amount of memory needed to process a given inference on a given input.

On the computing-core scheduling side, we use the Intel Threading Building Blocks (TBB) library to manage a thread pool of processing tasks. We tell TBB the total number of cores it is allowed to use, and that sets the processing limits for the process. To use TensorFlow-based inference in this system, we hacked together a new Session class that uses no threads of its own, so the inference is evaluated entirely on the TBB-managed thread of the task that needs it. This works, but we would like to do better. There are cases where the total number of TBB tasks available to the process at a given time is less than the number of threads available to TBB. In those cases, we would like to use the underutilized threads to speed up the TensorFlow inference calculation. This could be accomplished if TensorFlow had a mechanism for 'plugging in' alternative concurrency engines; we would then use those APIs to implement a TBB-based concurrency engine. Such a facility could also be used to develop an OpenMP backend, OpenMP being another concurrency engine common in scientific computing.
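
To illustrate the "no threads" hack in isolation: TF's executor dispatches inter-op work through a runner callback, and replacing the pool-backed runner with one that invokes each closure inline keeps all work on the calling thread. A toy standalone sketch of that idea (the type aliases mirror TF's Executor::Args types; this is not actual TF code):

```
#include <cstdio>
#include <functional>

// Stand-ins for TF's Executor::Args::Closure / Runner types.
using Closure = std::function<void()>;
using Runner = std::function<void(Closure)>;

int main() {
  // A pool-backed runner would hand the closure to a thread pool; the
  // inline runner simply executes it on the caller's (TBB-managed) thread.
  Runner inline_runner = [](Closure c) { c(); };
  inline_runner([] { std::printf("op work runs on the calling thread\n"); });
  return 0;
}
```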

Is there any interest in the TF developers community in these initiatives?

Thanks!

pax
Gabe

Martin Wicke

Oct 2, 2018, 1:03:43 PM
to Gabriel Perdue, Aleksandr Zaks, Christina Sorokin, TensorFlow Developers
Thank you for these proposals!

I think there are some misunderstandings here; clearing them up should make your life a little better:

- While graph construction is not thread-safe, execution is. This means you do not need to create one graph per thread: you can call Session.run on the same graph object from as many threads as you want (see the sketch after this list).

- TensorFlow maintains its own threadpools. You can configure them (to some extent) via the ConfigProto: https://github.com/tensorflow/tensorflow/blob/r1.11/tensorflow/core/protobuf/config.proto#L290. In particular, you can control how many threads are used to execute different ops concurrently (inter_op_parallelism_threads), how many threads are used to execute any given op (intra_op_parallelism_threads), and which threadpool a given Session.run call uses (session_inter_op_thread_pool).
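
For concreteness, a minimal C++ sketch covering both points: one session and one graph shared by many threads, with both thread-pool knobs set at session creation. The graph file "graph.pb" and the "input"/"output" tensor names are placeholders, not anything from this thread:

```
#include <memory>
#include <thread>
#include <vector>

#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/public/session.h"

int main() {
  using namespace tensorflow;

  SessionOptions options;
  options.config.set_inter_op_parallelism_threads(4);  // concurrent ops per run
  options.config.set_intra_op_parallelism_threads(1);  // threads within one op

  GraphDef graph_def;
  TF_CHECK_OK(ReadBinaryProto(Env::Default(), "graph.pb", &graph_def));

  std::unique_ptr<Session> session(NewSession(options));
  TF_CHECK_OK(session->Create(graph_def));

  // One graph, one session: Session::Run is safe to call from many threads.
  std::vector<std::thread> workers;
  for (int i = 0; i < 8; ++i) {
    workers.emplace_back([&session] {
      Tensor input(DT_FLOAT, TensorShape({1, 16}));
      std::vector<Tensor> outputs;
      TF_CHECK_OK(session->Run({{"input", input}}, {"output"}, {}, &outputs));
    });
  }
  for (auto& t : workers) t.join();
  return 0;
}
```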

We would be interested in a proposal for how to enable swapping out the thread management and replacing it with something else (currently there's no straightforward way to use Intel TBB, for instance). We've had other requests for something like this as well.

Martin

Asim Shankar

Oct 2, 2018, 3:57:17 PM
to Gabriel Perdue, Ravi Chirravuri, devel...@tensorflow.org
[Oops, hit "Reply" and not "Reply All" to my response yesterday]

On Mon, Oct 1, 2018 at 3:04 PM Asim Shankar <asha...@google.com> wrote:
Hi Gabe,

Starting with a more detailed proposal/RFC seems like a good idea, and we can iterate on that.
That said, I'd like to follow up on a few details.

You mentioned "we need a thread-safe graph": Execution of TensorFlow graphs is thread-safe (you can certainly call session.run() concurrently and the individual op kernels are thread-safe).
Could you elaborate on what changes you envision?

Regarding "flexibility in the concurrency engine": We have started a recent attempt to centralize scheduling (at least for inter-op execution) through the RunHandler(,Pool) abstraction introduced in this commit: https://github.com/tensorflow/tensorflow/commit/f83da5b0aa37ba55c1b2eaa093e6d043b73f5982. We've also added support to the Eigen::ThreadPool used by TF to allow scheduling work on a subset of threads, so we can play with scheduling ideas given multiple active session-run(s) with heterogenous demand. Eigen::ThreadPool already supports stealing work when a thread is idle, so this idea has started to yield some interesting results (improved median and tail latency) on a bunch of models we've tried internally. If you're interested you could give that a shot and see how that compares with your TBB solution. (Note: There are some extra changes needed to leverage the ScheduleWithHint() functionality at HEAD in Eigen, but we can talk about those).

Thoughts?
Thanks,

-- Asim


Gabriel Perdue

Oct 4, 2018, 10:29:24 AM
to TensorFlow Developers, gnpe...@gmail.com, c...@google.com
Thank you very much for the responses.

With respect to thread-safe graphs, it sounds like there may have been a misunderstanding on our side; graph execution appears to already do what we want. We're investigating, etc.

As for threading, we can't really use the `Eigen::ThreadPool`, since our framework is designed heavily around the TBB task-system model. Why don't I paste some snippets of the modifications we made to get the Session class to work with TBB? These changes work for us, but they're hacky. It would be better if things were arranged such that we could write TBB support in a fashion that could be accepted as a pull request on top of 2.0 when it arrives. (So we're not asking for TBB support, just a clean way to add it durably ourselves.)

```
void TBBSession::SchedClosure(tbb::task_arena& arena, tbb::task_group& g, std::function<void()> c) {
  arena.execute( [&g, &c] () {g.run( c ); } );
}
```

Then in `TBBSession::Run`, before creating the `ExecutorBarrier`:

```
  // Use a task_arena to avoid having unrelated tasks start
  // running on this thread (which could cause deadlocks).
  tbb::task_arena taskArena;
  tbb::task_group taskGroup;
  // We are required to always call wait() before the destructor runs.
  auto doneWithTaskGroup = [&taskArena, &taskGroup](void*) {
    taskArena.execute([&taskGroup]() { taskGroup.wait(); });
  };
  std::unique_ptr<tbb::task_group, decltype(doneWithTaskGroup)> guard(
      &taskGroup, doneWithTaskGroup);
```

Later in `TBBSession::Run`:

```
  // Pass taskArena and taskGroup to SchedClosure and, consequently,
  // disable TF's own threading logic inside the loop.
  Executor::Args::Runner default_runner =
      [this, &taskArena, &taskGroup](Executor::Args::Closure c) {
        SchedClosure(taskArena, taskGroup, std::move(c));
      };
  for (const auto& item : executors_and_keys->items) {
    args.runner = default_runner;
    item.executor->RunAsync(args, barrier->Get());
  }

  // WaitForNotification will handle calling wait() on taskGroup.
  guard.release();
  WaitForNotification(taskArena, taskGroup, &run_state, &step_cancellation_manager,
                      run_options.timeout_in_ms() > 0
                          ? run_options.timeout_in_ms()
                          : operation_timeout_in_ms_);
```

And the modified `WaitForNotification`:

```
void TBBSession::WaitForNotification(tbb::task_arena& arena, tbb::task_group& taskGroup,
    RunState* run_state, CancellationManager* cm, int64 timeout_in_ms) {
  // Doing the wait inside the arena adds this thread to the arena,
  // so tasks associated with the group can run on this thread.
  arena.execute([&taskGroup]() { taskGroup.wait(); });

  const Status status =
      WaitForNotification(&run_state->executors_done, timeout_in_ms);
  if (!status.ok()) {
    {
      mutex_lock l(run_state->mu_);
      run_state->status.Update(status);
    }
    cm->StartCancel();
    // We must wait for the executors to complete, because they have borrowed
    // references to `cm` and other per-step state. After this notification, it
    // is safe to clean up the step.
    run_state->executors_done.WaitForNotification();
  }
}
```

Thank you very much for your time and attention!

pax
Gabe

Ravi Chirravuri

Oct 19, 2018, 4:30:45 PM
to Gabriel Perdue, TensorFlow Developers
(posting again after joining tensorflow developers)

On Fri, Oct 19, 2018 at 1:28 PM, Ravi Chirravuri <c...@google.com> wrote:
Thank you for the response, Gabe, and sorry for the delayed reply.

We would very much love for you to submit an external RFC, and I'm happy to sponsor it from the TF side.

At a high level, we would prefer that you not sub-class DirectSession but instead leverage the newly introduced RunHandlerPool and RunHandler interfaces. The abstraction was designed to improve cross-session threading performance, but it seems it could also be reused for choosing between Eigen, TBB, and perhaps other threading libraries.

Reading through your code snippets, it seems like you want:
1. Control over how the inter-op closures executed via the default_runner are run.
2. No changes to intra-op parallelism (you don't seem to be modifying the EigenCPUDevice).
3. Some control before the session run completes, to handle taskArena or taskGroup cleanup.

Looking at your requirements, I think you could leverage the newly introduced RunHandlerPool instead of a new Session class. At a high level:

- I would introduce a new implementation of RunHandlerPool that works with TBB instead of Eigen and returns a TBBRunHandler from RunHandlerPool::Get().
- We may want to make the existing RunHandler an interface with a pure virtual method for ScheduleInterOpClosure(). This should give you the required control over running inter-op closures (1).
- The interface currently doesn't support intra-op parallelism, so we assume (2) holds for your case; at least from the code changes you pasted, I don't see changes to eigen_cpu_device().
- Each RunHandler object is active only during a single session run, so you should get control before its destruction (3).
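
A hypothetical sketch of that shape (the virtual interface, the TBB usage, and everything beyond the RunHandler/RunHandlerPool/ScheduleInterOpClosure names are illustrative assumptions, not current TF code):

```
#include <functional>
#include <memory>
#include <utility>

#include "tbb/task_arena.h"
#include "tbb/task_group.h"

// Hypothetical interface: RunHandler with a pure virtual
// ScheduleInterOpClosure(), as suggested above.
class RunHandler {
 public:
  virtual ~RunHandler() = default;
  virtual void ScheduleInterOpClosure(std::function<void()> fn) = 0;
};

// Hypothetical TBB-backed handler, reusing the arena/group pattern from
// the TBBSession snippets earlier in the thread.
class TBBRunHandler : public RunHandler {
 public:
  void ScheduleInterOpClosure(std::function<void()> fn) override {
    // task_arena::execute runs synchronously, so capturing fn by
    // reference is safe; the closure itself runs later in the group.
    arena_.execute([this, &fn] { group_.run(std::move(fn)); });
  }

  ~TBBRunHandler() override {
    // A RunHandler lives for exactly one session run; waiting here is
    // the pre-completion cleanup hook from point (3).
    arena_.execute([this] { group_.wait(); });
  }

 private:
  tbb::task_arena arena_;
  tbb::task_group group_;
};

// Hypothetical pool returning a TBB handler from Get().
class TBBRunHandlerPool {
 public:
  std::unique_ptr<RunHandler> Get() {
    return std::unique_ptr<RunHandler>(new TBBRunHandler);
  }
};
```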

We can lay out more concrete implementation details once you start the RFC.

Hope this helps,
Ravi.