AsyncHttpClient-Reaper is getting interrupted during the onThrowable callback

Ken Michie

Jan 23, 2013, 10:46:32 AM
to asyncht...@googlegroups.com
My application uses the Async Http Client to POST messages to provided URLs.  Occasionally a URL is bad and the request times out (the timeout is set to 5 seconds).  When that happens, I retry the POST two seconds later by scheduling a TimerTask that, when fired, issues a new request to the same destination.  I try this 3 times, and if the 3rd attempt also fails, I essentially abandon the request.

The problem arises when I abandon it, because abandoning includes 'acking' a Storm tuple so that the message doesn't time out in Storm and get redelivered.  The ack throws an InterruptedException on the Reaper thread.  Unless I explicitly catch it and ack again (a try, catch, try-again mechanism), the tuple times out, which is very bad under large volumes.  What is strange is that barely 1 ms passes between receiving the TimeoutException and acking the tuple, so it's not as if the callback is taking a long time.  I want to make sure I am not missing something here, so any light you can shed would be awesome!

Jar is version 1.7.5

Here is my hacky try try catch:
try {
    // Update pertinent state data
} finally {
    try {
        synchronized (outputCollector) {
            outputCollector.ack(request.tuple)
        }
    } catch (InterruptedException ie) {
        // try again - async failures seem to cause this behavior
        log.warn("Interrupted, trying to ack again reqId: ${request.reqId}:", ie)
        synchronized (outputCollector) {
            outputCollector.ack(request.tuple)
        }
    }
}

Config:
        def config = new AsyncHttpClientConfig.Builder()
                .setMaximumConnectionsTotal(500)
                .setConnectionTimeoutInMs(5000)
                .setRequestTimeoutInMs(5000)
                .setMaxRequestRetry(0)
                .setIdleConnectionInPoolTimeoutInMs(20 * 1000)
                .setIdleConnectionTimeoutInMs(20 * 1000)
                .addRequestFilter(new ThrottleRequestFilter(500))
                .setSSLContext(SslHttpClientHelper.getSSLContext())
                .setHostnameVerifier(SslHttpClientHelper.getHostNameVerifier())
                .build()

Exception encountered:
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:294)
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__3535.invoke(worker.clj:87)
        at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:157)
        at backtype.storm.daemon.task$fn$reify__3463.ack(task.clj:469)
        at backtype.storm.task.OutputCollector.ack(OutputCollector.java:196)
        at backtype.storm.task.IOutputCollector$ack.call(Unknown Source)
        at com.fullcontact.webhook.storm.bolt.BaseWebHookDeliverBolt.ackTuple(BaseWebHookDeliverBolt.groovy:139)
        at com.fullcontact.webhook.storm.receiving.TupleAckable$ackTuple.callCurrent(Unknown Source)
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCallCurrent(CallSiteArray.java:46)
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:133)
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:141)
        at com.fullcontact.webhook.storm.bolt.BaseWebHookDeliverBolt.handleFailureForRequest(BaseWebHookDeliverBolt.groovy:168)
        at com.fullcontact.webhook.storm.receiving.TupleAckable$handleFailureForRequest.call(Unknown Source)
        at com.fullcontact.webhook.storm.receiving.StatusCompletionHandler.handleRequestResponseStatus(StatusCompletionHandler.groovy:94)
        at com.fullcontact.webhook.storm.receiving.StatusCompletionHandler.onThrowable(StatusCompletionHandler.groovy:84)
        at com.ning.http.client.extra.ThrottleRequestFilter$AsyncHandlerWrapper.onThrowable(ThrottleRequestFilter.java:88)
        at com.ning.http.client.providers.netty.NettyResponseFuture.abort(NettyResponseFuture.java:300)
        at com.ning.http.client.providers.netty.NettyAsyncHttpProvider.abort(NettyAsyncHttpProvider.java:1326)
        at com.ning.http.client.providers.netty.NettyAsyncHttpProvider.access$700(NettyAsyncHttpProvider.java:137)
        at com.ning.http.client.providers.netty.NettyAsyncHttpProvider$ReaperFuture.run(NettyAsyncHttpProvider.java:1809)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

Thanks,
Ken

Ken Michie

Feb 26, 2013, 5:31:30 PM
to asyncht...@googlegroups.com
Solution found!  In case someone else stumbles across this same symptom: the problem is that the thread invoking the onThrowable callback has its interrupted flag set to 'true'.  Therefore any interruptible blocking operation (Thread.sleep, Object.wait, etc.) will immediately throw an InterruptedException.  In my case I was noticing bizarre behavior when logging a message through a logger that had a GELF appender attached, ultimately destined for our Graylog server.  When the UDP channel was used to send the message to Graylog, the interrupted flag caused the channel to close immediately and throw an exception, leaving the channel closed forever after.
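
For anyone who wants to see the symptom in isolation, here is a minimal sketch in plain Java that does not use AsyncHttpClient or Storm at all. The class and method names are made up for illustration. It sets the interrupted flag on the current thread and then calls LinkedBlockingQueue.put, the same call that appears in the stack trace above. The put fails even though the queue has room, and the flag is cleared once the exception has been thrown, which would explain why a second ack attempt in the catch block goes through.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptFlagDemo {

    /**
     * Calls put() on a queue with spare capacity while the interrupted flag is set.
     * Returns { whether put() threw, whether the flag was still set afterwards }.
     */
    static boolean[] putWithFlagSet() {
        // Default capacity is Integer.MAX_VALUE, so put() never has to wait for space here.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Simulate a callback thread that arrives with its interrupted flag already set.
        Thread.currentThread().interrupt();

        boolean threw = false;
        try {
            queue.put("ack");
        } catch (InterruptedException e) {
            threw = true;
        }

        // Thread.interrupted() reports the flag and clears it in one step.
        boolean flagStillSet = Thread.interrupted();
        return new boolean[] { threw, flagStillSet };
    }

    public static void main(String[] args) {
        boolean[] result = putWithFlagSet();
        System.out.println("put threw InterruptedException: " + result[0]);
        System.out.println("flag still set afterwards: " + result[1]);
    }
}
```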

The solution for me was to take the Throwable and rethrow it inside a try/catch.  Once the IOException was caught, the interrupted flag was cleared and I could proceed with my normal behavior:
    public void onThrowable(Throwable t) {
        try {
            throw t
        } catch (IOException ioEx) {
            log.warn("I can now log to Graylog via GELF!")
            // Process the response
            processResponse(t)
        } catch (Exception e) {
            log.error("Non IOException for reqId: ${request.reqId}", e)
        } finally {
            if (log.isTraceEnabled()) {
                log.trace("onThrowable: Processed onThrowable $request, throwable: $t")
            }
            handleRequestResponseStatus()
        }
    }

It would be nice to have more documentation around this behavior, or even to have it fixed as a bug.

Ken Michie

Feb 26, 2013, 5:46:30 PM
to asyncht...@googlegroups.com
Correction: I mistakenly assumed that TimeoutException inherits from IOException when I posted the above (I tested the wrong error case :/).  Anyway, I ended up calling 'Thread.interrupted()' at the beginning of the callback, which clears the interrupted flag and still works.
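
A rough sketch of that workaround, written as a standalone Java class so it runs without the AsyncHttpClient jar. Everything here is an assumption for illustration: InterruptSafeHandler is a hypothetical stand-in for a real AsyncHandler implementation, and the blocking queue stands in for whatever interruptible work the callback does (the Storm ack, a GELF log call, and so on). The only part taken from the thread is the call to Thread.interrupted() as the first statement of onThrowable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;

public class InterruptSafeHandler {

    // Hypothetical stand-in for the interruptible work done inside the callback.
    private final BlockingQueue<String> acks = new LinkedBlockingQueue<>();

    public void onThrowable(Throwable t) {
        // Read and clear the interrupted flag before doing anything that can block.
        boolean wasInterrupted = Thread.interrupted();

        try {
            acks.put(t.getClass().getSimpleName() + " (flag was set: " + wasInterrupted + ")");
        } catch (InterruptedException e) {
            // A genuine interrupt arrived during the put; restore the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    public int ackCount() {
        return acks.size();
    }

    public static void main(String[] args) {
        InterruptSafeHandler handler = new InterruptSafeHandler();

        // Simulate the callback thread arriving with its interrupted flag set.
        Thread.currentThread().interrupt();
        handler.onThrowable(new TimeoutException("simulated request timeout"));

        System.out.println("acks recorded: " + handler.ackCount());
    }
}
```

Note that clearing the flag this way discards the interrupt, so it is only reasonable if nothing later in the callback is supposed to react to it.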
