Interrupt Before Queue Poll Not Working

Michael Pilone

Aug 2, 2016, 10:43:50 AM
to haze...@googlegroups.com
I ran into an issue where interrupting a thread before doing a queue poll doesn't cause the poll operation to return immediately or throw an InterruptedException. As far as I can tell, the interrupt is properly detected in AbstractInvocationFuture (lines 156-160), but the park() call is inside an infinite for loop, so after detecting the interrupt the loop just goes back to being parked. I didn't try interrupting from a separate thread, but I suspect the same issue exists. This makes it tough to interrupt a queue-polling thread.
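For comparison, the usual java.util.concurrent convention for a park-based wait is to check the interrupt flag on every pass through the loop and throw instead of parking again. The sketch below is a hypothetical, local-only future (`ParkingFuture` is an illustrative name, not Hazelcast's `AbstractInvocationFuture`) and ignores the remote side of a distributed call entirely. A loop that clears the flag and merely records it parks again until the timeout, which matches the symptom described above; leaving the flag set would instead make `parkNanos` return immediately on every pass.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical local future; NOT Hazelcast's AbstractInvocationFuture.
class ParkingFuture<V> {
    private final AtomicReference<V> result = new AtomicReference<>();
    private volatile Thread waiter;

    // Publishes the result, then wakes the waiting thread (if any).
    void complete(V value) {
        result.set(value);
        Thread w = waiter;
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    // Parks until a result arrives, the timeout elapses, or the caller is interrupted.
    V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        waiter = Thread.currentThread();
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            for (;;) {
                V v = result.get();
                if (v != null) {
                    return v;
                }
                // With the flag still set, parkNanos would return at once on every pass;
                // clearing it and only recording it would park until the timeout.
                // Clear it and throw instead.
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException();
                }
                LockSupport.parkNanos(this, remaining);
            }
        } finally {
            waiter = null;
        }
    }
}
```

This only covers the local wait; it says nothing about cancelling work that may already be running on another member.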

Am I missing something or is this a bug introduced with the move to park/unpark with Hazelcast 3.7?

Thanks.

Here is a simple test case:

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Class name is hypothetical; the original post showed only main().
public class InterruptBeforePollTest {

  public static void main(String[] args) {

    Config config = new Config();
    config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);

    HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    try {

      // Interrupt the thread.
      Thread.currentThread().interrupt();

      // Verify that park detects the existing interrupt (it returns immediately).
      System.out.println("Time before lock: " + System.currentTimeMillis());
      LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(5));
      System.out.println("Time after lock: " + System.currentTimeMillis());

      // Interrupt the thread again (park does not clear the flag, so it is still set).
      Thread.currentThread().interrupt();

      // Verify that queue poll detects the existing interrupt.
      System.out.println("Time before poll: " + System.currentTimeMillis());
      hz.getQueue("test").poll(1, TimeUnit.MINUTES);
      System.out.println("Time after poll: " + System.currentTimeMillis());
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
    finally {
      hz.shutdown();
    }
  }
}


The output is:

Aug 02, 2016 10:41:36 AM com.hazelcast.core.LifecycleService
INFO: [10.52.1.26]:5701 [dev] [3.7-SNAPSHOT] [10.52.1.26]:5701 is STARTED
Time before lock: 1470148896606
Time after lock: 1470148896606
Time before poll: 1470148896606
Aug 02, 2016 10:41:36 AM com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [10.52.1.26]:5701 [dev] [3.7-SNAPSHOT] Initializing cluster partition table arrangement...
Time after poll: 1470148957368
Aug 02, 2016 10:42:37 AM com.hazelcast.core.LifecycleService
INFO: [10.52.1.26]:5701 [dev] [3.7-SNAPSHOT] [10.52.1.26]:5701 is SHUTTING_DOWN

Michael Pilone

Aug 2, 2016, 10:59:22 AM
to haze...@googlegroups.com
I verified that the same problem exists when poll() is executing concurrently in another thread, so this does seem like an issue. I've included a slightly longer test case. This is on 3.7-SNAPSHOT from master. The issue appears to be in InvocationFuture, where onInterruptDetected simply sets a flag but doesn't throw an InterruptedException, so AbstractInvocationFuture just goes back to park().

-mike


import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Class name is hypothetical; the original post showed only main() and Poller.
public class InterruptDuringPollTest {

  public static void main(String[] args) {

    Config config = new Config();
    config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);

    HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    try {

      // Interrupt the thread.
      Thread.currentThread().interrupt();

      // Verify that park detects existing interrupt.
      System.out.println("Time before lock: " + System.currentTimeMillis());
      LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(1));
      System.out.println("Time after lock: " + System.currentTimeMillis());

      // Clear the interrupt because park doesn't clear it via an exception.
      Thread.interrupted();

      // TEST 1: Interrupt before poll.
//      // Interrupt the thread again.
//      Thread.currentThread().interrupt();
//
//      // Verify that queue poll detects existing interrupt.
//      System.out.println("Time before poll: " + System.currentTimeMillis());
//      hz.getQueue("test").poll(1, TimeUnit.MINUTES);
//      System.out.println("Time after poll: " + System.currentTimeMillis());
      //
      //
      // TEST 2: Interrupt during poll.
      Thread poller = new Poller(hz.getQueue("test"));
      poller.start();

      // Give time for the poller to start.
      Thread.sleep(2000);

      // Interrupt the poller.
      poller.interrupt();
      poller.join();
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
    finally {
      hz.shutdown();
    }
  }

  private static class Poller extends Thread {

    private final IQueue<String> queue;

    public Poller(IQueue<String> queue) {
      this.queue = queue;
    }

    @Override
    public void run() {

      try {
        System.out.println("Time before poll: " + System.currentTimeMillis());
        queue.poll(1, TimeUnit.MINUTES);
        System.out.println("Time after poll: " + System.currentTimeMillis());
      }
      catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }
}

Noctarius

Aug 3, 2016, 12:06:46 AM
to Christoph Engelbert - Hazelcast
Hey Mike,

Thanks for looking into the issue and mentioning it. I'm not that deep into the code anymore, but I bet Peter has an idea. Can you open a bug report on GitHub?

Chris

--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+...@googlegroups.com.
To post to this group, send email to haze...@googlegroups.com.
Visit this group at https://groups.google.com/group/hazelcast.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast/17D1AD4E-4BB6-47B2-A747-DE5CFF6674BA%40gmail.com.
For more options, visit https://groups.google.com/d/optout.

Peter Veentjer

Aug 3, 2016, 3:12:27 AM
to Hazelcast
It isn't a bug; it is a feature :)

This is how the interruption system is supposed to work. If a thread is interrupted, it sets the 'interrupt requested' flag on the Invocation. Blocking operations are periodically 'retried' (purely an internal matter), and on each retry this interrupt flag is checked. If it is set, the operation isn't retried and an InterruptedException is thrown.
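The mechanism described here can be modeled as a loop in which each attempt blocks for at most one retry period and the interrupt flag is consulted only between attempts. In this hypothetical sketch, `RetryingBlockingCall`, `attempt` and `retryPeriodNanos` are illustrative names rather than Hazelcast API, and `requestInterrupt()` stands in for the flag on the Invocation. An interrupt takes effect within roughly one retry period, not immediately:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;

// Hypothetical model of the retry-and-check scheme; not Hazelcast's implementation.
class RetryingBlockingCall {
    // Stands in for the 'interrupt requested' flag on the Invocation.
    private volatile boolean interruptRequested;

    void requestInterrupt() {
        interruptRequested = true;
    }

    /**
     * attempt: hypothetical remote attempt that blocks for at most the given
     * number of nanoseconds and returns null if nothing became available.
     * Returns null if the overall timeout elapses.
     */
    <T> T call(LongFunction<T> attempt, long timeout, TimeUnit unit, long retryPeriodNanos)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (;;) {
            long remaining = Math.max(0L, deadline - System.nanoTime());
            T result = attempt.apply(Math.min(remaining, retryPeriodNanos));
            if (result != null) {
                return result;
            }
            // The flag is only consulted between attempts: if set, do not retry.
            if (interruptRequested) {
                throw new InterruptedException();
            }
            if (deadline - System.nanoTime() <= 0) {
                return null;
            }
        }
    }
}
```

Because the flag is only acted on between attempts, no attempt is abandoned mid-flight, at the cost of an interrupt latency of up to one retry period. The thread does not state what that period is in Hazelcast 3.7.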

If this weren't done, you would interrupt the future locally, but the operation would still be executed remotely. Imagine a lock.acquire that gets interrupted: without this approach, you could throw the InterruptedException locally while the lock is acquired remotely!

Michael Pilone

Aug 3, 2016, 9:28:21 AM
to haze...@googlegroups.com
Wow, that's a rough "feature". I can understand that the operation can't be interrupted immediately because it is distributed, but it seems like the distributed object should make every attempt to cancel the operation (remote or local) and return when interrupted. Without honoring thread interrupts, the blocking operations, especially those that block indefinitely like queue.take(), become somewhat useless, because there is no timely way to safely shut down an application or thread that uses them. I can understand some delay (in the low-millisecond range) while the interrupt/cancel propagates to the remote node, but my simple test showed it waits the full minute for the timeout to complete.

I'm making updates to HazelcastMQ and I want to reduce the message dispatch overhead by polling a queue directly. However, if the application can't shut down a channel in a timely manner, this probably isn't possible. It seems like my options are to use a busy loop with a short poll timeout, or to rely on the distributed event notification to know when I can do a no-wait poll. Neither option is great, and both add overhead. Even the busy loop with a short poll timeout could take minutes to stop if many queues are being polled.
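The first workaround, a loop with a short poll timeout, might look like the sketch below. It is written against `java.util.concurrent.BlockingQueue`, which Hazelcast's `IQueue` extends, so the same loop could in principle be given an `IQueue`; `PollingDispatcher` and `pollTimeoutMillis` are hypothetical names. Shutdown uses a volatile flag instead of an interrupt, so the loop notices a stop request only after the current poll returns:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical dispatcher illustrating the short-poll-timeout workaround.
class PollingDispatcher implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> handler;
    private final long pollTimeoutMillis;
    private volatile boolean running = true;

    PollingDispatcher(BlockingQueue<String> queue, Consumer<String> handler, long pollTimeoutMillis) {
        this.queue = queue;
        this.handler = handler;
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    // Asks the loop to exit; it is noticed after the current poll returns.
    void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                String msg = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    handler.accept(msg);
                }
            } catch (InterruptedException e) {
                // Also honor an interrupt if the queue implementation surfaces one.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

If one thread polls many queues in turn, the worst-case stop latency grows to roughly the number of queues times the poll timeout, which is the cost pointed out above.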

Ideally I could just have a thread sitting on queue.take() and interrupt it when the application wants to shutdown or in the case of HazelcastMQ, when the message channel is closed.
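The pattern described here is the standard one for `java.util.concurrent` queues, where `take()` throws `InterruptedException` as soon as the blocked thread is interrupted. The sketch below (`TakeLoop` is a hypothetical helper name) demonstrates it against a local `BlockingQueue` only; per the poll() results earlier in this thread, Hazelcast 3.7's `IQueue` blocking calls did not return this promptly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: the take()-and-interrupt shutdown pattern.
final class TakeLoop {
    private TakeLoop() {}

    // Starts a consumer thread that blocks in take() until it is interrupted.
    static Thread startConsumer(BlockingQueue<String> queue, Consumer<String> handler) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handler.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // Shutdown requested: restore the flag and let the thread finish.
                Thread.currentThread().interrupt();
            }
        }, "queue-consumer");
        t.start();
        return t;
    }
}
```

Closing a channel then amounts to calling `interrupt()` on the consumer thread and `join()`-ing it.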

This seems more like a bug than a feature to me. At a minimum it should be documented on each distributed object that blocking operations cannot be interrupted (and depending on the use case, may need to be avoided).

-mike