Interrupt during async processing


Alex Wenckus

Nov 20, 2014, 12:00:14 PM
to rxj...@googlegroups.com
When using the async library, is the expectation that we should immediately pass the result on to another thread after the async call completes? I ask mainly because I ran into an issue: after the Async processing completes, the worker is unsubscribed, the future task is cancelled, and thus the thread you are running on is interrupted. This leads to the interesting side effect that my async logging (SLF4J) stops working, because it uses a blocking queue.

So, instead of this:

Observable.defer(Async.toAsync(getFunc, Schedulers.io())).single();

I need to do this:

Observable.defer(Async.toAsync(getFunc, Schedulers.io())).observeOn(Schedulers.computation()).single();

Seems to me this could lead to other potentially undesirable side-effects. Anyone have any thoughts?

Alex

Dávid Karnok

Nov 20, 2014, 12:19:00 PM
to rxj...@googlegroups.com
Hi. Interrupting the thread is unavoidable because it is the only way to cancel a (non-RxJava) IO or blocking operation. Under general circumstances, the underlying ExecutorService handles such interrupts so they don't affect unrelated tasks. However, interrupting an already completing task hurts performance, and we (I) haven't been able to come up with a scheme where regular task end and cleanup do not trigger an interrupt whereas an external unsubscription does.

I use SLF4J with a log4j backend and have no issues with regular log messages, but I'm not familiar with your async logging. Is it an SLF4J feature, or did you implement it yourself?
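To illustrate the cancellation mechanism described above, here is a minimal sketch that uses only java.util.concurrent, not RxJava. The class name CancelInterruptDemo and the Thread.sleep stand-in for a blocking IO call are assumptions made for illustration; the point is only that Future.cancel(true) reaches a blocked task by interrupting its thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of RxJava or the async library.
public class CancelInterruptDemo {

    /** Returns true if cancel(true) interrupted the blocked task. */
    static boolean cancelInterruptsBlockedTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    // Stand-in for a blocking IO call that only an interrupt can stop.
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();                      // make sure the task is running
            future.cancel(true);                  // true = interrupt the running thread
            finished.await(5, TimeUnit.SECONDS);  // wait for the task to react
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted by cancel(true): " + cancelInterruptsBlockedTask());
    }
}
```

On a standard JDK the method should return true well before the 60-second sleep elapses, because the sleep is ended by the interrupt rather than by the timeout.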

Alexander Wenckus

Nov 20, 2014, 12:31:32 PM
to Dávid Karnok, rxj...@googlegroups.com
I use Logback for the SLF4J impl, which does have an AsyncAppender. Here is the line where my messages are being swallowed:

The put to the queue is triggering an InterruptedException and the message never gets logged.

Dávid Karnok

Nov 21, 2014, 3:26:18 AM
to rxj...@googlegroups.com
I'd call this strange since the ExecutorService should clear the interrupt flag so subsequent runnables are not affected. How do you invoke that particular log action? Also, I posted a PR which avoids self-interrupting on a completing scheduled action (https://github.com/ReactiveX/RxJava/pull/1898) so you could try with that and see if it works for you.
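The expectation that the ExecutorService clears the interrupt flag between runnables can be checked in isolation. The following is a rough sketch under the assumption of a plain single-threaded java.util.concurrent executor (the class name InterruptFlagResetDemo is made up for illustration, and this does not exercise RxJava's schedulers): the first task leaves the worker thread's interrupt flag set, and the second task reports what it sees.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses a plain JDK executor, not an RxJava Scheduler.
public class InterruptFlagResetDemo {

    /** Returns the interrupt status observed by a task that runs after an interrupted one. */
    static boolean secondTaskSeesInterrupt() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // First task: leave the worker thread's interrupt flag set on exit.
            Runnable interruptSelf = () -> Thread.currentThread().interrupt();
            executor.submit(interruptSelf).get(5, TimeUnit.SECONDS);

            // Second task: runs on the same worker thread and reports the flag.
            Callable<Boolean> readFlag = () -> Thread.currentThread().isInterrupted();
            return executor.submit(readFlag).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task interrupted: " + secondTaskSeesInterrupt());
    }
}
```

With the JDK's ThreadPoolExecutor the second task should see a cleared flag, which is why a leftover interrupt is only expected to matter for code that still runs inside the interrupted task itself.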

Alex Wenckus

Dec 3, 2014, 7:18:06 PM
to rxj...@googlegroups.com
It wasn't subsequent runnables that were affected. After the cleanup and unsubscribe were triggered, I was doing some processing in onComplete (including logging), and the log messages weren't showing up. To trigger that code in Logback, you need to configure an async appender:

<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
      <file>myapp.log</file>
      <encoder><pattern>%logger{35} - [%F:%L] - %msg%n</pattern></encoder>
    </appender>

    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
      <appender-ref ref="FILE" />
      <!-- add the following line -->
      <includeCallerData>true</includeCallerData>
    </appender>

    <root level="DEBUG"><appender-ref ref="ASYNC" /></root>
</configuration>

Dávid Karnok

Dec 4, 2014, 2:23:19 AM
to rxj...@googlegroups.com
Could you check whether 1.0.2 gives you the same issue? In addition, it would be great if you could create a small runnable test that demonstrates the problem (you could replace the log call with a simple ArrayBlockingQueue.put).
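A test along those lines might look roughly like the sketch below. It is only an approximation of the reported situation: it uses a plain FutureTask that cancels itself while its body is still running, as a stand-in for the cleanup and unsubscribe step, and does not involve RxJava or Logback. The class name SelfCancelPutDemo and the queue capacity are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reproduction; models the self-interrupt with plain JDK classes.
public class SelfCancelPutDemo {

    /** Returns true if put() threw InterruptedException after the task cancelled itself. */
    static boolean putFailsAfterSelfCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Plenty of free capacity, so put() never needs to wait for space.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        AtomicReference<FutureTask<Void>> self = new AtomicReference<>();
        AtomicBoolean putInterrupted = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);

        Runnable body = () -> {
            // Stand-in for cleanup cancelling the future while its body still runs;
            // cancel(true) interrupts the current (runner) thread.
            self.get().cancel(true);
            try {
                queue.put("log message"); // stand-in for the async log call
            } catch (InterruptedException e) {
                putInterrupted.set(true); // the message is lost
            } finally {
                finished.countDown();
            }
        };
        FutureTask<Void> task = new FutureTask<Void>(body, null);
        self.set(task); // set before execution, so the body can always see it

        try {
            executor.execute(task);
            finished.await(5, TimeUnit.SECONDS);
            return putInterrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("put interrupted: " + putFailsAfterSelfCancel());
    }
}
```

ArrayBlockingQueue.put acquires its lock interruptibly, so it throws InterruptedException as soon as the calling thread's interrupt flag is set, even when the queue has room. That matches the symptom of a message being dropped without the queue ever being full.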

Alexander Wenckus

Dec 4, 2014, 11:32:38 AM
to Dávid Karnok, rxj...@googlegroups.com
Thanks David,

Thanks for your responses. I will put a test together and check 1.0.2.