Re: batch_join queue leak?

Eugene Brevdo

unread,
Jul 18, 2016, 8:01:40 PM7/18/16
to Rohit Girdhar, Discuss

Please paste the subset of the code including the tf.cond and the batch joins.


On Jul 18, 2016 4:36 PM, "Rohit Girdhar" <rohit...@gmail.com> wrote:
Hi
I'm training a deep network with two data input pipelines, one for training and one for validation. They use `shuffle_batch_join` and `batch_join` respectively for parallel data reading. The data stream fed to the network is selected by a `tf.cond` operation on top of these two pipelines, controlled by an `is_training` placeholder that is set to true for a training iteration and false when doing validation. I have 4 threads for reading training data and 1 thread for validation.

However, I just added the queue summaries to TensorBoard and observed that the validation queue's summary (showing the fraction of the queue that is full) becomes non-zero at one point during training and then drops back to 0. This seems very weird because validation runs only after 1K iterations, and those data points should only be removed at that point. Does anyone have a similar experience, or can anyone shed some light on what might be happening?

Thanks
Rohit

Rohit Girdhar

unread,
Jul 18, 2016, 8:17:02 PM7/18/16
to Eugene Brevdo, Discuss
Here's the code snippet (not an MWE though):

def _input_pipeline(self, filenames, batch_size, num_epochs, mode='train', modality='spatial',
                    num_read_threads=4, num_samples=25):
    if mode == 'train':
        shuffle = True
    else:
        shuffle = False
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=shuffle)
    example_list = [self._read_from_disk(filename_queue, modality, num_samples, mode)
                    for _ in range(num_read_threads)]
    if mode == 'train':
        min_after_dequeue = 100
        capacity = min_after_dequeue + 3 * batch_size
        example_batch, label_batch = tf.train.shuffle_batch_join(
            example_list, batch_size=batch_size, capacity=capacity,
            min_after_dequeue=min_after_dequeue)
    else:
        example_batch, label_batch = tf.train.batch_join(example_list, batch_size)
    return example_batch, label_batch


def gen_net(...):
    with tf.variable_scope('input_pipeline_train') as scope:
        train_batch, train_labels_batch = self._input_pipeline(
            train_filenames, BATCH_SIZE, num_epochs, mode='train', modality=modality,
            num_read_threads=num_read_threads, num_samples=num_samples)
    with tf.variable_scope('input_pipeline_val') as scope:
        val_batch, val_labels_batch = self._input_pipeline(
            val_filenames, BATCH_SIZE, num_epochs, mode='test', modality=modality,
            num_read_threads=1, num_samples=num_samples)

    frames_batch, labels_batch, vlad_dropout_keep_prob_selected = tf.cond(
        is_training,
        lambda: (train_batch, train_labels_batch, tf.constant(vlad_dropout_keep_prob)),
        lambda: (val_batch, val_labels_batch, tf.constant(1.0)),
    )
... then the network that takes frames_batch and labels_batch as input

Rohit Girdhar

unread,
Jul 18, 2016, 8:30:31 PM7/18/16
to Eugene Brevdo, Discuss
Sorry, the indentation got messed up while copy/pasting. Fixed here.
Both `_input_pipeline` and `gen_net` are methods of a class, and `gen_net` is called from outside to generate the network.

Eugene Brevdo

unread,
Jul 18, 2016, 10:47:53 PM7/18/16
to Rohit Girdhar, Discuss

Look at the documentation of tf.cond. Both enqueue operations are being executed on every iteration.

Rohit Girdhar

unread,
Jul 18, 2016, 11:24:33 PM7/18/16
to Eugene Brevdo, Discuss
Okay, I see. I assume you mean both dequeue operations are being executed every iteration (since the enqueuing is done by separate threads, I guess).
Also, if I understand correctly, the code is still "correct", right? i.e., data is only read from the training pipeline when is_training=True (hopefully no validation data is getting mixed into the training data).

Thank you!
Rohit

Eugene Brevdo

unread,
Jul 18, 2016, 11:39:59 PM7/18/16
to Rohit Girdhar, Discuss
Sorry - you're right.

The data is being read from *both* the training and validation pipelines. *However*, it is being *dropped*, because you've asked cond to return the output of only one of the branches. However, if you have less data on the validation pipeline, your training will end early because the dequeue from the validation pipeline will run out of data. Also, since you have fewer threads buffering input data on the validation pipeline, your cond will also be slower, because it's waiting on two queues and one of them takes longer to fill than the other.

Long story short: don't build your models this way. Build two separate graphs (one for training and one for validation), even if they both do essentially the same thing (though you may skip the gradients/optimization subgraph for validation); use two separate sessions, and run training and validation in those separate sessions. Best is to run them in separate processes.
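For illustration, a minimal sketch of that two-graph / two-session layout (not Eugene's exact recipe). It assumes two hypothetical helpers: input_pipeline(filenames, batch_size, mode), a free-function variant of the _input_pipeline method above, and build_model(batch, labels) returning (logits, loss), built with tf.get_variable so both graphs define identically named variables; the checkpoint path is made up.

import tensorflow as tf

def make_graph(filenames, mode):
    # Each call builds a completely separate tf.Graph with its own input pipeline.
    g = tf.Graph()
    with g.as_default():
        batch, labels = input_pipeline(filenames, BATCH_SIZE, mode=mode)  # hypothetical helper
        logits, loss = build_model(batch, labels)                         # hypothetical helper
        train_op = (tf.train.AdamOptimizer(1e-4).minimize(loss)
                    if mode == 'train' else tf.no_op())
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()  # same variable names in both graphs, so checkpoints are interchangeable
    return g, loss, train_op, init_op, saver

train_g, train_loss, train_op, train_init, train_saver = make_graph(train_filenames, 'train')
val_g, val_loss, _, _, val_saver = make_graph(val_filenames, 'test')

# Training session: run train_op and write a checkpoint every so often.
with tf.Session(graph=train_g) as sess:
    sess.run(train_init)
    # ... start queue runners, loop over sess.run(train_op), and periodically call
    #     train_saver.save(sess, '/tmp/model/ckpt', global_step=step)

# Validation session (could just as well live in a separate process or on another machine):
with tf.Session(graph=val_g) as sess:
    val_saver.restore(sess, tf.train.latest_checkpoint('/tmp/model'))
    # ... run sess.run(val_loss) over the validation set

Neither graph contains a tf.cond over the two pipelines; the only link between them is the checkpoint on disk.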

Rohit Girdhar

unread,
Jul 18, 2016, 11:57:06 PM7/18/16
to Eugene Brevdo, Discuss
Thank you! This makes a lot of sense now. Training had been pretty slow for me, and adding more read threads was not making much of a difference. I guess this explains that as well.

Thanks for the suggestion! Is there an example of this you could point me to (especially with separate processes)? Also, will having two graphs take up significantly more memory? (If I understand correctly, even though the parameters can be shared using get_variable, the validation graph will still have to allocate memory for the forward-pass activations.) Ideally I would want to keep training and validation on a single GPU, rather than use another GPU just for validation. I'd really appreciate any pointers/feedback/suggestions on the right way of doing this.

Thanks!

Martin Wicke

unread,
Jul 19, 2016, 12:04:06 AM7/19/16
to Rohit Girdhar, Eugene Brevdo, Discuss
You can look at the Estimator class in contrib/learn, which contains plumbing to do just that: create graphs for training and evaluation and run them. It may be more than you want, but it would certainly work as an example.

Eugene Brevdo

unread,
Jul 19, 2016, 12:05:36 AM7/19/16
to Rohit Girdhar, Discuss

osen...@gmail.com

unread,
Jul 19, 2016, 3:35:04 AM7/19/16
to Discuss, rohit...@gmail.com
Hello there, coming from this GitHub issue/discussion: https://github.com/tensorflow/tensorflow/issues/2514#issuecomment-233502521

In it I describe the idea of using make_template to automatically share the trainable model between different data sources/queues (e.g. training and validation data). Apart from a few lines of code to iterate over the queues (or their respective dequeue ops), no additional code is necessary to get a training process with one session that is also able to evaluate the trained model on the validation set at any point, using the same session. Could you (@ebrevdo, @martinwicke) comment on why this approach would not be favorable (in the sense of 'it is bad because (a) ... is not considered and will lead to problems (e.g. when using multiple GPUs), (b) ... is not optimal, (c) even more reasons')?

That would be great!

Thanks,
Christian
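For reference, a rough sketch of the make_template pattern described above. The toy model, layer sizes, and image shape are made up for illustration; train_images/val_images stand in for batches dequeued from the training and validation pipelines.

import tensorflow as tf

def model(images, keep_prob):
    # Toy model: flatten -> hidden layer -> dropout -> logits.
    net = tf.contrib.layers.flatten(images)
    net = tf.contrib.layers.fully_connected(net, 256)
    net = tf.nn.dropout(net, keep_prob)
    return tf.contrib.layers.fully_connected(net, 10, activation_fn=None)

# make_template creates the variables on the first call and reuses them on every later call.
shared_model = tf.make_template('model', model)

keep_prob = tf.placeholder(tf.float32, [])
train_images = tf.placeholder(tf.float32, [None, 224, 224, 3])  # stand-in for the training batch
val_images = tf.placeholder(tf.float32, [None, 224, 224, 3])    # stand-in for the validation batch

train_logits = shared_model(train_images, keep_prob)  # creates the variables
val_logits = shared_model(val_images, keep_prob)      # reuses the same variables

# In one session: run train_logits (plus the train op) feeding keep_prob=0.5,
# and run val_logits feeding keep_prob=1.0 whenever you want to evaluate.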

Martin Wicke

unread,
Jul 19, 2016, 10:19:25 AM7/19/16
to osen...@gmail.com, Discuss, rohit...@gmail.com
This is great if your code for training and eval is exactly the same. That is generally not the case; consider dropout or batch normalization, for instance.

Christian Osendorfer

unread,
Jul 19, 2016, 11:29:18 AM7/19/16
to Martin Wicke, Discuss, rohit...@gmail.com
Thank you, Martin, for your answer! For dropout, I'm feeding keep_prob=1 for the validation/testing sets; for batch norm, I'm using a tf.cond switch. tf.make_template accepts parameters and only makes sure that code/variable reuse works automatically. But maybe I'm understanding/using things incorrectly. Could you elaborate?

Martin Wicke

unread,
Jul 19, 2016, 11:52:31 AM7/19/16
to Christian Osendorfer, Discuss, rohit...@gmail.com
Nope, you're doing everything correctly. Note that it is extraordinarily easy to shoot yourself in the foot using tf.cond, so we don't recommend this method for everyone. Also, I believe that using a fed keep_prob will still create the dropout ops in the keep_prob=1 case, and the performance hit may be unacceptable to some. Finally, we quite often don't have training and evaluation on the same computer, and especially not in the same session.

Either way, what you're doing is perfectly fine, there's nothing wrong with it if it works for you (and it sure is elegant).

osen...@gmail.com

unread,
Jul 19, 2016, 2:03:09 PM7/19/16
to Discuss, osen...@gmail.com, rohit...@gmail.com
Thanks for the detailed explanation, Martin! I see how the multiple-queue approach is definitely a bottleneck when training and evaluation should happen on different machines. I hadn't considered this setup yet.

Mark Woodward

unread,
Jul 27, 2016, 8:24:26 PM7/27/16
to osen...@gmail.com, Discuss, rohit...@gmail.com
@rohit.iiith, if you still want to do things your original way, GitHub user josh11b just brought tf.QueueBase.from_list() to my attention. I tested it and it works great; no extra dequeues like the ones you experienced with tf.cond. It returns a QueueBase object that you can then dequeue from, and it will pull from the appropriate queue depending on a tensor fed to tf.QueueBase.from_list().
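For anyone landing here later, a small self-contained sketch of that approach, with toy FIFOQueues standing in for the real training/validation batch queues:

import tensorflow as tf

train_q = tf.FIFOQueue(100, dtypes=[tf.float32], shapes=[[]])
val_q = tf.FIFOQueue(100, dtypes=[tf.float32], shapes=[[]])
enq_train = train_q.enqueue_many([[1.0, 2.0, 3.0]])
enq_val = val_q.enqueue_many([[10.0, 20.0, 30.0]])

# 0 selects the training queue, 1 the validation queue.
select = tf.placeholder(tf.int32, [])
selected_q = tf.QueueBase.from_list(select, [train_q, val_q])
batch = selected_q.dequeue()  # only the selected queue is dequeued from

with tf.Session() as sess:
    sess.run([enq_train, enq_val])
    print(sess.run(batch, feed_dict={select: 0}))  # 1.0, from the training queue
    print(sess.run(batch, feed_dict={select: 1}))  # 10.0, from the validation queue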



Rohit Girdhar

unread,
Jul 28, 2016, 1:23:42 AM7/28/16
to Mark Woodward, osen...@gmail.com, Discuss
Thanks, Mark, this is great! It's definitely a good solution for people who don't want to allocate additional resources for validation while training.

Rob.Rowe.NM

unread,
Jul 28, 2016, 8:26:01 AM7/28/16
to Martin Wicke, Christian Osendorfer, Discuss, rohit...@gmail.com
Martin —

You said, "Note that it is extraordinarily easy to shoot yourself in the foot using tf.cond, so we don't recommend this method for everyone." Could you say a bit more about the types of errors/misuse that may be associated with tf.cond?

Thanks
Rob

Martin Wicke

unread,
Jul 28, 2016, 11:18:10 AM7/28/16
to Rob.Rowe.NM, Christian Osendorfer, Discuss, rohit...@gmail.com
Cond takes two lambdas. Only the nodes that are created inside those lambdas are conditional. Nodes that exist prior to the cond call are not affected.

This doesn't sound complicated, but when it does go wrong it's quite hard to diagnose.

Eugene Brevdo

unread,
Jul 28, 2016, 11:22:00 AM7/28/16
to Martin Wicke, Rob.Rowe.NM, Christian Osendorfer, Discuss, Rohit Girdhar
To add to what Martin said:

- Ops that are merely *used* by either lambda (i.e. created outside it) are *always* executed; only ops created *inside* the lambdas are conditionally executed.
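A tiny self-contained illustration of that point (toy queues only, nothing taken from the code earlier in the thread):

import tensorflow as tf

q_a = tf.FIFOQueue(10, dtypes=[tf.float32])
q_b = tf.FIFOQueue(10, dtypes=[tf.float32])
enq = [q_a.enqueue_many([[1.0] * 5]), q_b.enqueue_many([[2.0] * 5])]

deq_a = q_a.dequeue()  # created outside tf.cond: an input to the cond, so always executed
deq_b = q_b.dequeue()  # created outside tf.cond: always executed too
pred = tf.placeholder(tf.bool, [])
out = tf.cond(pred, lambda: deq_a, lambda: deq_b)

with tf.Session() as sess:
    sess.run(enq)
    for _ in range(3):
        sess.run(out, feed_dict={pred: True})
    # Both queues have shrunk by 3, even though only q_a's values were returned.
    print(sess.run([q_a.size(), q_b.size()]))  # [2, 2]

# Creating the dequeues *inside* the lambdas instead,
# tf.cond(pred, lambda: q_a.dequeue(), lambda: q_b.dequeue()),
# would dequeue only from the selected queue on each run.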

Rob.Rowe.NM

unread,
Jul 28, 2016, 11:30:26 AM7/28/16
to Eugene Brevdo, Martin Wicke, Christian Osendorfer, Discuss, Rohit Girdhar
Very helpful. Thank you Martin and Eugene.

Rohit Girdhar

unread,
Feb 13, 2017, 5:00:48 PM2/13/17
to Discuss
Sorry for bumping this old thread, but I seem to have a solution that dequeues from different queues (one for training and one for validation) while using the same graph. It doesn't dequeue from both queues the way my original solution did. The trick seems to be to put the dequeue operation inside the tf.cond.

Hoping to get some comments on whether this makes sense, or whether I am missing something obvious.

import numpy as np
import tensorflow as tf

def make_dummy_queue(val, batch_size):
    # A queue of 200 pre-made (image batch, label batch) pairs; image values depend on `val`.
    q = tf.FIFOQueue(200 * batch_size, dtypes=[tf.float32, tf.float32],
                     shapes=[[batch_size, 224, 224, 3], [batch_size, 80]])
    images = np.zeros((200, batch_size, 224, 224, 3))
    labels = np.zeros((200, batch_size, 80))
    for i in range(200):
        images[i, ...] = i * val
    init_q = q.enqueue_many((images, labels))
    return init_q, q

def wrapper_simple_train(tq, a, b):
    tf.logging.info('TRAIN')
    # The tf.assign is only there to verify (via a) that this branch actually ran.
    with tf.control_dependencies([tf.assign(a, 10)]):
        x, y = tq.dequeue()  # dequeue created *inside* the cond branch
        return tf.tuple([x, y, a, b])

def wrapper_simple_val(vq, a, b):
    tf.logging.info('VAL')
    with tf.control_dependencies([tf.assign(b, 20)]):
        x, y = vq.dequeue()
        return tf.tuple([x, y, a, b])

# is_train is a boolean placeholder; aa and bb are scalar variables used for the check above;
# cfg.TRAIN.BATCH_SIZE comes from the surrounding code.
tinit, train_batch_queue = make_dummy_queue(val=1, batch_size=cfg.TRAIN.BATCH_SIZE)
vinit, val_batch_queue = make_dummy_queue(val=2, batch_size=cfg.TRAIN.BATCH_SIZE)
images, labels, a, b = tf.cond(is_train,
        lambda: wrapper_simple_train(train_batch_queue, aa, bb),
        lambda: wrapper_simple_val(val_batch_queue, aa, bb))

rylansc...@gmail.com

unread,
Jun 29, 2017, 5:47:44 PM6/29/17
to Discuss
Eugene, can you explain why building models in the manner you suggest (i.e. two separate graphs, run training and validation separately) is considered good practice? It seems like a lot of duplicate work.

Eugene Brevdo

unread,
Jun 30, 2017, 1:42:59 AM6/30/17
to rylansc...@gmail.com, Discuss
1. It's generally hard to have a single input pipeline and control what data goes to the training model vs. eval/inference model (see thread above).
2. The graphs that perform training vs. eval and inference tend to be different (especially training vs. inference).
3. There's a lot of pain involved in creating an eval/inference graph that has to reuse variables from the training graph (a lot of "with tf.variable_scope(... reuse=True):" pragmas); see the sketch below.
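Point 3 in code form, with a toy build_model standing in for a real network (names here are illustrative):

import tensorflow as tf

def build_model(x):
    # Any variable created with tf.get_variable must be reused explicitly the second time around.
    w = tf.get_variable('w', [4, 2])
    return tf.matmul(x, w)

train_x = tf.placeholder(tf.float32, [None, 4])
eval_x = tf.placeholder(tf.float32, [None, 4])

with tf.variable_scope('net'):
    train_logits = build_model(train_x)   # creates net/w
with tf.variable_scope('net', reuse=True):
    eval_logits = build_model(eval_x)     # must reuse net/w, or variable creation fails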

I'm not sure I understand "duplicate work".  It's probably less work than trying to do everything in one graph.

Do you mean creating two different graph objects?  That's not computationally hard compared to the training/eval loops.

Do you mean having to duplicate Python code? That's not hard either; just wrap the core computation in a function and call it once for training (then add an optimizer), and a second time later for eval or inference.

rylansc...@gmail.com

unread,
Jun 30, 2017, 1:39:56 PM6/30/17
to Discuss, rylansc...@gmail.com
OK, that makes a lot more sense! I went through the Inception/ImageNet example, and now I have a couple more questions:

  1. My understanding was that in order to assess a model's performance while training (and prevent overfitting), we want to occasionally evaluate the model against a validation dataset. That means alternating between training and validation. Is my understanding correct?
  2. In the Inception repository, I don't see any code that switches between training and validation. What is the proper way to handle switching? My guess is that training and validation are run in separate processes, and line 171 in inception_eval.py puts the validation process to sleep to allow for more training to complete and a new checkpoint to be written to disk, but I'm not sure.
  3. The Inception example uses tf.summary.FileWriter and tf.train.Saver to write summaries and model variables to disk. Would you recommend this, or is there a better approach? I was previously using a Supervisor, but I recently learned that the Supervisor is now deprecated.
Thanks once again!

bastiaan...@gmail.com

unread,
Oct 7, 2017, 12:46:06 PM10/7/17
to Discuss, rylansc...@gmail.com
Good points. However, you still need to export all the parameters from your training graph and then import them into your evaluation graph at each train/eval cycle. Taking them out of your GPU, saving them to disk, and then loading them back into (the same) GPU is what I would think of as the "a lot of duplicate work" part.

Eugene Brevdo

unread,
Oct 7, 2017, 6:34:48 PM10/7/17
to bastiaan...@gmail.com, Discuss, Rylan Schaeffer
If you have a big model, then it probably takes a long time to train and you want to have checkpoints on a regular basis anyway. And the store/load for eval is going to be very quick compared to the amount of time it takes to train an epoch. Eval can be done on a separate machine as well, once the checkpoint is stored somewhere on the network.

If you have a small model, and it doesn't take a long time to train an epoch, then saving a checkpoint and loading for eval shouldn't take very long at all.

Practically speaking, the benefits usually outweigh the downsides, especially since you tend to checkpoint regularly anyway.
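As a sketch of what the eval side of that hand-off can look like (in the spirit of the inception_eval.py loop mentioned earlier): eval_graph, eval_loss, saver, and checkpoint_dir are assumed to come from an eval graph built the same way as the training graph, minus the optimizer.

import time
import tensorflow as tf

last_ckpt = None
while True:
    ckpt = tf.train.latest_checkpoint(checkpoint_dir)  # checkpoint_dir: where the trainer saves
    if ckpt is None or ckpt == last_ckpt:
        time.sleep(60)  # wait for the training process to write a new checkpoint
        continue
    with tf.Session(graph=eval_graph) as sess:
        saver.restore(sess, ckpt)  # pull the freshest weights off disk
        # ... run eval_loss (and summaries) over the validation set
    last_ckpt = ckpt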
