I have a class which implements Callable<Integer> which is submitted
to an ExecutorService
two methods of interest are as follows
// Called from producer thread.
public void addMessage(final String message) {
try {
blockingQueue.put(message);
}
catch (InterruptedException ie) {
// Log exception
}
}
// Implement Callable<Integer>
// Consume messages from blocking queue
public Integer call() {
try {
while (true) {
final String message = blockingQueue.take();
}
}
catch (Throwable t) {
// log Throwable
}
finally {
// Log that Thread has been cancelled/stopped
}
return 0;
}
When I submit the Callable to ExecutorService I receive a Future of
concrete class FutureTask.
If I invoke futureTask.cancel() I can see that I jump straight to the
finally clause of my call() method. No exceptions appear to be thrown.
Could someone explain to me
1. Why this happens.
2. If this is a safe mechanism for stopping my consumer thread.
3. If this is safe can I submit a Runnable instead of a callable and
safely cancel the Thread this way?
Many thanks for any help.
> If I invoke futureTask.cancel() I can see that I jump straight to the
> finally clause of my call() method. No exceptions appear to be thrown.
In my test, both the catch and the finally where executed, in that order.
Can you produce a more complete example that exhibits the behavior? An
SSCCE is necessary here, I think.
> 2. If this is a safe mechanism for stopping my consumer thread.
Sure, if you can implement it correctly.
> 3. If this is safe can I submit a Runnable instead of a callable and
> safely cancel the Thread this way?
Same answer as #2 above.
For reference, my quick test was:
package fubar;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureTest
{
static ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue( 2 );
static ExecutorService es = Executors.newSingleThreadExecutor();
public static void main( String[] args ) throws InterruptedException {
new Thread( new Producer() ).start();
Future future = es.submit( new Consumer() );
Thread.sleep( 1000 );
future.cancel( true );
}
static class Producer implements Runnable {
int loopCount;
// Called from producer thread.
public void addMessage( final String message ) {
try {
blockingQueue.put( message );
}catch( InterruptedException ie ) {
// Log exception
}
}
public void run() {
for( ;; ) {
addMessage( "Test " + ++loopCount );
}
}
}
// Implement Callable<Integer>
// Consume messages from blocking queue
static class Consumer implements Callable {
public Integer call() {
try {
while( true ) {
final String message = (String) blockingQueue.take();
}
}catch( Throwable t ) {
System.out.println( "caught: "+ t );
// log Throwable
}finally {
System.out.println( "Stopped." );
Thanks for your reply.
SSCCE and output provided.
package example;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
public class FutureTaskCancelExample {
private static class MyCallable implements Callable<Integer> {
private final BlockingQueue<String> blockingQueue = new
LinkedBlockingQueue<String>();
public void put(final String message) {
try {
this.blockingQueue.put(message);
}
catch (Throwable t) {
System.out.println("Throwable caught in put()" + t.getCause
().getMessage());
}
}
public Integer call() {
try {
while(true) {
final String message = this.blockingQueue.take();
System.out.println(message);
}
}
catch (Throwable t) {
System.out.println("Throwable caught in call() " + t.getCause
().getMessage());
}
finally {
System.out.println("Entered call() finally block.");
}
return new Integer((int)0);
}
}
public static void main(String[] args) {
final ExecutorService executor = Executors.newSingleThreadExecutor
();
final MyCallable myCallable = new MyCallable();
final Future<?> myFuture = executor.submit(myCallable);
myCallable.put("Go Southend United FC!");
myFuture.cancel(true);
executor.shutdown();
}
}
Output:
Go Southend United FC!
Entered call() finally block.
Yup, it does what you say. I don't see why it's skipping the catch
block, since that part seems identical to my code.
I have to get some real work done now, maybe someone can figure this
out. I'll check it out again later this evening.
> public Integer call() {
>
> try {
>
> while(true) {
> final String message = this.blockingQueue.take();
> System.out.println(message);
> }
> }
> catch (Throwable t) {
> System.out.println("Throwable caught in call() " + t.getCause
> ().getMessage());
The problem seems to be here (also in your other exception handlers).
Instead of the above try:
System.out.println("Throwable caught in call() " + t);
> }
> finally {
> System.out.println("Entered call() finally block.");
> }
>
> return new Integer((int)0);
> }
> }
>
> public static void main(String[] args) {
>
> final ExecutorService executor = Executors.newSingleThreadExecutor
> ();
>
> final MyCallable myCallable = new MyCallable();
>
> final Future<?> myFuture = executor.submit(myCallable);
>
> myCallable.put("Go Southend United FC!");
>
> myFuture.cancel(true);
This may cause no execution of your callable at all. Some delay is
needed before cancel.
HTH,
piotr
> The problem seems to be here (also in your other exception handlers).
> Instead of the above try:
>
> System.out.println("Throwable caught in call() " + t);
Ah, of course. There is no cause, so getCause() returns null, and the
getMessage() call throws an NPE inside the exception handler, aborting
the whole handler.
I shoulda thought of that, good one Piotr. I'm a little surprised that
the NPE isn't propagated, I guess I should re-read that section of the JLS.
--
Daniel Pitts' Tech Blog: <http://virtualinfinity.net/wordpress/>
> Actually, its not the JLS, but the Executor Service that specifies what
> to do with that re-thrown error. It does get propagated, but probably
> caught and ignored by the worker thread.
>
Yup, I re-read the JLS, and that's what it says. I don't see any docs
on how to set the error handler for a ExecutorService. I tried setting
an uncaught exception handler and it didn't work. Bummer.
package example;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
public class FutureTaskCancelExample
{
private static class MyCallable implements Callable<Void>
{
private final BlockingQueue<String> blockingQueue =
new LinkedBlockingQueue<String>();
public void put( final String message )
{
try {
blockingQueue.put( message );
}catch( Throwable t ) {
t.printStackTrace();
}
}
public Void call()
{
try {
for( ;; ) {
String message = null;
message = blockingQueue.take();
System.out.println( message );
}
}catch( InterruptedException t ) {
t.printStackTrace();
// preserve for error testing
System.err.println( "Throwable caught in call() " +
t.getCause().getMessage() );
}finally {
System.out.println( "Entered call() finally block." );
}
return null;
}
}
public static void main( String[] args ) throws InterruptedException
{
final ExecutorService executor =
Executors.newSingleThreadExecutor( new ThreadFactory()
{
ThreadFactory defaultFact = Executors.defaultThreadFactory();
public Thread newThread( Runnable r )
{
Thread t = defaultFact.newThread( r );
t.setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler()
{
public void uncaughtException( Thread t, Throwable e )
{
System.err.println( "Uncaught " + e + " in " + t );
}
} );
return t;
}
} );
final MyCallable myCallable = new MyCallable();
final Future<?> myFuture = executor.submit( myCallable );
myCallable.put( "Go Southend United FC!" );
Thread.sleep( 300 ); // ADDED
myFuture.cancel(true);
executor.shutdown();
}
}
Many thanks for your help gentlemen.
It is indeed a shame that the Executor service does not seem to
propagate the exception. If you have critical tasks running on an
ExectorService I wonder how you check if any have failed.....
As an aside I have been looking at Java - Concurrency in practice and
they seem to recommend resetting the interrupted status of the thread
when catching an InterruptedException in the run() method of a
Runnable (and presumably the call() method of a Callable).
Could anyone please explain why this should be necessary and what
could subsequently check the interrupted status of the (possibly
terminated) Thread.
> It is indeed a shame that the Executor service does not seem to
> propagate the exception. If you have critical tasks running on an
> ExectorService I wonder how you check if any have failed.....
Goetz, et al. discuss this further in section 7.3, "Handling Abnormal
Thread Termination." [1]
> As an aside I have been looking at Java - Concurrency in practice and
> they seem to recommend resetting the interrupted status of the thread
> when catching an InterruptedException in the run() method of a
> Runnable (and presumably the call() method of a Callable).
>
> Could anyone please explain why this should be necessary and what
> could subsequently check the interrupted status of the (possibly
> terminated) Thread.
This is discussed further in section 5.4, "Blocking and Interruptible
Methods." [1] Simple examples may be found in this discussion of exec().
[2]
[1]<http://www.javaconcurrencyinpractice.com/>
[2]<http://mindprod.com/jgloss/exec.html>
--
John B. Matthews
trashgod at gmail dot com
<http://sites.google.com/site/drjohnbmatthews>