Hazelcast IQueue in clusterd environment receives item on all the members

342 views
Skip to first unread message

Dummy

unread,
Sep 23, 2016, 6:30:29 PM9/23/16
to Hazelcast

hazelcast version: 3.6.4

I am using client server topology in myproject and trying to execute some operations on my hazelcast server.

I am putting the serialized bean on my hazelcast queue from client as below

IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");

boolean  status = queue.offer(serializedBean,<timeoutvalue>,TimeUnit.MILLISECONDS );

on my hazelcast server I have registered listener and I am running hazelcast in cluster mode.

<hz:queue name="myqueue">
    <hz:item-listeners>
        <hz:item-listener implementation="myqueueImpl" include-value="true"/>
    </hz:item-listeners>
</hz:queue>


    public class MyqueueImpl implements ItemListener<SerializedBean> {

        public void itemAdded(ItemEvent<SerializedBean> inputMessage) {
            System.out.println("Item added to the queue ");
            //sometask
        }

    public void itemRemoved(ItemEvent<SerializedBean> removedItem) {

            System.out.println("Item removed from the queue ");
        }
}

Issue:

what I have observed that the queue item is received and picked up and executed on both the members of the cluster. I want the item to be picked up only once once on any of the cluster member. Please let me know what exactly I am missing

Ali Gurbuz

unread,
Sep 23, 2016, 6:35:22 PM9/23/16
to Hazelcast

You have to add 'local' listener, otherwise all members will receive the event.


--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+unsubscribe@googlegroups.com.
To post to this group, send email to haze...@googlegroups.com.
Visit this group at https://groups.google.com/group/hazelcast.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast/98dcd968-c6d6-493a-851f-377dc459da78%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dummy

unread,
Sep 24, 2016, 3:43:31 AM9/24/16
to Hazelcast
thanks Ali for your response, I was referring to the documentation and I see that there is no local property available for itemListener  - http://docs.hazelcast.org/docs/3.5/manual/html/listenerconfig.html
for queue - I am using itemListener as suggested in the document.

Thanks !


On Friday, September 23, 2016 at 3:35:22 PM UTC-7, Ali Gurbuz wrote:

You have to add 'local' listener, otherwise all members will receive the event.

On Sep 24, 2016 01:30, "Dummy" <nitin...@gmail.com> wrote:

hazelcast version: 3.6.4

I am using client server topology in myproject and trying to execute some operations on my hazelcast server.

I am putting the serialized bean on my hazelcast queue from client as below

IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");

boolean  status = queue.offer(serializedBean,<timeoutvalue>,TimeUnit.MILLISECONDS );

on my hazelcast server I have registered listener and I am running hazelcast in cluster mode.

<hz:queue name="myqueue">
    <hz:item-listeners>
        <hz:item-listener implementation="myqueueImpl" include-value="true"/>
    </hz:item-listeners>
</hz:queue>


    public class MyqueueImpl implements ItemListener<SerializedBean> {

        public void itemAdded(ItemEvent<SerializedBean> inputMessage) {
            System.out.println("Item added to the queue ");
            //sometask
        }

    public void itemRemoved(ItemEvent<SerializedBean> removedItem) {

            System.out.println("Item removed from the queue ");
        }
}

Issue:

what I have observed that the queue item is received and picked up and executed on both the members of the cluster. I want the item to be picked up only once once on any of the cluster member. Please let me know what exactly I am missing

--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+...@googlegroups.com.

nitin...@gmail.com

unread,
Sep 24, 2016, 10:59:11 AM9/24/16
to Hazelcast

I am using client server topology in myproject and trying to execute some operations on my hazelcast server.

I am putting the serialized bean on my hazelcast queue from client as below

IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");

boolean  status = queue.offer(serializedBean,<timeoutvalue>,TimeUnit.MILLISECONDS );

on my hazelcast server I have registered listener and I am running hazelcast in cluster mode.

<hz:queue name="myqueue">
    <hz:item-listeners>
        <hz:item-listener implementation="myqueueImpl" include-value="true"/>
    </hz:item-listeners>
</hz:queue>


    public class MyqueueImpl implements ItemListener<SerializedBean> {

        public void itemAdded(ItemEvent<SerializedBean> inputMessage) {
            System.out.println("Item added to the queue ");
            //sometask
        }

    public void itemRemoved(ItemEvent<SerializedBean> removedItem) {

            System.out.println("Item removed from the queue ");
        }
}

Issue:

what I have observed that the queue item is received and picked up on both the members of the cluster. I want the item to be picked up only once once on any of the cluster member. 


Please let me know what exactly I am missing.


Michael Pilone

unread,
Sep 24, 2016, 11:09:30 AM9/24/16
to haze...@googlegroups.com
All registered item listeners will be notified of the addition, but the removal of the item from the queue (via queue.take()) will only return an item to the first node. So a common use case is to just have a blocking poll() on the queue to get the item or use the item listener event to start a task on an executor to poll() the queue. Basically the item listeners are designed to tell you of a queue change while poll() and take() are designed to remove the item from the queue in a mutually exclusive way.

-mike

--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+...@googlegroups.com.
To post to this group, send email to haze...@googlegroups.com.
Visit this group at https://groups.google.com/group/hazelcast.

