SocketTimeoutException terminates listening process


Thomas Kristensen

May 21, 2014, 3:28:23 AM
to clojure-...@googlegroups.com
Hi all,

We're seeing SocketTimeoutExceptions in long running processes, with the consequence that no other messages are delivered to the subscribe function on that machine. We're doing ack-unless-exception, so I just sort of assumed any exceptions like that would result in the message being not ack'ed, and the next one being picked up. Restarting the process correctly picks up new messages from RabbitMQ.

Is there an undocumented best practice for handling SocketTimeoutExceptions? I'd really very much like to continue consuming from the channel :)

Related to this, the troubleshooting link seems to be dead


We are using the following version of langohr

[com.novemberain/langohr "2.10.1"]

Any thoughts?

Thanks!

Thomas


Michael Klishin

May 21, 2014, 3:31:05 AM
to Thomas Kristensen, clojure-...@googlegroups.com
On 21 May 2014 at 11:28:24, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> We're seeing SocketTimeoutExceptions in long running processes,
> with the consequence that no other messages are delivered to
> the subscribe function on that machine. We're doing ack-unless-exception,
> so I just sort of assumed any exceptions like that would result
> in the message being not ack'ed, and the next one being picked
> up. Restarting the process correctly picks up new messages from
> RabbitMQ.
>
> Is there an undocumented best practice for handling SocketTimeoutExceptions?
> I'd really very much like to continue consuming from the channel
> :)

You did not post a stack trace, so it is hard to suggest anything. Socket exceptions in the RabbitMQ client should trigger
connection recovery. They are also raised on the I/O thread, so you cannot
catch them from "user" code, e.g. ack-unless-exception.

If the SocketTimeoutException is thrown by something else, it should be caught like
any other exception.
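
A minimal sketch of "caught like any other exception", assuming the timeout comes from a slow call made inside the delivery handler (an HTTP or S3 read, say). The names `call-with-timeout-fallback`, `fetch-fn` and `on-timeout` are made up for illustration and are not part of Langohr:

```clojure
(import 'java.net.SocketTimeoutException)

(defn call-with-timeout-fallback
  "Calls (fetch-fn). If it throws SocketTimeoutException, returns the result
  of (on-timeout e) instead of letting the exception escape the caller.
  Both arguments are hypothetical, supplied by the application."
  [fetch-fn on-timeout]
  (try
    (fetch-fn)
    (catch SocketTimeoutException e
      (on-timeout e))))

;; Simulate a read that times out and turn it into a value the handler can act on.
(call-with-timeout-fallback
  #(throw (SocketTimeoutException. "Read timed out"))
  (fn [e] {:status :timed-out :message (.getMessage e)}))
;; => {:status :timed-out, :message "Read timed out"}
```

What the handler then does with a `:timed-out` result (retry, reject, log) is an application decision.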
--
MK

Software Engineer, Pivotal/RabbitMQ

Michael Klishin

May 21, 2014, 3:42:31 AM
to Thomas Kristensen, clojure-...@googlegroups.com
On 21 May 2014 at 11:37:03, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> DefaultExceptionHandler: Consumer langohr.consumers.proxy$com.rabbitmq.client.DefaultConsumer$ff19274a@31bdf573
> (amq.ctag-GGj0_JfnXHvzpI8YHuHY3w) method handleDelivery
> for channel AMQChannel(amqp://gu...@10.0.0.0:5672/,1)
> threw an exception for channel AMQChannel(amqp://gu...@10.0.0.0:5672/,1):
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:152)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> at sun.security.ssl.InputRecord.read(InputRecord.java:480)
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
> at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
> at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:206)
> at org.apache.http.impl.conn.LoggingSessionInputBuffer.read(LoggingSessionInputBuffer.java:82)
> at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
> at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:138)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at java.security.DigestInputStream.read(DigestInputStream.java:161)
> at com.amazonaws.services.s3.internal.DigestValidationInputStream.read(DigestValidationInputStream.java:59)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:238)
> at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
> at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
>
>
> ... secret corporate stuff
>
> at langohr.consumers$ack_unless_exception$fn__11098.invoke(consumers.clj:124)

Right, so you have an S3 request that times out in a consumer delivery handler.

langohr.consumers/ack-unless-exception does not currently involve any exception handling.
It was contributed by someone else so I'm no longer sure what exactly the intent was.

What behaviour do you want to see? 

Michael Klishin

May 21, 2014, 4:09:28 AM
to Thomas Kristensen, clojure-...@googlegroups.com
On 21 May 2014 at 11:55:46, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> Regardless of my handler throwing an exception or not, I would
> expect the same handler to be invoked on the next message coming
> in. That's what I find confusing. ack-unless-exception does
> nothing to actually handle thrown exceptions, and it probably
> shouldn't. But I assumed a thrown exception wouldn't de-register
> the handler(?)

The RabbitMQ Java client dispatches deliveries to a thread pool and handles
all exceptions. They are passed to an exception handler, which can be
overridden as of 3.3; Langohr needs to provide a way to do that with Clojure
functions.

No exception can possibly cancel the consumer. There are opinionated projects
that reject messages that caused an exception but it's not appropriate in
all cases and Langohr does not do that.

Thomas Kristensen

May 21, 2014, 4:49:19 AM
to Michael Klishin, clojure-...@googlegroups.com
Thanks for the clarification - I decided to spike the problem out to get a better understanding of what is going on. I think my conclusion is that my understanding of what ack-unless-exception does is flawed. My understanding was that, if a handler threw an exception, the message wouldn't be ack'ed, but be put back on the exchange. That does not seem to happen, and when qos is 1, we seem to get stuck, as we are holding one outstanding message.

My spike seems to confirm this. It's available at:


Am I correct in assuming I should explicitly indicate that a message failed?

Thomas

Michael Klishin

May 21, 2014, 4:54:58 AM
to Thomas Kristensen, clojure-...@googlegroups.com
On 21 May 2014 at 12:49:19, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> My understanding was that, if a handler threw an exception,
> the message wouldn't be ack'ed, but be put back on the exchange.
> That does not seem to happen, and when qos is 1, we seem to get stuck,
> as we are holding one outstanding message.

See my earlier email. This is not an uncommon expectation.
You can do it by handling all exceptions you want to retry on
and re-queueing the message (with basic.nack). But that's not always a good idea so
Langohr does not do this in ack-unless-exception.
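
A rough sketch of that approach, not something Langohr ships: a wrapper that acks when the handler returns and, on an exception, nacks with requeue and swallows the exception so nothing propagates to the client's exception handler. The name `requeue-on-exception` is hypothetical, and `ack!` / `nack!` are injected so the sketch runs without a broker. With Langohr they would presumably be `langohr.basic/ack` and `langohr.basic/nack`, assuming the argument order shown in the comments.

```clojure
(defn requeue-on-exception
  "Wraps a delivery handler of the form (fn [ch metadata payload]).
  ack!  is called as (ack! ch delivery-tag)
  nack! is called as (nack! ch delivery-tag multiple? requeue?)
  Both are assumptions standing in for the real channel operations."
  [handler ack! nack!]
  (fn [ch {:keys [delivery-tag] :as metadata} payload]
    (let [ok? (try
                (handler ch metadata payload)
                true
                (catch Exception _
                  false))]
      (if ok?
        (ack! ch delivery-tag)
        ;; multiple? = false, requeue? = true
        (nack! ch delivery-tag false true)))))

;; Demo with recording stubs instead of a real channel.
(def calls (atom []))

(def safe-handler
  (requeue-on-exception
    (fn [_ch _metadata payload]
      (when (= payload "bad")
        (throw (ex-info "boom" {}))))
    (fn [_ch tag] (swap! calls conj [:ack tag]))
    (fn [_ch tag multiple requeue] (swap! calls conj [:nack tag multiple requeue]))))

(safe-handler nil {:delivery-tag 1} "ok")
(safe-handler nil {:delivery-tag 2} "bad")
@calls
;; => [[:ack 1] [:nack 2 false true]]
```

The subscription itself would need automatic acknowledgement turned off for this to make sense. Note the caveat above still applies: a message that always fails will be requeued and redelivered indefinitely, so a retry limit or a reject without requeue may suit some workloads better.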

Thomas Kristensen

May 21, 2014, 8:58:38 AM
to Michael Klishin, clojure-...@googlegroups.com
Hi Michael,

Once again, thanks for all the explanation. I've experimented a bit this morning, and I've isolated what I find weird.

If you run 


you'll notice that the message gets rejected as we'd expect when the exception is NOT re-thrown, but the reject seems to be ignored in the case where we re-throw. You could argue that you should never have a handler throw exceptions to the langohr-lib, but seeing as this can happen, and since it handles exceptions just fine when :auto-ack is true (not demonstrated in the experiment, but tested and verified), it seems that a consistent behaviour on a reject should be to continue consuming.

Is there something I'm missing?

Once again, thanks for all the help.

Thomas

Michael Klishin

May 21, 2014, 9:03:45 AM
to Thomas Kristensen, clojure-...@googlegroups.com


On 21 May 2014 at 16:58:38, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> you'll notice that the message gets rejected as we'd expect
> when the exception is NOT re-thrown, but the reject seems to be
> ignored in the case where we re-throw. You could argue that you
> should never have a handler throw exceptions to the langohr-lib,
> but seeing as this can happen, and since it handles exceptions
> just fine when :auto-ack is true (not demonstrated in the experiment,
> but tested and verified), it seems that a consistent behaviour
> on a reject should be to continue consuming.
>
> Is there something I'm missing?

What do you mean by "ignored"? Is the channel still open? Do you see basic.reject
on the wire? (e.g. using Tracer: http://www.rabbitmq.com/java-tools.html)

Throwing exceptions in delivery handlers is fine, it should not exhaust
consumer thread pool or anything like that.

Thomas Kristensen

May 21, 2014, 11:15:49 AM
to Michael Klishin, clojure-...@googlegroups.com
You are right, the channel has been closed by my rethrowing the exception. I attached a debugger and traced it to line 106 of DefaultExceptionHandler, which closes the channel in handleChannelKiller. All of this does make sense as a default behaviour, but it doesn't seem to be documented in langohr, so it took quite a while to figure out.

Once again, thanks for all your help. I think we've established that:

- ack-unless-exception is confusing at best - it should not throw the exception on but handle it to be valuable. Otherwise users of langohr should just call .basicAck and .basicReject themselves. I'd vote for just removing it from langohr.
- The behaviour of langohr in the case where a handler throws an exception is not documented in the doc-string of langohr.consumers/subscribe

Cheers,

Thomas

Michael Klishin

May 21, 2014, 11:20:13 AM
to Thomas Kristensen, clojure-...@googlegroups.com


On 21 May 2014 at 19:15:50, Thomas Kristensen (thomas.k...@uswitch.com) wrote:
> You are right, the channel has been closed by my rethrowing the
> exception. I attached a debugger and traced it to line 106 of DefaultExceptionHandler,
> which closes the channel in handleChannelKiller. All of this
> does make sense as a default behaviour, but it doesn't seem to
> be documented in langohr, so it took quite a while to figure out.

Feel free to contribute doc improvements.

> - ack-unless-exception is confusing at best - it should not throw
> the exception on but handle it to be valuable. Otherwise users
> of langohr should just call .basicAck and .basicReject themselves.
> I'd vote for just removing it from langohr.
> - The behaviour of langohr in the case where a handler throws an
> exception is not documented in the doc-string of langohr.consumers/subscribe

Screw doc strings. They cannot describe much because they can't be too
long. http://clojurerabbitmq.info should cover this, in both Working with Queues
and Error Handling and Recovery (the Queues guide can link to the Error Handling one, for example).