FutureTask.cancel() - can anyone explain the mechanism?

The Dude

Sep 17, 2009, 10:09:02 AM
Hi,

I have a class that implements Callable<Integer> and is submitted
to an ExecutorService.

Two methods of interest are as follows:

// Called from producer thread.
public void addMessage(final String message) {

    try {
        blockingQueue.put(message);
    }
    catch (InterruptedException ie) {
        // Log exception
    }
}

// Implement Callable<Integer>
// Consume messages from blocking queue
public Integer call() {

    try {
        while (true) {
            final String message = blockingQueue.take();
        }
    }
    catch (Throwable t) {
        // log Throwable
    }
    finally {
        // Log that Thread has been cancelled/stopped
    }

    return 0;
}

When I submit the Callable to ExecutorService I receive a Future of
concrete class FutureTask.

If I invoke futureTask.cancel() I can see that I jump straight to the
finally clause of my call() method. No exceptions appear to be thrown.

Could someone explain to me

1. Why this happens.
2. If this is a safe mechanism for stopping my consumer thread.
3. If this is safe can I submit a Runnable instead of a callable and
safely cancel the Thread this way?

Many thanks for any help.

markspace

Sep 17, 2009, 10:32:07 AM
The Dude wrote:

> If I invoke futureTask.cancel() I can see that I jump straight to the
> finally clause of my call() method. No exceptions appear to be thrown.

In my test, both the catch and the finally were executed, in that order.

Can you produce a more complete example that exhibits the behavior? An
SSCCE is necessary here, I think.


> 2. If this is a safe mechanism for stopping my consumer thread.

Sure, if you can implement it correctly.

> 3. If this is safe can I submit a Runnable instead of a callable and
> safely cancel the Thread this way?

Same answer as #2 above.


For reference, my quick test was:


package fubar;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest
{

    static ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue( 2 );
    static ExecutorService es = Executors.newSingleThreadExecutor();

    public static void main( String[] args ) throws InterruptedException {
        new Thread( new Producer() ).start();
        Future future = es.submit( new Consumer() );
        Thread.sleep( 1000 );
        future.cancel( true );
    }

    static class Producer implements Runnable {
        int loopCount;

        // Called from producer thread.
        public void addMessage( final String message ) {
            try {
                blockingQueue.put( message );
            }catch( InterruptedException ie ) {
                // Log exception
            }
        }

        public void run() {
            for( ;; ) {
                addMessage( "Test " + ++loopCount );
            }
        }
    }

    // Implement Callable<Integer>
    // Consume messages from blocking queue

    static class Consumer implements Callable {
        public Integer call() {
            try {
                while( true ) {
                    final String message = (String) blockingQueue.take();
                }
            }catch( Throwable t ) {
                System.out.println( "caught: "+ t );
                // log Throwable
            }finally {
                System.out.println( "Stopped." );

The Dude

Sep 17, 2009, 11:34:58 AM

Thanks for your reply.

SSCCE and output provided.

package example;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class FutureTaskCancelExample {

    private static class MyCallable implements Callable<Integer> {

        private final BlockingQueue<String> blockingQueue =
            new LinkedBlockingQueue<String>();

        public void put(final String message) {

            try {
                this.blockingQueue.put(message);
            }
            catch (Throwable t) {
                System.out.println("Throwable caught in put()"
                    + t.getCause().getMessage());
            }
        }

        public Integer call() {

            try {

                while(true) {
                    final String message = this.blockingQueue.take();
                    System.out.println(message);
                }
            }
            catch (Throwable t) {
                System.out.println("Throwable caught in call() "
                    + t.getCause().getMessage());
            }
            finally {
                System.out.println("Entered call() finally block.");
            }

            return new Integer((int)0);
        }
    }

    public static void main(String[] args) {

        final ExecutorService executor = Executors.newSingleThreadExecutor();

        final MyCallable myCallable = new MyCallable();

        final Future<?> myFuture = executor.submit(myCallable);

        myCallable.put("Go Southend United FC!");

        myFuture.cancel(true);

        executor.shutdown();
    }
}

Output:

Go Southend United FC!
Entered call() finally block.

markspace

Sep 17, 2009, 12:36:15 PM
The Dude wrote:
>
> Thanks for your reply.
>
> SSCCE and output provided.


Yup, it does what you say. I don't see why it's skipping the catch
block, since that part seems identical to my code.

I have to get some real work done now, maybe someone can figure this
out. I'll check it out again later this evening.

Piotr Kobzda

Sep 17, 2009, 1:48:55 PM
The Dude wrote:

> public Integer call() {
>
> try {
>
> while(true) {
> final String message = this.blockingQueue.take();
> System.out.println(message);
> }
> }
> catch (Throwable t) {
> System.out.println("Throwable caught in call() " + t.getCause
> ().getMessage());


The problem seems to be here (also in your other exception handlers).
Instead of the above, try:

System.out.println("Throwable caught in call() " + t);

> }
> finally {
> System.out.println("Entered call() finally block.");
> }
>
> return new Integer((int)0);
> }
> }
>
> public static void main(String[] args) {
>
> final ExecutorService executor = Executors.newSingleThreadExecutor
> ();
>
> final MyCallable myCallable = new MyCallable();
>
> final Future<?> myFuture = executor.submit(myCallable);
>
> myCallable.put("Go Southend United FC!");
>
> myFuture.cancel(true);

If cancel() runs before the executor thread has started the task, your
callable may never execute at all. Some delay is needed before the cancel.
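
A fixed sleep only makes it likely that the task has started. A minimal
sketch of a deterministic alternative follows, assuming it is acceptable to
add a CountDownLatch to the consumer. The class and method names are
illustrative and are not taken from the original code: the task signals when
call() begins, and the caller cancels only after that signal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CancelAfterStart {

    /** Returns true if the consumer observed the interrupt raised by cancel(true). */
    static boolean cancelRunningConsumer() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch interrupted = new CountDownLatch(1);

        Callable<Integer> consumer = new Callable<Integer>() {
            public Integer call() {
                started.countDown();             // signal: call() is now running
                try {
                    while (true) {
                        queue.take();            // throws InterruptedException after cancel(true)
                    }
                } catch (InterruptedException e) {
                    interrupted.countDown();     // record that the interrupt arrived
                }
                return 0;
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(consumer);
            started.await();                     // replaces the Thread.sleep(...) guess
            future.cancel(true);
            return interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // the calling thread itself was interrupted
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("consumer interrupted: " + cancelRunningConsumer());
    }
}
```

The latch costs one extra field, but it removes the guesswork about how long
the executor needs to pick up the task.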


HTH,
piotr

markspace

Sep 17, 2009, 3:08:20 PM
Piotr Kobzda wrote:

> The problem seems to be here (also in your other exception handlers).
> Instead of the above try:
>
> System.out.println("Throwable caught in call() " + t);


Ah, of course. There is no cause, so getCause() returns null, and the
getMessage() call throws an NPE inside the exception handler, aborting
the whole handler.

I shoulda thought of that, good one Piotr. I'm a little surprised that
the NPE isn't propagated, I guess I should re-read that section of the JLS.
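
The sequence described above can be reproduced without any executor. The
sketch below is illustrative only (the class name CatchBlockNpe and the outer
try are not part of the original program). It throws an InterruptedException
that has no cause and runs the same kind of handler, recording which steps
actually execute.

```java
import java.util.ArrayList;
import java.util.List;

class CatchBlockNpe {

    /** Mimics the handler from the SSCCE and returns the steps that ran. */
    static List<String> trace() {
        List<String> steps = new ArrayList<String>();
        try {
            try {
                // Stand-in for what take() throws after cancel(true): no cause attached.
                throw new InterruptedException();
            } catch (Throwable t) {
                steps.add("catch entered");
                // t.getCause() is null, so getMessage() throws NullPointerException
                // before the string is built; nothing is added or printed here.
                steps.add("message: " + t.getCause().getMessage());
            } finally {
                steps.add("finally ran");
            }
        } catch (NullPointerException npe) {
            // In the original program nothing in call() catches this;
            // the exception leaves call() instead.
            steps.add("handler failed with NullPointerException");
        }
        return steps;
    }

    public static void main(String[] args) {
        for (String step : trace()) {
            System.out.println(step);
        }
    }
}
```

The recorded steps are "catch entered", "finally ran", and "handler failed
with NullPointerException", in that order. That matches the observed output:
the catch block is entered but fails before it can print, so only the finally
message appears.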

Daniel Pitts

Sep 17, 2009, 4:28:15 PM
Actually, it's not the JLS but the ExecutorService that specifies what
to do with that re-thrown error. It does get propagated, but is probably
caught and ignored by the worker thread.

--
Daniel Pitts' Tech Blog: <http://virtualinfinity.net/wordpress/>

markspace

Sep 17, 2009, 5:36:01 PM
Daniel Pitts wrote:

> Actually, its not the JLS, but the Executor Service that specifies what
> to do with that re-thrown error. It does get propagated, but probably
> caught and ignored by the worker thread.
>

Yup, I re-read the JLS, and that's what it says. I don't see any docs
on how to set the error handler for an ExecutorService. I tried setting
an uncaught exception handler and it didn't work. Bummer.

package example;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

public class FutureTaskCancelExample
{

    private static class MyCallable implements Callable<Void>
    {

        private final BlockingQueue<String> blockingQueue =
            new LinkedBlockingQueue<String>();

        public void put( final String message )
        {
            try {
                blockingQueue.put( message );
            }catch( Throwable t ) {
                t.printStackTrace();
            }
        }

        public Void call()
        {
            try {
                for( ;; ) {
                    String message = null;
                    message = blockingQueue.take();
                    System.out.println( message );
                }
            }catch( InterruptedException t ) {
                t.printStackTrace();
                // preserve for error testing
                System.err.println( "Throwable caught in call() " +
                    t.getCause().getMessage() );
            }finally {
                System.out.println( "Entered call() finally block." );
            }

            return null;
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        final ExecutorService executor =
            Executors.newSingleThreadExecutor( new ThreadFactory()
            {
                ThreadFactory defaultFact = Executors.defaultThreadFactory();

                public Thread newThread( Runnable r )
                {
                    Thread t = defaultFact.newThread( r );
                    t.setUncaughtExceptionHandler(
                        new Thread.UncaughtExceptionHandler()
                        {
                            public void uncaughtException( Thread t, Throwable e )
                            {
                                System.err.println( "Uncaught " + e + " in " + t );
                            }
                        } );
                    return t;
                }
            } );

        final MyCallable myCallable = new MyCallable();

        final Future<?> myFuture = executor.submit( myCallable );

        myCallable.put( "Go Southend United FC!" );

        Thread.sleep( 300 ); // ADDED

        myFuture.cancel(true);

        executor.shutdown();
    }
}
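
The uncaught exception handler is not reached because submit() wraps the task
in a FutureTask, which catches whatever call() throws and stores it in the
Future instead of letting it escape to the thread. One documented hook is
ThreadPoolExecutor.afterExecute. The sketch below follows the pattern shown
in that method's Javadoc; the class name ReportingExecutor, the failures
list, and the demo() method are hypothetical names added for illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ReportingExecutor extends ThreadPoolExecutor {

    // Hypothetical sink for failures; a real program would probably log instead.
    final List<Throwable> failures = new CopyOnWriteArrayList<Throwable>();

    ReportingExecutor() {
        // Single worker thread, unbounded queue.
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Tasks passed to submit() arrive here as Futures with t == null,
        // so the stored outcome has to be pulled out with get().
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();           // task has finished, so this does not block
            } catch (CancellationException ce) {
                t = ce;                          // task was cancelled
            } catch (ExecutionException ee) {
                t = ee.getCause();               // what call() or run() actually threw
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.add(t);
        }
    }

    /** Submits one failing task and returns what afterExecute collected. */
    static List<Throwable> demo() {
        ReportingExecutor executor = new ReportingExecutor();
        executor.submit(new Callable<Void>() {
            public Void call() {
                throw new IllegalStateException("boom");
            }
        });
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return executor.failures;
    }
}
```

Tasks started with execute() rather than submit() are not wrapped this way,
so an exception from such a task does terminate the worker thread and reach
its uncaught exception handler.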

The Dude

Sep 18, 2009, 4:03:38 AM
>
> Actually, its not the JLS, but the Executor Service that specifies what
> to do with that re-thrown error.  It does get propagated, but probably
> caught and ignored by the worker thread.
>
> --
> Daniel Pitts' Tech Blog: <http://virtualinfinity.net/wordpress/>

Many thanks for your help gentlemen.

It is indeed a shame that the ExecutorService does not seem to
propagate the exception. If you have critical tasks running on an
ExecutorService, I wonder how you check whether any have failed.

As an aside, I have been looking at Java Concurrency in Practice, and
it seems to recommend restoring the interrupted status of the thread
when catching an InterruptedException in the run() method of a
Runnable (and presumably the call() method of a Callable).

Could anyone please explain why this should be necessary, and what
could subsequently check the interrupted status of the (possibly
terminated) Thread?

John B. Matthews

Sep 18, 2009, 10:25:02 AM
In article
<1b508a20-cef0-4227...@k26g2000vbp.googlegroups.com>,
The Dude <matthew...@googlemail.com> wrote:

> It is indeed a shame that the Executor service does not seem to
> propagate the exception. If you have critical tasks running on an
> ExectorService I wonder how you check if any have failed.....

Goetz, et al. discuss this further in section 7.3, "Handling Abnormal
Thread Termination." [1]
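
Separately from the book's treatment, the usual caller-side check is to keep
the Future returned by submit() and call get() on it: a task that threw
makes get() throw ExecutionException, with the original exception as its
cause. The following is a minimal sketch; the class and method names are made
up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CheckTaskFailure {

    /** Runs the task and returns the exception it threw, or null if it succeeded. */
    static Throwable failureOf(Callable<Integer> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            future.get();                        // blocks until the task finishes
            return null;
        } catch (ExecutionException e) {
            return e.getCause();                 // the exception thrown inside call()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // the waiting thread was interrupted
            return e;
        } finally {
            executor.shutdown();
        }
    }

    /** Example task that always fails. */
    static Throwable demo() {
        return failureOf(new Callable<Integer>() {
            public Integer call() {
                throw new IllegalStateException("consumer failed");
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("task failed with: " + demo());
    }
}
```

If the Future was cancelled, as in the example earlier in this thread, get()
throws CancellationException instead, and an exception thrown by the task
after the cancel is not reported through the Future.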

> As an aside I have been looking at Java - Concurrency in practice and
> they seem to recommend resetting the interrupted status of the thread
> when catching an InterruptedException in the run() method of a
> Runnable (and presumably the call() method of a Callable).
>
> Could anyone please explain why this should be necessary and what
> could subsequently check the interrupted status of the (possibly
> terminated) Thread.

This is discussed further in section 5.4, "Blocking and Interruptible
Methods." [1] Simple examples may be found in this discussion of exec().
[2]

[1]<http://www.javaconcurrencyinpractice.com/>
[2]<http://mindprod.com/jgloss/exec.html>
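
In short, catching InterruptedException clears the thread's interrupted flag,
so code further up the call stack can no longer tell that a stop was
requested. With a thread pool the thread is usually not terminating either,
since the worker is reused for later tasks. The sketch below is an
illustrative reading of that advice, not code from the book, and all names in
it are hypothetical: a helper that cannot rethrow restores the flag, and its
caller checks the flag.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RestoreInterrupt {

    /** Helper that cannot propagate InterruptedException; returns null if interrupted. */
    static String takeOrNull(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            // The flag was cleared when the exception was thrown; set it again
            // so callers can still see that an interrupt was requested.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** A caller higher up the stack that uses the flag to decide when to stop. */
    static int consumeUntilInterrupted(BlockingQueue<String> queue) {
        int consumed = 0;
        while (!Thread.currentThread().isInterrupted()) {
            if (takeOrNull(queue) != null) {
                consumed++;
            }
        }
        return consumed;
    }

    /** Returns true if the flag is still visible after the helper handled the exception. */
    static boolean demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Thread.currentThread().interrupt();      // simulate cancel(true) arriving
        String message = takeOrNull(queue);      // take() throws at once; helper restores the flag
        boolean stillInterrupted = Thread.currentThread().isInterrupted();
        Thread.interrupted();                    // clear the flag so the caller of demo() is unaffected
        return message == null && stillInterrupted;
    }

    public static void main(String[] args) {
        System.out.println("flag visible to caller: " + demo());
    }
}
```

Without the interrupt() call in takeOrNull, the loop in
consumeUntilInterrupted would never see the request and would block in take()
again.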

--
John B. Matthews
trashgod at gmail dot com
<http://sites.google.com/site/drjohnbmatthews>