Nick Pratt

unread,
Sep 24, 2016, 5:13:42 PM9/24/16
to haze...@googlegroups.com
Are you confusing queue *listeners* with actual push/pop/offer/take operations on the queue ?

--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+unsubscribe@googlegroups.com.

Dummy

unread,
Sep 25, 2016, 7:08:28 PM9/25/16
to Hazelcast
Looks like I am confusing both.. ? I am missing something on servers side while handling the queue. I  have shared the code above can u plz point that or share an example ?


On Saturday, September 24, 2016 at 2:13:42 PM UTC-7, Nick Pratt wrote:
Are you confusing queue *listeners* with actual push/pop/offer/take operations on the queue ?
On Fri, Sep 23, 2016 at 5:20 PM, <nitin...@gmail.com> wrote:

I am using client server topology in myproject and trying to execute some operations on my hazelcast server.

I am putting the serialized bean on my hazelcast queue from client as below

IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");

boolean  status = queue.offer(serializedBean,<timeoutvalue>,TimeUnit.MILLISECONDS );

on my hazelcast server I have registered listener and I am running hazelcast in cluster mode.

<hz:queue name="myqueue">
    <hz:item-listeners>
        <hz:item-listener implementation="myqueueImpl" include-value="true"/>
    </hz:item-listeners>
</hz:queue>


    public class MyqueueImpl implements ItemListener<SerializedBean> {

        public void itemAdded(ItemEvent<SerializedBean> inputMessage) {
            System.out.println("Item added to the queue ");
            //sometask
        }

    public void itemRemoved(ItemEvent<SerializedBean> removedItem) {

            System.out.println("Item removed from the queue ");
        }
}

Issue:

what I have observed that the queue item is received and picked up on both the members of the cluster. I want the item to be picked up only once once on any of the cluster member. 


Please let me know what exactly I am missing.


--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+...@googlegroups.com.

Dummy

unread,
Sep 25, 2016, 7:40:32 PM9/25/16
to Hazelcast
Thanks Mike..
I was looking for such implementation as you suggested by couldn't see any sample available, t will be great if you can share one ?

Michael Pilone

unread,
Sep 26, 2016, 9:37:38 AM9/26/16
to haze...@googlegroups.com
An IQueue behaves a lot like a normal Java BlockingQueue so you can refer to that documentation for more examples. So to remove an item from the queue, you would do something like:

IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");
SerializedBean item = queue.poll();
if (item != null) {
  // do something with the item
}

If you want to wait for an item, you can poll with a timeout:
SerializedBean item = queue.poll(10, TimeUnit.SECONDS);

Or you can take an item which blocks indefinitely:
SerializedBean item = queue.take();

So you can have a simple Runnable and submit it to an executor or run it in a separate thread such as:

class MyTask implements Runnable {

  private volatile boolean shutdown;

  public void run() {
IQueue<SerializedBean> queue = HzUtil.getInstance().getQueue("myqueue");
while (!shutdown) {
        SerializedBean item = queue.poll(5, TimeUnit.SECONDS);
if (item != null) {
  // do something with the item
}
  }
}

executor.submit(new MyTask());

Or from the ItemListener, instantiate a Runnable and submit it to an executor when you get the item added event. You could also already have a thread running and waiting on a Java Condition and signal the condition when you get the event in the ItemListener. Most of this isn't Hazelcast specific, just normal multi-threading and event handling. The only thing Hazelcast specific is that the ItemListeners may live on different nodes (or JVMs on the same node) and will compete to consume from the queue just like multiple threads in a single JVM. Hazelcast handles the thread/node safety within the queue to ensure that only one node gets an item on each poll/take operation.

One thing to avoid is consuming from the queue in the ItemListeners. While it is technically possible, the event listeners should be very fast operations that simply notify some other thread or create a task for future execution and return. Otherwise you run the risk of blocking other operations as Hazelcast waits for your item listener to return.

-mike


Vishal Patil

unread,
Aug 1, 2020, 11:56:25 AM8/1/20
to Hazelcast
I would rather create my own partition key hash for the items in the queue, rather that indefinite while loop- 

on server/member - hz.getQueue("JobQueue").offer("jobId"+ itemPartitionKey);  //   itemPartitionKey  => 12, 3, 4, 5, 6 (or any hashing)

and on client listeners I can simply do this-  

if( (itemPartitionKey % numberOfClientInstances != thisClientIndex) { return; }

So my items are evenly distributed on clients. I can scale client as I want and still I will get a good distribution.

mat...@openlattice.com

unread,
Aug 1, 2020, 3:40:01 PM8/1/20
to Hazelcast
As long as your clients have roughly equivalent load and processing power just make an indefinite loop that properly handles interruption and you'll get a roughly evenly distributed items among all your clients (use take instead of poll).

If you really must be able to do this, your queue configuration supports wild cards so you could do hz.getQueue("JobQueue" + itemPartitionKey) and then on your client get hz.getQueue("JobQueue" + itemPartitionKey) for each satisfying itemPartitionKey % numberOfClientInstances != thisClientIndex. Make sure you use poll so entries all don't block.

I suspect that the large number of queues will have more of a performance impact than just doing one queue with an infinite loop on each client.

Reply all
Reply to author
Forward
0 new messages