Hazelcast local entry listener and migration


Zack Radick

Sep 14, 2011, 2:01:00 PM
to haze...@googlegroups.com
I am prototyping using a MultiMap for storing grouped lists of tasks to be processed with a LocalEntryListener on the map to make sure that workers know when there is work to be done.  I have a couple of questions related to this and my observations from prototyping.
 
1) I am registering my interest in receiving local entries as:
Hazelcast.getMultiMap( MAP_NAME ).addLocalEntryListener( this );
This subscribes me to the map and makes the local Thread available for accepting a partition of said map (as I understand it).  Is there inherently a race condition between subscribing to the map and adding the local entry listener?  Is there a possibility that local entries will be added before the listener is in place?  I have not observed this behavior, but it looks a bit contentious at a glance.
 
2) In the case of partition migration I have observed that the new partition does not necessarily receive a call for each key/value pair in the multi map, though I believe that it does get called for each key (which seems reasonable to me but is different than the behavior I have observed when not migrating).  Is this expected?
 
3) In a few circumstances I have observed cases where an Object is not removed from the underlying list on the backup node (we are using default backup of 1).  This happens very rarely (In my last two tests it happened 2 and 3 times respectively out of ~350,000 total entries) but I am fairly certain of the cause and result.  The steps of the symptom are:
a) Instance A receives ID1/value1 via LocalEventEntry listener
b) Instance A processes ID1/value1
c) Instance A removes ID1/value1 and sees removal on LocalEventEntry listener
d) some time passes... (multiple minutes sometimes)
e) Instance A is killed
f) Instance B receives ownership of the partition containing ID1
g) Instance B receives ID1/value2 via LocalEventEntry listener (this is potentially not part of the migration)
h) Instance B processes ID1/value1 (previously worked)
i) Instance B removes ID1/value1 and sees removal on LocalEventEntry listener
j) Instance B processes ID1/value2
k) Instance B removes ID1/value2 and sees removal on LocalEventEntry listener
 
In my test scenario I am using a random distribution of 1000 keys while generating 100 new events per second on each JVM with a ramp up to 4 total JVMs running and a ramp down to 1 followed by halting production of events on the last machine and finally exiting when everything has been processed.  With heavy logging and a simple log parsing utility I observe that all expected events occur only once, except in these couple of cases where the same item is worked twice.  I am certain we can work around this, but it seemed worth noting.
 
4) I have not observed any cases where an item that was being worked has had its partition moved while working, but I am not entirely confident that this is a given.  I have been simulating a 5ms "IO" work time for worked items, but it will definitely be more dynamic in reality.  I was imagining I would need a global mutex around keys such that two processes would never work the same key at the same time, even during migration (due to a new member being added); does that seem like a necessity?  In general my process is to build a Queue of keys to work from the keys given to a local process manager via the LocalEventEntry listener and to work them with a pool of worker threads (locally ensuring that only one Thread operates on a given key at a time for safety).  Since the "work" is fast in my test case the queue never builds up significantly, but it might spike in a real environment and then slowly drain down.  In such a case, if a new member is added and a partition migrated, will the member that previously "owned" the partition get a call on its LocalEventEntry listener indicating removal of the key, or is a MigrationListener needed (or a different strategy)?  Does a distributed Queue work better for this scenario (ignoring that it does not group like updates, as is desirable to reduce persistence times)?
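For anyone curious what the local side of this looks like, here is a minimal, Hazelcast-free sketch of the scheme described above (class and method names are my own, not from any API): keys handed over by a listener are routed by hash to one of N single-threaded executors, so all work for a given key is serialized on one thread while different keys proceed in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-JDK sketch: per-key serialization via hash-striped single-thread executors.
public class KeyedWorkDispatcher {
    private final ExecutorService[] stripes;
    private final Consumer<String> work;

    public KeyedWorkDispatcher(int nStripes, Consumer<String> work) {
        this.stripes = new ExecutorService[nStripes];
        for (int i = 0; i < nStripes; i++) {
            // One thread per stripe guarantees in-order, one-at-a-time work per key.
            stripes[i] = Executors.newSingleThreadExecutor();
        }
        this.work = work;
    }

    // Called from the (hypothetical) entry listener when a key has pending work.
    public void submit(String key) {
        int stripe = Math.floorMod(key.hashCode(), stripes.length);
        stripes[stripe].execute(() -> work.accept(key));
    }

    public void shutdown() throws InterruptedException {
        for (ExecutorService s : stripes) s.shutdown();
        for (ExecutorService s : stripes) s.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

This only covers the local "one Thread per key" guarantee; the cross-member guarantee during migration is the open question above.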
 
Sorry for the laundry list.
Thanks in advance,
--Zack

Zack Radick

Sep 16, 2011, 2:27:05 PM
to haze...@googlegroups.com
Experimentally it seems that the distributed Queue is significantly slower than the MultiMap in ops/sec (especially when considering the IO advantages of being able to perform several updates to a single Object with one call to persist) so I have decided to stick with the MultiMap and deal with the couple of minor issues around my process that intersect with using it.  Since I am building a queue of work based on the local event entry listener I need to ensure that two separate processes don't work the same item when a new node is added.  I can solve this relatively simply with locking, but locking reduces my throughput by almost 1/3, so I would like to pay that penalty only when necessary.  I was hoping to use the migration listener interface to know when a member was added and the partitions redistributed, but it does not appear to work that way.  My listener only seems to be notified when a member dies.  Is this the intent?  Is there any way to be notified of partition migrations that happen when a new node is added?
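The "pay the locking penalty only when necessary" idea could be sketched roughly like this (a plain-JDK illustration, not Hazelcast API; the hooks would be wired into a MigrationListener, and the ReentrantLock stands in for a real distributed lock on the key):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: take the per-key lock only while at least one migration is in flight.
public class MigrationAwareGuard {
    private final AtomicInteger migrationsInFlight = new AtomicInteger();

    // Wire these into MigrationListener.migrationStarted/migrationCompleted.
    public void migrationStarted()   { migrationsInFlight.incrementAndGet(); }
    public void migrationCompleted() { migrationsInFlight.decrementAndGet(); }

    // Runs 'work' under 'lock' only while a migration is in flight; otherwise
    // runs it lock-free. Note the check-then-act window: a migration that
    // begins between the check and the work can still slip through, so the
    // work itself should remain idempotent.
    public void run(ReentrantLock lock, Runnable work) {
        if (migrationsInFlight.get() > 0) {
            lock.lock();
            try { work.run(); } finally { lock.unlock(); }
        } else {
            work.run();
        }
    }
}
```

Of course this only helps if the migration notifications actually fire, which is exactly the question above.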
 
I am also still curious about questions 1-3 in my previous post.
 
Thanks,
--Zack

Fuad Malikov

Sep 23, 2011, 3:54:22 AM
to haze...@googlegroups.com

Fuad
http://twitter.com/fuadm




On Wed, Sep 14, 2011 at 9:01 PM, Zack Radick <zra...@conducivetech.com> wrote:
I am prototyping using a MultiMap for storing grouped lists of tasks to be processed with a LocalEntryListener on the map to make sure that workers know when there is work to be done.  I have a couple of questions related to this and my observations from prototyping.
 
1) I am registering my interest in receiving local entries as:
Hazelcast.getMultiMap( MAP_NAME ).addLocalEntryListener( this );
This subscribes me to the map and makes the local Thread available for accepting a partition of said map (as I understand it).  Is there inherently a race condition between subscribing to the map and adding the local entry listener?  Is there a possibility that local entries will be added before the listener is in place?  I have not observed this behavior, but it looks a bit contentious at a glance.

The entry is added first; the listeners are invoked afterwards.
 
 
 
2) In the case of partition migration I have observed that the new partition does not necessarily receive a call for each key/value pair in the multi map, though I believe that it does get called for each key (which seems reasonable to me but is different than the behavior I have observed when not migrating).  Is this expected?
 
 Local listeners will not be triggered if an entry is migrated. 
 
 
3) In a few circumstances I have observed cases where an Object is not removed from the underlying list on the backup node (we are using default backup of 1).  This happens very rarely (In my last two tests it happened 2 and 3 times respectively out of ~350,000 total entries) but I am fairly certain of the cause and result.  The steps of the symptom are:
a) Instance A receives ID1/value1 via LocalEventEntry listener
b) Instance A processes ID1/value1
c) Instance A removes ID1/value1 and sees removal on LocalEventEntry listener
d) some time passes... (multiple minutes sometimes)
e) Instance A is killed
f) Instance B receives ownership of the partition containing ID1
g) Instance B receives ID1/value2 via LocalEventEntry listener (this is potentially not part of the migration)
h) Instance B processes ID1/value1 (previously worked)
i) Instance B removes ID1/value1 and sees removal on LocalEventEntry listener
j) Instance B processes ID1/value2
k) Instance B removes ID1/value2 and sees removal on LocalEventEntry listener
 
In my test scenario I am using a random distribution of 1000 keys while generating 100 new events per second on each JVM with a ramp up to 4 total JVMs running and a ramp down to 1 followed by halting production of events on the last machine and finally exiting when everything has been processed.  With heavy logging and a simple log parsing utility I observe that all expected events occur only once, except in these couple of cases where the same item is worked twice.  I am certain we can work around this, but it seemed worth noting.

Do you mean that even though you removed ID1/value1 at c), it reappears at h) when ID1 is restored from backup? If so, please create an issue and we'll try to figure out the cause and fix it if it is a bug.

 
4) I have not observed any cases where an item that was being worked has had its partition moved while working, but I am not entirely confident that this is a given.  I have been simulating a 5ms "IO" work time for worked items, but it will definitely be more dynamic in reality.  I was imagining I would need a global mutex around keys such that two processes would never work the same key at the same time, even during migration (due to a new member being added); does that seem like a necessity?  In general my process is to build a Queue of keys to work from the keys given to a local process manager via the LocalEventEntry listener and to work them with a pool of worker threads (locally ensuring that only one Thread operates on a given key at a time for safety).  Since the "work" is fast in my test case the queue never builds up significantly, but it might spike in a real environment and then slowly drain down.  In such a case, if a new member is added and a partition migrated, will the member that previously "owned" the partition get a call on its LocalEventEntry listener indicating removal of the key, or is a MigrationListener needed (or a different strategy)?  Does a distributed Queue work better for this scenario (ignoring that it does not group like updates, as is desirable to reduce persistence times)?
 
Sorry for the laundry list.
Thanks in advance,
--Zack


Fuad Malikov

Sep 23, 2011, 4:30:17 AM
to haze...@googlegroups.com
The partition listener should be notified both when a new node joins and when one leaves. I tried it and it works just fine.
Can you come up with a test case?


Fuad Malikov

Sep 23, 2011, 4:51:13 AM
to haze...@googlegroups.com
One thing though: there is one case where it is not called. Say you start A and it owns all the partitions, but all the partitions are empty. Then you start B: half of the partitions will be automatically assigned to B and you won't receive a notification. If you fill the partitions first and then start B, the partitions will be migrated one by one and you'll see the events.

We should fix this actually. I have just filed issue 674.


Fuad
http://twitter.com/fuadm

Joe Planisky

Sep 23, 2011, 9:38:18 AM
to haze...@googlegroups.com
I have also seen this happen a couple times in my testing. Entries removed from a map in node A reappear in the map on node B when node A goes away 30 minutes later. When it happened, it was for a very small number of entries (e.g. 2 out of 60,000). I've not been able to reproduce it at will. I originally saw it in 1.9.4-Final.

(As a work-around, I added an "already processed" flag to my objects to mitigate this issue.)
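The "already processed" flag work-around could look something like this minimal sketch (class and method names are invented for illustration; a real deployment would need to share and eventually prune this set, whereas here it is a plain in-JVM set):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: remember completed IDs so an entry resurrected from a stale backup
// is recognized and skipped instead of being worked a second time.
public class ProcessedFilter {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true exactly once per id; later calls (e.g. for an entry that
    // reappeared from a backup after a node died) return false.
    public boolean markIfFirst(String id) {
        return processed.add(id);
    }
}
```

The worker would call markIfFirst(id) before doing the work and simply drop the entry when it returns false.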

--
Joe P

On Sep 23, 2011, at 12:54 AM, Fuad Malikov wrote:

[snip]

>
> On Wed, Sep 14, 2011 at 9:01 PM, Zack Radick <zra...@conducivetech.com> wrote:
>

[snip]

Zack Radick

Sep 23, 2011, 12:33:52 PM
to haze...@googlegroups.com
Thanks Fuad, that's just what I was talking about.  I see notifications as expected when the 3rd machine comes online, but the initial split with the second is often silent.
--Zack

Zack Radick

Sep 23, 2011, 12:45:17 PM
to haze...@googlegroups.com
Joe/Fuad,
I think a few missed removes on a backup are likely inevitable, given that we are depending on cross-network operations.  Making the work idempotent is really the only safe way to handle it (especially in my case, where the datastore does not support transactions), so it is probably not worth obsessing over the very small number of things that end up being done twice.  A handful of operations out of many thousands of entries is acceptable to me.  It is definitely not "perfect" (and I will submit some code that can reproduce the issue if desired), but given the nature of network communication it is almost inevitable without adding additional re-checks/validation that could slow the framework down.  I would rather it be quick and 99.99% right than slow and 100% right.
Cheers,
--Zack

Fuad Malikov

Sep 23, 2011, 12:50:01 PM
to haze...@googlegroups.com, haze...@googlegroups.com
I think we can have both 100% right and quick. It will be great if we can have the code to reproduce the issue.

Regards,

Fuad Malikov
Sent from Mobile

Zack Radick

Sep 23, 2011, 12:52:18 PM
to haze...@googlegroups.com
Fuad,
Thanks for your responses!  I wanted to clarify one thing in regards to your comment below:

2) In the case of partition migration I have observed that the new partition does not necessarily receive a call for each key/value pair in the multi map, though I believe that it does get called for each key (which seems reasonable to me but is different than the behavior I have observed when not migrating).  Is this expected?

 Local listeners will not be triggered if an entry is migrated.
In a migration Local listeners don't get ANY calls?  I thought it looked like they were being called once per key that migrated (with one of the values), but it could be that my key space is small enough that I was triggering work on them shortly anyway.  Is the intention that they should get called?  If not, what is the preferred way to handle migration with local listeners?
 
Thanks!
--Zack

Zack Radick

Sep 23, 2011, 12:55:08 PM
to haze...@googlegroups.com
Alright, I will try to make the simplest case I can that will reproduce the issue and get some code posted.  I really like the API you guys have developed, keep up the good work!
Cheers,
--Zack

Zack Radick

Sep 23, 2011, 7:56:23 PM
to haze...@googlegroups.com
Fuad,
I created Issue 680 for this.  If I can provide further information let me know.  I don't think it is really a big deal personally, but there is some code to try to reproduce this linked to the issue if you want to pursue it.
Cheers,
--Zack

Fuad Malikov

Sep 27, 2011, 4:03:30 AM
to haze...@googlegroups.com


Fuad
http://twitter.com/fuadm




On Fri, Sep 23, 2011 at 7:52 PM, Zack Radick <zra...@conducivetech.com> wrote:
Fuad,
Thanks for your responses!  I wanted to clarify one thing in regards to your comment below:

2) In the case of partition migration I have observed that the new partition does not necessarily receive a call for each key/value pair in the multi map, though I believe that it does get called for each key (which seems reasonable to me but is different than the behavior I have observed when not migrating).  Is this expected?

 Local listeners will not be triggered if an entry is migrated.
In a migration Local listeners don't get ANY calls?  I thought it looked like they were being called once per key that migrated (with one of the values), but it could be that my key space is small enough that I was triggering work on them shortly anyway. 

Local listeners are (and should be) triggered when there is an update on the entry. A migration is rather a special case: the entry was already there, only the owner changed. You can listen for migration events through the PartitionService ( Hazelcast.getPartitionService() ).
 
Is the intention that they should get called?  If not, what is the preferred way to handle migration with local listeners?

Beyond that, there is no perfect way to solve this. I would recommend checking the following article: http://highscalability.com/blog/2011/4/12/caching-and-processing-2tb-mozilla-crash-reports-in-memory-w.html

It may give some idea at least.
 
 
Thanks!
--Zack

Zack Radick

Sep 27, 2011, 12:45:07 PM
to haze...@googlegroups.com
Fuad,
 
Thanks again for the responses.  I have read and watched the presentation about the Mozilla crash reports, very cool.
 
I wrote a quick test to verify (just to be empirically sure) and the local listeners definitely don't get notified when the partitions migrate.  This seems a little surprising to me given that the locality of the data has changed, but with that knowledge in hand I can certainly work around it.
 
A brute force approach would seem to be to ask the MultiMap for the local key set after a migration event has ended and compare it to the currently known set.  If there are not any existing patterns around this, I will put something together that works for my use case.
 
Thanks,
--Zack

Zack Radick

Sep 27, 2011, 1:29:50 PM
to haze...@googlegroups.com
In case it is of help to anyone else, this basic pattern seems to mostly do the trick for me as far as migrations and local listeners go (this method is from the MigrationListener interface):
 
public void migrationCompleted( MigrationEvent migrationEvent ) {
    // Only act when this member is the new owner of the migrated partition.
    if ( migrationEvent.getNewOwner().localMember() ) {
        int migratedPartition = migrationEvent.getPartitionId();
        final Set<TYPE> localKeys = map.localKeySet();
        for ( TYPE key : localKeys ) {
            if ( partitionService.getPartition( key ).getPartitionId() == migratedPartition ) {
                keys.add( key ); // Here you would do whatever you need with a newly migrated key
            }
        }
    }
}
No warranties expressed or implied.  ;-)
YMMV,
--Zack