BoundedMailbox and pushTimeOut behavior


Shannon Carey

Apr 23, 2016, 2:11:50 PM
to Akka User List
I'm a bit confused by the documentation vs. the source code when it comes to BoundedMailbox and pushTimeOut (aka "mailbox-push-timeout-time").

The documentation (http://doc.akka.io/docs/akka/2.4.4/java/mailboxes.html) says, (implicitly regarding BoundedMailbox), "Other bounded mailbox implementations which will block the sender if the capacity is reached and configured with non-zero mailbox-push-timeout-time."

It also says, (again about BoundedMailbox), "The following mailboxes should only be used with zero mailbox-push-timeout-time." but it does not say why.

I found that in Mailboxes#lookupConfigurator(String), there's a warning when pushTimeOut > zero:
"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), which can lead to blocking behaviour when sending messages to this mailbox. Avoid this by setting `$id.mailbox-push-timeout-time` to `0`."

This all implies that if pushTimeOut is zero there will be no blocking, and therefore presumably a message sent to a full mailbox will go immediately to dead letters.

The reason I am confused by this is that BoundedMailbox uses BoundedMailbox.MessageQueue which is a LinkedBlockingQueue plus BoundedQueueBasedMessageQueue. If pushTimeOut >= 0, BoundedQueueBasedMessageQueue defines enqueue() as queue.put() which is blocking and doesn't send anything to dead letters (contrary to the documentation). What am I missing?

What I would like is to guarantee that even if the consuming Actor is slow that no messages go to dead letters, and instead the producer is blocked while sending the message. If pushTimeOut=0 with a bounded mailbox is not the right way to do that, what is?

Thanks!
Shannon

Patrik Nordwall

Apr 29, 2016, 4:24:25 AM
to akka...@googlegroups.com
Blocking the sender is bad for scalability and can even result in deadlocks or starvation if you aren't careful. It's also not location transparent, i.e. it will not work for a remote receiver.

If you can't lose messages you should implement a resend-ack protocol for at-least-once delivery, or make sure that the number of outstanding messages is bounded with some kind of flow control, such as the work pulling pattern.

It can also be good to consider Akka Streams, since backpressure is first class in the streams model.

Regards,
Patrik



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Shannon Carey

Apr 29, 2016, 12:02:22 PM
to Akka User List
Thanks Patrik.

So you agree that setting timeout to 0 will block the sender when using a local receiver? Do you think I should contribute an update to the documentation?

The code I'm working on has inherited Akka because it's based on another application, but my code is a Kafka consumer. So it is OK to block the "sender" (the Kafka reader) if overall throughput is insufficient; that gives developers/auto-scaling time to improve the throughput before the Kafka checkpoint location falls off the horizon in Kafka. The actor graph is not complicated enough to worry about deadlock yet. Akka Streams definitely makes sense though, or possibly work pulling if it's simple to implement.

Thanks again!

Patrik Nordwall

Apr 29, 2016, 12:59:58 PM
to akka...@googlegroups.com
So it's this code we are talking about:

  def enqueue(receiver: ActorRef, handle: Envelope): Unit =
    if (pushTimeOut.length >= 0) {
      if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
        receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
          DeadLetter(handle.message, handle.sender, receiver), handle.sender)
    } else queue put handle

That is offer with a 0 timeout, which should not block. Don't ask me when queue.put is used.
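
To see the non-blocking behaviour outside of Akka, here is a minimal sketch that uses only java.util.concurrent. It assumes a plain LinkedBlockingQueue with capacity 1 as a stand-in for the mailbox queue; OfferDemo and tryEnqueue are hypothetical names for illustration, not part of Akka.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

object OfferDemo {
  // Hypothetical helper mirroring the offer branch of enqueue above:
  // true means the message was queued, false is the case where Akka's
  // enqueue would send a DeadLetter instead.
  def tryEnqueue[A](queue: BlockingQueue[A], msg: A, timeout: Long, unit: TimeUnit): Boolean =
    queue.offer(msg, timeout, unit)

  def main(args: Array[String]): Unit = {
    // Capacity 1, so the second message finds the queue full.
    val queue = new LinkedBlockingQueue[String](1)

    val first = tryEnqueue(queue, "m1", 0L, TimeUnit.MILLISECONDS)
    // Zero timeout: offer gives up immediately rather than waiting for space.
    val second = tryEnqueue(queue, "m2", 0L, TimeUnit.MILLISECONDS)

    println(s"first=$first second=$second size=${queue.size}")
    // prints "first=true second=false size=1"
  }
}
```

With a positive timeout the same offer call would wait up to that long for space before returning false, and queue.put (the else branch in the snippet above, reached only when pushTimeOut.length is negative) would wait until space becomes available.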

