Question about using Hazelcast to implement failover support for a stateful service

mongonix

Nov 25, 2011, 11:00:13 AM
to haze...@googlegroups.com
Hi,

I'm currently evaluating how best to apply Hazelcast to the following use case:

I have a system with a bunch of nodes running in the cloud. Each node provides the same service, exposed via different kinds of interfaces (SOAP, REST, some proprietary protocols). This service is rather stateful. Upon the first request to the service from a given client, a new application-specific session is established. Any further requests to the service from this client should refer to the same application session, so that they can be correlated. Among other things, our application-specific sessions have getAttribute/setAttribute methods to associate key/value pairs with the session (similar to e.g. HTTPSessions).

Currently, we cannot easily replicate the state of the service running on one node to other nodes (due to some difficulties in replicating state of certain proprietary protocol stacks). Therefore, we use a concept of sticky sessions, where all requests related to the same application session should be handled by the same node in our deployment. To spread the load, we have a load balancer in front of our system, which tries to spread initial requests evenly across the nodes and forward all subsequent requests to the node that handled the initial request for a given session. Done this way, there is no big need to replicate the state of the service.

As you can see from the description, we cannot serve any request on any node, as is usually possible with stateless services. But we want to at least be able to fail over in the rare cases where some of the nodes have major failures. Our idea is to do the following:
- for each application-specific session, we need to have a replica (or multiple replicas)
- should a node that handles a given application session fail, we want to switch service requests for the application sessions owned by the failed node to the node that maintains the replica

And now the questions:

1) I understand that I can achieve replication of state using Hazelcast just by using a simple distributed map or something similar. But I'm concerned about the performance and scalability of this solution.
As I explained, all of the state of a given application session is currently maintained by a single node, and each node may maintain e.g. 100000 application sessions. If there are no failures, 100% of all reads/writes from/to an application session will happen on this single node. At the same time, if I understand correctly, Hazelcast will distribute the map (i.e. keys/values) across the whole cluster, i.e. across all nodes in our deployment. This is probably an unnecessary CPU and memory overhead, as we know in advance that only one node (the one handling the session) needs to access the data most of the time. It is also not clear whether it would slow down access to the data on this node, since it now needs to talk to other nodes to fetch data that it produced itself. So, is there a way to reduce this overhead?

2) In case of a failover, another node will take over the role of the failed one. But this failover node will need to fetch all the data about the session state from other nodes, as it owns only a (small) part of the data due to Hazelcast's distributed nature. This also seems not very efficient. Effectively, we'd like to have a replica of the whole application session state for one session on a single other node, instead of spreading it all over the cluster. Maybe we would even like to have one node as a backup of another node, i.e. it contains complete replicas of all application sessions controlled by the other node. The question here is whether and how this can be achieved by means of Hazelcast.

3) If we use one distributed Map per application-specific session and we have 1000000 such sessions in our deployment, is it a performance problem for Hazelcast to have 1000000 distributed maps? Or can it scale nicely even in this situation? I ask because I know that some other frameworks (e.g. Infinispan) have issues with creating a lot of distributed maps dynamically.

Looking forward to proposals and advice on how to achieve the described behavior!

Tim Peierls

Nov 25, 2011, 1:55:15 PM
to haze...@googlegroups.com
Responses interspersed below.

On Fri, Nov 25, 2011 at 11:00 AM, mongonix <romi...@gmail.com> wrote:
1) I understand that I can achieve replication of state using Hazelcast just by using a simple distributed map or something similar. But I'm concerned about performance and scalability of this solution.
As I explained, all of the state of a given application session is maintained by a single node currently. And each node may maintain e.g. 100000 application sessions. If there are no failures 100% of all reads/writes from/to this application session will happen from this single node. At the same time, if I understand correctly, Hazelcast will distribute the map (i.e. keys/values) across the whole cluster, i.e. across all nodes in our deployment. And this is probably an unnecessary CPU and memory overhead, as we know in advance that only one node (the one handling the session) needs to access the data most of the time. It is also not clear, if it would slow down the access to the data on this node, as it needs now to talk to other nodes to fetch the data that it produced before, or? So, is there a way to reduce this overhead?

If you have a natural partition of your application into largely independent chunks of state -- in your case, application-specific sessions -- you can cut down on the communication between nodes by using DistributedTasks with keys drawn from the identifiers of your work units. It adds an additional couple of "hops" for each request, from the node that handles the original request to the node that executes the DistributedTask (and back again), but it means that session-specific state and its processing can be confined to a single node instead of having to pull in data from arbitrarily many other nodes.

So the generic request handler under such an approach has this kind of structure, leaving out many details:

    Response handle(Request req) throws ... {
        Callable<Response> task = getSessionSpecificTask(req);
        Object key = getSessionKey(req);
        FutureTask<Response> futureTask = new DistributedTask<Response>(task, key);
        Hazelcast.getExecutorService().execute(futureTask);
        return futureTask.get();
    }

The real work is done on the target node derived from key
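
To make the key-based dispatch concrete, here is a minimal sketch in plain Java of how a key can be mapped to a partition and from there to an owning node. KeyRouter, the round-robin assignment of partitions to nodes, and the node names are assumptions made for illustration; this is not Hazelcast's actual implementation, which maintains its own partition table.

```java
import java.util.List;

// Hypothetical stand-in for a cluster's key-to-node routing; not Hazelcast code.
final class KeyRouter {
    private final int partitionCount;
    private final List<String> nodes;

    KeyRouter(int partitionCount, List<String> nodes) {
        this.partitionCount = partitionCount;
        this.nodes = nodes;
    }

    // Map a key to a partition id in [0, partitionCount), also for negative hash codes.
    int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Partitions are assigned to nodes round-robin (an assumption for illustration).
    String ownerOf(Object key) {
        return nodes.get(partitionOf(key) % nodes.size());
    }
}
```

The point is only that the same session key always resolves to the same owner while cluster membership is stable, so every task dispatched with that key runs where the session state lives.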

 
2) In case of a failover, another node will take the role of the failing one. But this failover-node will need to fetch all the data about the session state from other nodes as it owns only a (small) part of the data due to Hazelcast's distributed nature. And this also seems not very efficient. Effectively, we'd like to have a replica of the whole application session state for one session on a single other node, instead of spreading it all over the cluster. May be, we would even like to have one node as a backup of another node, i.e. it contains complete replicas of all application sessions controlled by the other node. The question here is: if and how this can be achieved by means of Hazelcast?

Serialize session-specific state (or break it into components and serialize those), and after each state change write the new state to a corresponding IMap using the same key that you used to dispatch the DistributedTask. Use transactions or not, according to your needs. This call to IMap.put will cause a local write for the primary copy and remote writes to whatever nodes are providing the backup copies. Depending on your requirements, you might be able to decrease latency by using IMap.putAsync and not waiting (or not waiting long).

On failure of the primary node for a given session key, the new primary node will have local copies (taken from one of the backup copies) of all the data it needs, and DistributedTasks for the given session will be dispatched to it.
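
The failover behavior described here can be modeled with a toy example. The sketch below is plain Java, not Hazelcast code: PartitionReplicas and its methods are hypothetical names, and it assumes that every write is copied synchronously to all backups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition with a primary and ordered backups; all names are hypothetical.
final class PartitionReplicas {
    private final List<String> replicaNodes;   // index 0 is the current primary
    private final Map<String, Map<String, String>> store = new HashMap<>();

    PartitionReplicas(List<String> nodes) {
        this.replicaNodes = new ArrayList<>(nodes);
        for (String n : nodes) {
            store.put(n, new HashMap<>());
        }
    }

    String primary() {
        return replicaNodes.get(0);
    }

    // A write goes to the primary and is copied to every backup.
    void put(String key, String value) {
        for (String n : replicaNodes) {
            store.get(n).put(key, value);
        }
    }

    // Reads are served from the primary's local copy.
    String get(String key) {
        return store.get(primary()).get(key);
    }

    // Drop a failed node; if it was the primary, the first backup takes over.
    void fail(String node) {
        replicaNodes.remove(node);
        store.remove(node);
    }
}
```

After the primary fails, the promoted backup already holds the data locally, which is the property the paragraph above relies on.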

 
3) If we use one distributed Map per application-specific session and we have 1000000 such sessions in our deployment, is it a performance problem for Hazelcast to have 1000000 distributed maps? Or it can scale nicely even in this situation? I ask because I know that some other frameworks (e.g. Infinispan) have issues with creating a lot of distributed maps dynamically.

I don't have any pointers, but the Hazelcast team has in the past seemed to recommend against creating very large numbers of distributed data structures.

--tim

mongonix

Nov 25, 2011, 4:28:18 PM
to haze...@googlegroups.com
Hi Tim,

Thanks a lot for your reply! Please see my comments below.


On Friday, November 25, 2011 7:55:15 PM UTC+1, Tim Peierls wrote:
Responses interspersed below.

On Fri, Nov 25, 2011 at 11:00 AM, mongonix <romi...@gmail.com> wrote:
1) I understand that I can achieve replication of state using Hazelcast just by using a simple distributed map or something similar. But I'm concerned about performance and scalability of this solution.
As I explained, all of the state of a given application session is maintained by a single node currently. And each node may maintain e.g. 100000 application sessions. If there are no failures 100% of all reads/writes from/to this application session will happen from this single node. At the same time, if I understand correctly, Hazelcast will distribute the map (i.e. keys/values) across the whole cluster, i.e. across all nodes in our deployment. And this is probably an unnecessary CPU and memory overhead, as we know in advance that only one node (the one handling the session) needs to access the data most of the time. It is also not clear, if it would slow down the access to the data on this node, as it needs now to talk to other nodes to fetch the data that it produced before, or? So, is there a way to reduce this overhead?

If you have a natural partition of your application into largely independent chunks of state -- in your case, application-specific sessions -- you can cut down on the communication between nodes by using DistributedTasks with keys drawn from the identifiers of your work units. It adds an additional couple of "hops" for each request, from the node that handles the original request to the node that executes the DistributedTask (and back again), but it means that session-specific state and its processing can be confined to a single node instead of having to pull in data from arbitrarily many other nodes.

So the generic request handler under such an approach has this kind of structure, leaving out many details:

    Response handle(Request req) throws ... {
        Callable<Response> task = getSessionSpecificTask(req);
        Object key = getSessionKey(req);
        FutureTask<Response> futureTask = new DistributedTask<Response>(task, key);
        Hazelcast.getExecutorService().execute(futureTask);
        return futureTask.get();
    }

The real work is done on the target node derived from key


Yes. This is a very nice approach and I have been playing with this idea for a while. Good that you confirmed that this may be the right direction.
 
 
2) In case of a failover, another node will take the role of the failing one. But this failover-node will need to fetch all the data about the session state from other nodes as it owns only a (small) part of the data due to Hazelcast's distributed nature. And this also seems not very efficient. Effectively, we'd like to have a replica of the whole application session state for one session on a single other node, instead of spreading it all over the cluster. May be, we would even like to have one node as a backup of another node, i.e. it contains complete replicas of all application sessions controlled by the other node. The question here is: if and how this can be achieved by means of Hazelcast?

Serialize session-specific state (or break it into components and serialize those), and after each state change write the new state to a corresponding IMap using the same key that you used to dispatch the DistributedTask. Use transactions or not, according to your needs. This call to IMap.put will cause a local write for the primary copy and remote writes to whatever nodes are providing the backup copies. Depending on your requirements, you might be able to decrease latency by using IMap.putAsync and not waiting (or not waiting long).

One problem here is that our session-specific state is essentially a map that is changed quite often, and this map can have e.g. 1000 entries. Therefore, serializing it every time one of the entries changes and letting Hazelcast distribute the copy of the map can be a bit heavy. This is the reason why I was initially thinking about representing the state of each session as an IMap. But this has other problems that were addressed above ;-)

Any ideas how to solve this problem efficiently?
 
On failure of the primary node for a given session key, the new primary node will have local copies (taken from one of the backup copies) of all the data it needs, and DistributedTasks for the given session will be dispatched to it.

Yes. This would work if serialization of the session state were efficient and easy.
 
3) If we use one distributed Map per application-specific session and we have 1000000 such sessions in our deployment, is it a performance problem for Hazelcast to have 1000000 distributed maps? Or it can scale nicely even in this situation? I ask because I know that some other frameworks (e.g. Infinispan) have issues with creating a lot of distributed maps dynamically.

I don't have any pointers, but the Hazelcast team has in the past seemed to recommend against creating very large numbers of distributed data structures.

It would be interesting to hear a more definitive answer on this issue from the main Hazelcast developers.
 

Tim Peierls

Nov 25, 2011, 5:20:59 PM
to haze...@googlegroups.com
On Fri, Nov 25, 2011 at 4:28 PM, mongonix <romi...@gmail.com> wrote:
Serialize session-specific state (or break it into components and serialize those), and after each state change write the new state to a corresponding IMap using the same key that you used to dispatch the DistributedTask. Use transactions or not, according to your needs. This call to IMap.put will cause a local write for the primary copy and remote writes to whatever nodes are providing the backup copies. Depending on your requirements, you might be able to decrease latency by using IMap.putAsync and not waiting (or not waiting long).

One problem here is the fact that our session-specific state is a essentially a map that is being changed quite often. And this map can have e.g. 1000 entries. Therefore, serializing it every time when a change occurs to one of the entries and letting Hazelcast distribute this copy of the map can be a bit heavy. This is the reason why I was initially thinking about representing a state of each session as an IMap. But this has other problems that were addressed above ;-) 

Any ideas how to solve this problem efficiently?

One approach is to keep all entries for all session states in one IMap (or perhaps decomposed by key and value types into a small number of IMaps), using a compound key for that map that combines the session id with the "real" key and implements PartitionAware.getPartitionKey to return the session id alone. That way all state for a given session is local to the same primary node, each write is small, and entries are backed up to the same backup nodes.

public class SessionStateKey<K> implements PartitionAware {
    public SessionStateKey(SessionId sessionId, K key) {
        this.sessionId = sessionId;
        this.key = key;
    }

    @Override public SessionId getPartitionKey() {    // covariant return type
        return sessionId;
    }

    public K getKey() {
        return key;
    }

    // Define equals and hashCode carefully.

    private final SessionId sessionId;
    private final K key;
}
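
One way to fill in the equals and hashCode placeholder is sketched below. It assumes a String session id instead of the SessionId type, and it declares a local PartitionAware stand-in so that it compiles without Hazelcast on the classpath; with Hazelcast you would implement the library's own interface instead.

```java
import java.io.Serializable;
import java.util.Objects;

// Local stand-in so the sketch compiles without Hazelcast; an assumption, not the real interface.
interface PartitionAware {
    Object getPartitionKey();
}

final class SessionStateKey<K> implements PartitionAware, Serializable {
    private static final long serialVersionUID = 1L;

    private final String sessionId;   // String is an assumption; the original uses a SessionId type
    private final K key;

    SessionStateKey(String sessionId, K key) {
        this.sessionId = Objects.requireNonNull(sessionId);
        this.key = Objects.requireNonNull(key);
    }

    // Only the session id decides placement, so all entries of a session stay together.
    @Override public String getPartitionKey() {
        return sessionId;
    }

    K getKey() {
        return key;
    }

    // Both fields take part: the same attribute key in two sessions must not collide.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SessionStateKey)) return false;
        SessionStateKey<?> other = (SessionStateKey<?>) o;
        return sessionId.equals(other.sessionId) && key.equals(other.key);
    }

    @Override public int hashCode() {
        return Objects.hash(sessionId, key);
    }
}
```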

The big state map is an IMap<SessionStateKey<K>, V>. You'd probably want to use a ForwardingIMap to wrap this for a given sessionId value so that you don't have to explicitly deal with creating SessionStateKey<K> instances all the time:

    IMap<SessionStateKey<Foo>, Bar> underlyingMap = ...;
    IMap<Foo, Bar> sessionState = createSessionSpecificMap(sessionId, underlyingMap);

    // createSessionSpecificMap arranges for sessionState.get(foo) to turn into
    // underlyingMap.get(new SessionStateKey(sessionId, foo))
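
As a rough illustration of what createSessionSpecificMap could arrange, here is a small view class over a shared java.util.Map. SessionView is a hypothetical name, it exposes only get/put/remove rather than the full IMap interface, and it uses SimpleImmutableEntry as a stand-in for SessionStateKey.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// View of one session's attributes stored in a shared map under compound keys.
// SimpleImmutableEntry<sessionId, attributeKey> stands in for SessionStateKey here.
final class SessionView<K, V> {
    private final String sessionId;
    private final Map<Map.Entry<String, K>, V> underlying;

    SessionView(String sessionId, Map<Map.Entry<String, K>, V> underlying) {
        this.sessionId = sessionId;
        this.underlying = underlying;
    }

    // Build the compound key so callers only ever see the plain attribute key.
    private Map.Entry<String, K> compound(K key) {
        return new SimpleImmutableEntry<>(sessionId, key);
    }

    V get(K key) {
        return underlying.get(compound(key));
    }

    V put(K key, V value) {
        return underlying.put(compound(key), value);
    }

    V remove(K key) {
        return underlying.remove(compound(key));
    }
}
```

Two views with different session ids can share one underlying map without seeing each other's entries, because the session id is part of every key.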

You give up the ability to efficiently enumerate the keys of the map that are specific to a given session, but if that's important you can find other ways to keep track.

--tim

mongonix

Nov 28, 2011, 6:01:16 AM
to haze...@googlegroups.com
Hi Tim,

Thanks a lot for refining your initial suggestion even further! It looks rather concrete now and should work, I'd say.

At the same time, I had a look at a few other possibilities of doing it, e.g. HttpSession-replication and Tree Caches from JBoss Cache and Infinispan. I'll start a dedicated discussion thread on it to avoid discussing too many different things on one thread.

Thanks again,
  Leo

mongonix

Jan 5, 2012, 7:40:31 AM
to haze...@googlegroups.com
Hi Talip, Hi Fuad,

Tim already explained an approach, but I see certain problems with his proposal. It is too much of a hack, I'd say. And there are potential scalability problems with the enumeration of keys required for removing elements, if I have e.g. >= 100000 sessions.
 
So, maybe you could comment a bit on how sticky sessions for a stateful service (see the original message of this thread for a description) could be implemented efficiently with the current version of Hazelcast? And give some insight into whether this will become any easier with the upcoming revamped session replication, as indicated by Talip?

Thanks a lot,
   Leo

mongonix

Jan 9, 2012, 3:59:26 AM
to haze...@googlegroups.com
Hi Talip, Hi Fuad,

Sorry, but I'd like to post this question again. I'm about to start implementing the functionality described below and really need good advice from you.

Maybe you could comment a bit on how sticky sessions for a stateful service (see the original message of this thread for a description) could be implemented efficiently with the current version of Hazelcast? And give some insight into whether this will become any easier with the upcoming revamped session replication, as indicated by Talip?

Thanks a lot,
   Leo

----------------------------------------------

Fuad Malikov

Jan 9, 2012, 6:49:00 AM
to haze...@googlegroups.com
Hi Leo,

First of all, let me clarify the cost of 1,000,000 Maps. For each new map, the creation process takes place on all nodes. Each node creates a CMap object locally for each IMap. And the CMap object is a heavy one; it takes around 100K. So having 1M maps will blow up your memory, and since each map is created on all nodes, this doesn't scale. And you have to destroy a map to clean up the resources it takes.
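
To put numbers on this, here is a back-of-the-envelope calculation. It takes the roughly 100K per map at face value (read as 100,000 bytes, which is an assumption) and ignores everything else on the heap; MapOverheadEstimate is a hypothetical helper.

```java
// Rough estimate of heap consumed on every node by map bookkeeping alone.
final class MapOverheadEstimate {
    // Each map costs bytesPerMap on each node, because every node creates its own local object.
    static long bytesPerNode(long mapCount, long bytesPerMap) {
        return mapCount * bytesPerMap;
    }
}
```

For 1,000,000 maps at 100,000 bytes each, bytesPerNode returns 100,000,000,000 bytes, i.e. about 100 GB on every node, and adding nodes does not reduce that figure.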

Putting this aside, whatever you do with Hazelcast will be a bit of a hack, because Hazelcast will always distribute the data everywhere and you'll be hacking around this. Dealing with DistributedTasks is nice, but you still lose the locality of sessions that you have in your current architecture.

Maybe you should still keep the sessions locally for fast reads and put updates to Hazelcast so they can be restored after failover. Assuming that you'll have a Session object which is essentially a Map with 1000 entries, and that not all of them will be updated, the following approach may help.

Have one giant Map with the following Key:

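class MySessionKey implements PartitionAware, java.io.Serializable {
      long sessionId;        // This is the id of your session
      String sessionMapKey;  // This is the key to the Session Map (a 1000 entry Map)

      // Entries with the same sessionId are stored in the same partition.
      public Object getPartitionKey() {
            return sessionId;  // autoboxed to Long
      }

      // equals() and hashCode() over both fields are omitted for brevity.
}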

You'll store the sessions locally in your own data structure and put the updates to Hazelcast with the above key and the attribute value. Now you serialize only the changed attributes of the Session, and PartitionAware will make sure that all entries tied to a particular Session are stored on the same node.

If the server crashes, you'll fail over to another node and the local session data will not be there. You can run a MultiTask that iterates through the local keys on every node. Using MySessionKey it can regenerate the Session objects and return them to the caller. So basically you restore the Session that was backed up in Hazelcast.
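
The restore step could look roughly like the following sketch, which uses a plain java.util.Map in place of the node-local portion of the Hazelcast map. AttrKey and SessionRestore are hypothetical names, and the MultiTask plumbing is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Compound key (session id plus attribute name), analogous to MySessionKey.
final class AttrKey {
    final long sessionId;
    final String attribute;

    AttrKey(long sessionId, String attribute) {
        this.sessionId = sessionId;
        this.attribute = attribute;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof AttrKey)) return false;
        AttrKey other = (AttrKey) o;
        return sessionId == other.sessionId && attribute.equals(other.attribute);
    }

    @Override public int hashCode() {
        return Objects.hash(sessionId, attribute);
    }
}

final class SessionRestore {
    // Collect every attribute of one session from a flat map of all locally held entries.
    static Map<String, String> restore(long sessionId, Map<AttrKey, String> localEntries) {
        Map<String, String> session = new HashMap<>();
        for (Map.Entry<AttrKey, String> e : localEntries.entrySet()) {
            if (e.getKey().sessionId == sessionId) {
                session.put(e.getKey().attribute, e.getValue());
            }
        }
        return session;
    }
}
```

Note that this scans every entry it is given, so the cost grows with the total number of locally stored attributes, not with the size of the one session being restored.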

Does it sound good?




-fuad


--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To view this discussion on the web visit https://groups.google.com/d/msg/hazelcast/-/v7_qzVzMd2IJ.

To post to this group, send email to haze...@googlegroups.com.
To unsubscribe from this group, send email to hazelcast+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/hazelcast?hl=en.

mongonix

Jan 9, 2012, 11:09:09 AM
to haze...@googlegroups.com
Hi Fuad,

Thanks a lot for your feedback!


On Monday, January 9, 2012 12:49:00 PM UTC+1, Fuad Malikov wrote:
Hi Leo,

First of all, let me clarify the cost of 1,000,000 Maps. For each new map, the creation process takes place on all nodes. Each node creates a CMap object locally for each IMap. And the CMap object is a heavy one; it takes around 100K. So having 1M maps will blow up your memory, and since each map is created on all nodes, this doesn't scale. And you have to destroy a map to clean up the resources it takes.

This is very interesting information, especially the remark about 100K per CMap. Is it described somewhere in the docs? It could be useful for many people, I guess.
 
Putting this aside, whatever you do with Hazelcast will be a bit of a hack, because Hazelcast will always distribute the data everywhere and you'll be hacking around this. Dealing with DistributedTasks is nice, but you still lose the locality of sessions that you have in your current architecture.

Maybe you should still keep the sessions locally for fast reads and put updates to Hazelcast so they can be restored after failover. Assuming that you'll have a Session object which is essentially a Map with 1000 entries, and that not all of them will be updated, the following approach may help.

Have one giant Map with the following Key:

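class MySessionKey implements PartitionAware, java.io.Serializable {
      long sessionId;        // This is the id of your session
      String sessionMapKey;  // This is the key to the Session Map (a 1000 entry Map)

      // Entries with the same sessionId are stored in the same partition.
      public Object getPartitionKey() {
            return sessionId;  // autoboxed to Long
      }

      // equals() and hashCode() over both fields are omitted for brevity.
}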

You'll store the sessions locally in your own data structure and put the updates to Hazelcast with the above key and the attribute value. Now you serialize only the changed attributes of the Session, and PartitionAware will make sure that all entries tied to a particular Session are stored on the same node.

If the server crashes, you'll fail over to another node and the local session data will not be there. You can run a MultiTask that iterates through the local keys on every node. Using MySessionKey it can regenerate the Session objects and return them to the caller. So basically you restore the Session that was backed up in Hazelcast.

Does it sound good?

I like the idea of keeping sessions locally for fast access and replicating the updates. But I'm a bit worried about the giant map. If I understand you correctly, it will contain all attribute values of all sessions that are currently alive (because each attribute is initialized at least once). Is that correct? If so, this map may have e.g. 1000000 * 1000 = 1 billion entries, assuming there are 1000000 live sessions in the cluster, each with 1000 attributes, and each attribute is initialized at least once. So this is really a giant distributed map, which is read only when a session is to be restored or deleted. In both cases, a MultiTask will need to iterate over quite a few entries (distributed only among the replica nodes of a given sessionId key, right?) to find the entries belonging to this session, which can be rather costly, can't it? Or are you saying it is not an issue, or that there is a nice workaround for it?

But coming back to my remark about the giant map essentially holding all entries of all maps. If this is the case, wouldn't it be easier to actually build local replica Maps (with 1000 elements) on each replica node of a session (picked by HZ based on sessionId)? I.e. on an update, we publish a change using e.g. a topic or something like that (maybe even a distributed map), and each listener on the replica nodes waiting for topic updates gets a notification and changes the local replica of the map accordingly. The memory overhead will be much lower, I guess, as this map is not replicated and not managed by HZ. And there is no need for a giant map holding that much information; after all, we only need a way to share the updates. Once updates are applied, the corresponding update entry can be removed from the distributed data structure. Removal and fetching can also be done very efficiently. The "only" problem is: if the owner node of the session and all replica nodes crash at the same time, the session is gone, as these maps were not distributed and info about updates is not preserved... OK. This was just a sketch of a solution. But what do you think?
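
A minimal sketch of this idea in plain Java follows. UpdateTopic is an in-process stand-in for a publish/subscribe topic, not the Hazelcast ITopic API, and SessionUpdate and ReplicaNode are hypothetical names; a null value is assumed to mean removal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One attribute change of one session; a null value means "remove the attribute".
final class SessionUpdate {
    final String sessionId;
    final String key;
    final String value;

    SessionUpdate(String sessionId, String key, String value) {
        this.sessionId = sessionId;
        this.key = key;
        this.value = value;
    }
}

// A replica node holding plain local copies of session maps, not managed by any data grid.
final class ReplicaNode {
    final Map<String, Map<String, String>> sessions = new HashMap<>();

    // Apply a single delta to the local replica of the affected session.
    void onUpdate(SessionUpdate u) {
        Map<String, String> attrs = sessions.computeIfAbsent(u.sessionId, id -> new HashMap<>());
        if (u.value == null) {
            attrs.remove(u.key);
        } else {
            attrs.put(u.key, u.value);
        }
    }
}

// In-process stand-in for a topic: every subscriber receives every published update.
final class UpdateTopic {
    private final List<ReplicaNode> listeners = new ArrayList<>();

    void subscribe(ReplicaNode node) {
        listeners.add(node);
    }

    void publish(SessionUpdate update) {
        for (ReplicaNode node : listeners) {
            node.onUpdate(update);
        }
    }
}
```

Only the changed attribute travels over the wire in such a scheme, and the update itself does not need to be retained once every replica has applied it.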

Thanks,
  Leo

Fuad Malikov

Jan 9, 2012, 1:23:06 PM
to haze...@googlegroups.com, haze...@googlegroups.com
Yes, having your own maps and using topics to replicate is definitely the way you should go. We also like this approach and have suggested it to others in the past.
Somehow this time it came the other way round :(.

Even if you store things in Hazelcast, you can lose data anyway if two servers crash. In your approach you can have as many replicas as you need to feel safe.

Regarding the CMap size: no, it is not documented. I will add it to the docs as soon as I can.

Regards,

Fuad Malikov
Sent from Mobile


mongonix

Jan 10, 2012, 4:58:14 AM
to haze...@googlegroups.com
Hi Fuad,


On Monday, January 9, 2012 7:23:06 PM UTC+1, Fuad Malikov wrote:
Yes, having your own maps and using topics to replicate is definitely the way you should go. We also like this approach and have suggested it to others in the past.
Somehow this time it came the other way round :(.

OK. Good that some other people picked this approach as well and it worked for them.
 

Even if you store things in Hazelcast, you can lose data anyway if two servers crash. In your approach you can have as many replicas as you need to feel safe.
 
Yes. I see your point regarding the number of replicas.


But let me ask a few clarification questions, as I only sketched the topic based approach in my previous message, but do not quite understand all the details yet.

1) If others have used this approach, is any example code available anywhere?

2) If I use topics, I probably do not use one topic per session, as it may require 1000000 topics and this will consume too much memory. So, I probably need to multiplex sending of updates for different sessions using the same topic?

3) How can I reuse at least the replica selection and control mechanism of HZ to pick replicas based on the sessionId and to make sure that all replicas are alive and replicating again if some of the replicas are lost? Is it possible at all?

4) I need to be able to have at least N replicas at any time (similar to defining the number of replicas for HZ distributed maps). For that, I need to reuse HZ distribution mechanisms or introduce my own, which are functionally almost equivalent. Among other things I'd need the following functionality:
    - decide which nodes should keep replicas for a session with a given sessionId 
    - publish updates only to those replica nodes (how? Topics? Queues? Anything else?)
    - monitor that the number of replicas for a sessionId is N at any point
    - if one of the replica nodes dies, pick another one for a replica and propagate current state and further updates to it
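
The first item in the list, deciding which nodes keep replicas, could be sketched as below. This is a hypothetical placement scheme (N consecutive nodes in a fixed-order list, starting at the hash of the sessionId), not the mechanism HZ uses internally, and it does not cover monitoring or re-replication.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replica placement: up to n consecutive nodes, starting at the key's hash position.
final class ReplicaSelector {
    static List<String> replicasFor(Object sessionId, List<String> liveNodes, int n) {
        List<String> chosen = new ArrayList<>();
        if (liveNodes.isEmpty()) {
            return chosen;
        }
        // floorMod keeps the start index non-negative for negative hash codes.
        int start = Math.floorMod(sessionId.hashCode(), liveNodes.size());
        // Never pick the same node twice, even if n exceeds the cluster size.
        int count = Math.min(n, liveNodes.size());
        for (int i = 0; i < count; i++) {
            chosen.add(liveNodes.get((start + i) % liveNodes.size()));
        }
        return chosen;
    }
}
```

With this naive scheme, any change in the node list shifts the placement of many sessions; something like consistent hashing would be needed to limit that, which is part of why reusing an existing mechanism is attractive.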

If I look at this list I have a very, very strong feeling I'd re-implement HZ internal mechanisms currently used for distributed data structures like Maps, Lists, Queues, etc. The only difference is the format of updates and the action taken by the (replica) node, when it gets update notification. In HZ case this action changes the entry in a distributed data structure and has a detailed control at entry-level (TTL, etc). In my case, I'd need to delegate the action to a custom method, that rebuilds/updates the local Map on the replica node.  But other than that the mechanisms are the same, IMHO. So, I'm wondering what is the best way to reuse HZ mechanisms instead of re-implementing them? Is it possible somehow? If not, would it make sense in the future to extend HZ to enable such things? Actually, it sounds and looks very much like delta-aware data structures or something like that.

5) If I want to rebuild my own Maps on replica nodes upon receiving updates, I need to have listeners only on those nodes, reacting only to updates related to their respective maps. What is an efficient way of doing it? If I use topics, then listeners and topics in HZ are cluster-wide, as far as I understand. So every node in the cluster will receive the update notification and then have to figure out whether it is affected or not. This is most likely too much overhead. I guess HZ internally sends e.g. distributed map updates directly and only to the replica nodes (instead of all nodes), doesn't it? How can I do a similar thing?

Or maybe I should use a giant distributed map with keys as you described to distribute updates? Then replicas are picked automatically by HZ and they receive updates for entries. Then I need to have a listener for such update entries on each replica node (can I actually have such listeners in HZ on replica nodes, listening to changes in backup maps?). Such a listener would take the update entry, update the local copy of the session map replica accordingly and remove the update entry from the giant map, as it was already processed. But then I see a problem in removing such an entry. Which replica needs to do it? How does it know that the others are done with it already?

Distributed queues are probably not a good match for communicating updates, are they?

6) Possibly I am over-complicating the whole issue and there is a much easier way to implement the whole thing? I'd be very grateful for any hints!

Thanks,
  Leo

mongonix

Jan 12, 2012, 3:30:19 AM
to haze...@googlegroups.com
Hi Fuad, Hi Talip,

It is a bit of a pity that the discussion got stuck just as it got particularly interesting! ;-)
It would be nice to see some replies from you and continue the discussion.

Thanks,
  Leo

Fuad Malikov

Jan 13, 2012, 6:41:56 AM
to haze...@googlegroups.com
It really takes a lot of time on our end to digest the information and come up with something that doesn't suck:)

Honestly, when I said yes to Topic, I was remembering others who replicated everywhere. That was a case with many reads, few updates and a relatively small cluster. If that is not your case, then it will not be that dead simple. There is probably no public code, but as you guess, replicating everywhere is quite straightforward.

I understand that you are trying to replicate only to certain nodes with Topic. How do you think you'll be able to fail over to the right next node (the one that has the backup data) with your load balancer?

-fuad





mongonix

Jan 13, 2012, 8:59:02 AM
to haze...@googlegroups.com
Hi Fuad,


On Friday, January 13, 2012 12:41:56 PM UTC+1, Fuad Malikov wrote:
> It really takes a lot of time on our end to digest the information and come up with something that doesn't suck:)

I see. And I really appreciate your willingness to help!
 
> Honestly, when I said yes to Topic, I was remembering others who replicated everywhere. That was with many reads, few updates, and a relatively small cluster. If that is not your case, then it will not be that dead simple. There is probably no public code, but as you guess, replicating everywhere is quite straightforward.

 Yes. Replicating everywhere is easy.
 
> I understand that you are trying to replicate only to certain nodes with Topic. How do you think you'll be able to fail over to the right next node (the one that has the backup data) with your load balancer?

Let me try to explain.

As I indicated in the previous message, I'm not sure that HZ Topics in their current form can be used to achieve what I need.
My feeling is that HZ has almost all of the required failover mechanisms in place: it can decide which node is to be used for a given key (i.e. which node becomes the owner) and where to put the replicas. It also takes care that the required number of replicas is maintained as nodes come and go. And it fails over when something bad happens to a node. Of course, HZ currently does this only for its own supported distributed data structures.

So, my idea is/was to reuse this as much as possible (if it is possible). E.g. when we get a new request (with the sessionId contained in it), it may land on any of our nodes. We then create a task and use a distributed ExecutorService to execute it on the node that owns the sessionId key. In case of a failover, it is supposed to be executed on the replica node that has become the new owner.
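
To make the routing idea concrete, here is a minimal sketch in plain Java with no Hazelcast dependency. OwnerRouter, the modulo-based owner choice, and the partition count of 271 are assumptions made purely for illustration; this is not how HZ actually assigns partitions. It only shows the intended behaviour: a request may land anywhere, the task runs on the owner of the sessionId key, and once the owner has left, the same key resolves to a surviving node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for "run the task on whichever node owns this sessionId".
final class OwnerRouter {
    // Assumed partition count, chosen only for illustration.
    private static final int PARTITION_COUNT = 271;

    // Node addresses in an order that every node is assumed to agree on.
    private final List<String> members;

    OwnerRouter(List<String> members) {
        this.members = new ArrayList<String>(members);
    }

    // Map the session id to a partition, then the partition to a live member.
    String ownerOf(String sessionId) {
        int partition = Math.floorMod(sessionId.hashCode(), PARTITION_COUNT);
        return members.get(partition % members.size());
    }

    // Called when a node fails; later lookups only see the survivors.
    void memberLeft(String address) {
        members.remove(address);
    }

    // Simulates remote execution: reports which node would run the task.
    <T> String executeOnOwner(String sessionId, Supplier<T> task) {
        return ownerOf(sessionId) + " -> " + task.get();
    }
}
```

Note that a plain modulo moves more keys than necessary when membership changes; a real partition table would try to keep unaffected partitions on their current owners.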

The only big trouble I see is that I don't know how to reuse the HZ replica selection and maintenance mechanisms for my purposes. I want my data structures (e.g. 1,000,000 maps) to be replicated. But as we discussed in the previous messages, this will not scale with the standard HZ replication because of the huge overhead. We discussed previously that possibly only update notifications could be communicated somehow to the replica nodes ("how" is still open!). The replica nodes would build/maintain local copies of the maps based on these update notifications. When the owner node fails, each replica node contains a complete Map and can become the new master.

So, to summarize, the following is still missing for me to understand how this can be done in practice:

1) How to reuse HZ replica selection and failover node selection mechanisms that are used for current distributed data structures supported by HZ?

2) How to reuse the HZ mechanisms that send updates of a given distributed HZ structure only to its replica nodes (in order to avoid the overhead of sending to everyone)? Can one somehow reuse them for a custom data structure? If not, what are the technical problems?

3) If (2) were possible, how could one define a handler/listener on the replica node that would receive the data structure update notifications and apply its own logic for handling them?
    Let me explain: Updates for the standard distributed HZ data structures are handled by the standard HZ handlers/listeners. They add/remove entries to/from HZ's internal representation of e.g. distributed Maps. HZ controls each entry separately in a very flexible way (e.g. per-entry TTL, eviction times, etc). But this also leads to a per-entry overhead.
    What I'd like to achieve with custom listeners is the following: They handle update notifications (delivered by reusing the usual HZ mechanisms) for my custom maps. When such an update is received, the listener would look up the corresponding local replica map on this replica node and then perform the update on it. With this approach, there is no fine-grained per-entry control, as with the standard HZ data structures. Instead we control things at the level of the whole data structure, i.e. the whole map. HZ knows nothing about the entries of these local maps in this case; all that is handled by my custom update listeners.
    Maybe this approach would also require some sort of registration/deregistration of such custom distributed data structures at the local node, so that the node knows which kinds of update notifications it needs to send/receive and which listeners to activate/deactivate.
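
A rough sketch of what such a replica-side listener could look like, in plain Java and independent of any HZ API. SessionUpdate and ReplicaStore are hypothetical names, and the convention that a null value means "remove the attribute" is an assumption made for this example. How the notification actually reaches the replica node is deliberately left out, since that is exactly the open question.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical update notification: one attribute change in one session map.
final class SessionUpdate {
    final String sessionId;
    final String key;
    final Object value; // assumption: null means "remove this attribute"

    SessionUpdate(String sessionId, String key, Object value) {
        this.sessionId = sessionId;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical replica-side handler. Control is per whole session map,
// not per entry: the store only knows which session maps it replicates.
final class ReplicaStore {
    private final Map<String, Map<String, Object>> replicas = new HashMap<>();

    // Registration: this node starts keeping a replica of the session map.
    void register(String sessionId) {
        replicas.putIfAbsent(sessionId, new HashMap<>());
    }

    // Deregistration: this node no longer replicates the session map.
    void deregister(String sessionId) {
        replicas.remove(sessionId);
    }

    // Apply one update to the local replica; false if the session is not replicated here.
    boolean onUpdate(SessionUpdate update) {
        Map<String, Object> replica = replicas.get(update.sessionId);
        if (replica == null) {
            return false;
        }
        if (update.value == null) {
            replica.remove(update.key);
        } else {
            replica.put(update.key, update.value);
        }
        return true;
    }

    // Read-only view of the replica, or null if the session is not replicated here.
    Map<String, Object> replicaOf(String sessionId) {
        Map<String, Object> replica = replicas.get(sessionId);
        return replica == null ? null : Collections.unmodifiableMap(replica);
    }
}
```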

As you can see from this description, this approach relies a lot on the hope that it is possible to reuse the HZ mechanisms related to replication and to customize them a bit, especially the listener-related part. If this is not possible and not planned in HZ, then almost all of those mechanisms would have to be re-implemented, which would be a real pity...

Regards,
  Leo

Fuad Malikov

Jan 13, 2012, 9:50:10 AM
to haze...@googlegroups.com
Well, even though Hazelcast internally does all the dynamic backup handling, I don't think you'll be able to reuse that part. It is buried very deep inside, and right now with 2.0 we are changing the way we back up entries. So relying on it wouldn't be a wise move. If you use Hazelcast, you should only use the goodies that are available in the public API.

It looks like there is no clean solution. You could use one Topic per instance. The Topic name could be the instance's Address. And somehow you can define who is the backup of whom, and according to that plan every member would listen to the topics of the members it backs up, and so on.
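
One possible way to "define who is the backup of whom", sketched in plain Java without any Hazelcast calls. BackupPlan and the ring ordering are assumptions for illustration only: members are sorted by address so that every node derives the same plan, the backups of a member are its next neighbours on the ring, and each member subscribes to the topics named after the members it backs up.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical backup plan derived purely from the member list.
final class BackupPlan {
    // Sorted so that all members compute an identical plan.
    private final List<String> ring;

    BackupPlan(Collection<String> memberAddresses) {
        ring = new ArrayList<String>(memberAddresses);
        Collections.sort(ring);
    }

    // Backups of an owner: the next backupCount members on the ring.
    List<String> backupsOf(String owner, int backupCount) {
        List<String> backups = new ArrayList<String>();
        int index = ring.indexOf(owner);
        if (index < 0) {
            return backups; // unknown member: no backups
        }
        for (int step = 1; step <= backupCount && step < ring.size(); step++) {
            backups.add(ring.get((index + step) % ring.size()));
        }
        return backups;
    }

    // Topic names (= owner addresses) that the given member has to listen to.
    List<String> topicsToListenTo(String member, int backupCount) {
        List<String> topics = new ArrayList<String>();
        for (String owner : ring) {
            if (backupsOf(owner, backupCount).contains(member)) {
                topics.add(owner);
            }
        }
        return topics;
    }
}
```

With this layout the number of topics equals the number of members, and the plan has to be recomputed whenever the member list changes.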

When we communicate among the members (sending updates, etc.), everything is really under the hood, and it doesn't use any listener type of mechanism. It just creates a binary packet, puts all the required information in, and sends it to the node through the TCP/IP layer. So again, reusing it wouldn't be possible.

It looks like the tools you have are: Hazelcast.getCluster().getMembers(), which gives you the member list; a MembershipListener to detect when a member has crashed; and topics.
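
As a sketch of how a membership-removed event could drive the failover itself, here is a plain-Java model with no Hazelcast dependency. FailoverCoordinator and the "next address in sorted order takes over" rule are assumptions for this example; in a real setup onMemberRemoved would be invoked from the membership callback with the current member list.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical per-node component that promotes replicas when an owner crashes.
final class FailoverCoordinator {
    private final String localAddress;
    // Replica session maps, grouped by the address of the node that owns them.
    private final Map<String, Map<String, Map<String, Object>>> replicasByOwner = new HashMap<>();
    // Session maps this node currently owns.
    private final Map<String, Map<String, Object>> ownedSessions = new HashMap<>();

    FailoverCoordinator(String localAddress) {
        this.localAddress = localAddress;
    }

    // Keep a local copy of a session map owned by another node.
    void storeReplica(String ownerAddress, String sessionId, Map<String, Object> attributes) {
        replicasByOwner.computeIfAbsent(ownerAddress, k -> new HashMap<>())
                .put(sessionId, new HashMap<>(attributes));
    }

    // Assumed rule: the next surviving address in sorted order takes over, wrapping around.
    static String successorOf(String failedAddress, Collection<String> survivors) {
        TreeSet<String> sorted = new TreeSet<String>(survivors);
        if (sorted.isEmpty()) {
            return null;
        }
        String next = sorted.higher(failedAddress);
        return next != null ? next : sorted.first();
    }

    // Returns how many sessions this node took over from the failed member.
    int onMemberRemoved(String failedAddress, Collection<String> survivors) {
        if (!localAddress.equals(successorOf(failedAddress, survivors))) {
            return 0; // some other node is responsible for the takeover
        }
        Map<String, Map<String, Object>> orphaned = replicasByOwner.remove(failedAddress);
        if (orphaned == null) {
            return 0;
        }
        ownedSessions.putAll(orphaned);
        return orphaned.size();
    }

    boolean owns(String sessionId) {
        return ownedSessions.containsKey(sessionId);
    }
}
```

The load balancer would still have to learn which node took over; the sketch only covers the node-side promotion.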

Honestly, I really don't know what my final suggestion would be. Based on your domain, you'll have to choose one of these approaches. And for sure all of them have their downsides and upsides.

-fuad


mongonix

Jan 13, 2012, 11:36:39 AM
to haze...@googlegroups.com
Hi Fuad,


On Friday, January 13, 2012 3:50:10 PM UTC+1, Fuad Malikov wrote:
> Well, even though Hazelcast internally does all the dynamic backup handling, I don't think you'll be able to reuse that part. It is buried very deep inside, and right now with 2.0 we are changing the way we back up entries. So relying on it wouldn't be a wise move. If you use Hazelcast, you should only use the goodies that are available in the public API.

I see. It is really a pity it is not reusable in any form currently. But do you think it could be an idea to support some sort of re-usability in the future and support custom distributed data-structures?

After all, according to my understanding, the "only" thing required to support my proposal is the ability to extend/replace the reactions to update/backup notifications. And the ability to produce "custom" update notifications (binary packets with serialized updates,  whatever) when e.g. a value implements a certain special interface, e.g. "DeltaAware".

BTW, IIRC, this is roughly how Infinispan implements/supports custom DeltaAware data structures. You just implement such an interface, which essentially handles producing and receiving update notifications and reacts to them. This is enough for Infinispan to plug a custom DeltaAware data structure into their standard replication framework. But maybe I'm wrong...
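
To illustrate what I mean by a delta-aware value, here is a minimal plain-Java sketch. The DeltaProducing interface and DeltaSessionMap are hypothetical names invented for this example; they are neither Infinispan's actual API nor anything HZ offers. A delta is modelled as a map of changed attributes, with the assumption that a null value stands for a removed attribute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical contract for a value that can ship only its changes.
interface DeltaProducing<D> {
    // Changes accumulated since the previous call; resets the accumulator.
    D takeDelta();

    // Replays a delta produced by the owner on a replica copy.
    void applyDelta(D delta);
}

// Session attribute map that records its own changes as a delta.
final class DeltaSessionMap implements DeltaProducing<Map<String, Object>> {
    private final Map<String, Object> attributes = new HashMap<>();
    // Assumption: a null value in the delta means "attribute was removed".
    private Map<String, Object> pending = new HashMap<>();

    void setAttribute(String key, Object value) {
        attributes.put(key, value);
        pending.put(key, value);
    }

    void removeAttribute(String key) {
        attributes.remove(key);
        pending.put(key, null);
    }

    Object getAttribute(String key) {
        return attributes.get(key);
    }

    @Override
    public Map<String, Object> takeDelta() {
        Map<String, Object> delta = pending;
        pending = new HashMap<>();
        return delta;
    }

    @Override
    public void applyDelta(Map<String, Object> delta) {
        for (Map.Entry<String, Object> change : delta.entrySet()) {
            if (change.getValue() == null) {
                attributes.remove(change.getKey());
            } else {
                attributes.put(change.getKey(), change.getValue());
            }
        }
    }
}
```

The owner would call takeDelta() after each request and send only that small map to the replicas, which call applyDelta() on their local copies.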
 
> It looks like there is no clean solution. You could use one Topic per instance. The Topic name could be the instance's Address. And somehow you can define who is the backup of whom, and according to that plan every member would listen to the topics of the members it backs up, and so on.

I see your point. This way I only need as many topics as the number of nodes in the cluster and not as many as session-maps.
 
> When we communicate among the members (sending updates, etc.), everything is really under the hood, and it doesn't use any listener type of mechanism. It just creates a binary packet, puts all the required information in, and sends it to the node through the TCP/IP layer. So again, reusing it wouldn't be possible.

Just out of curiosity: I understand that "listener" is maybe not the best name for it. But what do you do with the binary packets that you receive? You have some sort of HZ logic that decides what to do when a packet is received (this is what I called a "listener" or handler). But it is probably not exposed via a public API.
 
And BTW, is there any architectural write-up (besides the source code) that describes at a high or low level how replication, failover, etc. are implemented, how update notifications are sent, etc.? It would be very interesting to better understand the implementation of these mechanisms in Hazelcast.

> It looks like the tools you have are: Hazelcast.getCluster().getMembers(), which gives you the member list; a MembershipListener to detect when a member has crashed; and topics.

Yes. Looks like it. I'll implement Hazelcast on top of Hazelcast ;-)

> Honestly, I really don't know what my final suggestion would be. Based on your domain, you'll have to choose one of these approaches. And for sure all of them have their downsides and upsides.

I'll think more about it and experiment. If I find any interesting solution, I'll let you know...

Thanks a lot for your answers! I really appreciate it.
  Leo

Fuad Malikov

Jan 13, 2012, 12:23:51 PM
to haze...@googlegroups.com
On Fri, Jan 13, 2012 at 6:36 PM, mongonix <romi...@gmail.com> wrote:
> Hi Fuad,
>
> On Friday, January 13, 2012 3:50:10 PM UTC+1, Fuad Malikov wrote:
>> Well, even though Hazelcast internally does all the dynamic backup handling, I don't think you'll be able to reuse that part. It is buried very deep inside, and right now with 2.0 we are changing the way we back up entries. So relying on it wouldn't be a wise move. If you use Hazelcast, you should only use the goodies that are available in the public API.
>
> I see. It is really a pity it is not reusable in any form currently. But do you think it could be an idea to support some sort of re-usability in the future and support custom distributed data-structures?

I don't think it is a good idea to expose anything internal. The more we expose, the harder it becomes to change anything later on. I think it was with you that we discussed the Queue and Topic listeners about a year ago: the way the entries are serialized, etc. Now with 2.0 we are implementing what we decided back then, and this change breaks backward compatibility. Of course we will keep adding good and useful stuff to our API, but it should be simple. If someone comes up with something useful and contributes a patch, or the community asks loudly for that functionality, we will certainly consider it.
 

> After all, according to my understanding, the "only" thing required to support my proposal is the ability to extend/replace the reactions to update/backup notifications. And the ability to produce "custom" update notifications (binary packets with serialized updates, whatever) when e.g. a value implements a certain special interface, e.g. "DeltaAware".
>
> BTW, IIRC, this is roughly how Infinispan implements/supports custom DeltaAware data structures. You just implement such an interface, which essentially handles producing and receiving update notifications and reacts to them. This is enough for Infinispan to plug a custom DeltaAware data structure into their standard replication framework. But maybe I'm wrong...

I really don't know how Infinispan does it.
 
>> It looks like there is no clean solution. You could use one Topic per instance. The Topic name could be the instance's Address. And somehow you can define who is the backup of whom, and according to that plan every member would listen to the topics of the members it backs up, and so on.
>
> I see your point. This way I only need as many topics as the number of nodes in the cluster and not as many as session-maps.

Exactly.
 
>> When we communicate among the members (sending updates, etc.), everything is really under the hood, and it doesn't use any listener type of mechanism. It just creates a binary packet, puts all the required information in, and sends it to the node through the TCP/IP layer. So again, reusing it wouldn't be possible.
>
> Just out of curiosity: I understand that "listener" is maybe not the best name for it. But what do you do with the binary packets that you receive? You have some sort of HZ logic that decides what to do when a packet is received (this is what I called a "listener" or handler). But it is probably not exposed via a public API.

Yep. Every packet has an operation within it, and on every end there are operation handlers. And yes, it is not exposed, and it is not something we would consider exposing, as we re-implement the internals often.
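
For readers unfamiliar with the pattern, "a packet carries an operation and each end has operation handlers" can be pictured roughly as below. This is a generic plain-Java illustration with made-up names (Packet, PacketDispatcher, the operation strings); it says nothing about how the Hazelcast internals are actually written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical wire unit: an operation name plus an opaque binary payload.
final class Packet {
    final String operation;
    final byte[] payload;

    Packet(String operation, byte[] payload) {
        this.operation = operation;
        this.payload = payload;
    }
}

// Looks up the handler registered for a packet's operation and invokes it.
final class PacketDispatcher {
    private final Map<String, Consumer<byte[]>> handlers = new HashMap<>();

    void register(String operation, Consumer<byte[]> handler) {
        handlers.put(operation, handler);
    }

    // Returns false when no handler is registered for the packet's operation.
    boolean dispatch(Packet packet) {
        Consumer<byte[]> handler = handlers.get(packet.operation);
        if (handler == null) {
            return false;
        }
        handler.accept(packet.payload);
        return true;
    }
}
```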
 
> And BTW, is there any architectural write-up (besides the source code) that describes at a high or low level how replication, failover, etc. are implemented, how update notifications are sent, etc.? It would be very interesting to better understand the implementation of these mechanisms in Hazelcast.

Not much except http://hazelcast.com/docs/2.0/manual/single_html/#Internals. Unfortunately, some of the details there have either changed or are changing right now, and we need to update them. But it will give you a general idea that is broadly still true.

>> It looks like the tools you have are: Hazelcast.getCluster().getMembers(), which gives you the member list; a MembershipListener to detect when a member has crashed; and topics.
>
> Yes. Looks like it. I'll implement Hazelcast on top of Hazelcast ;-)

Yes, and this is exactly what I don't like. It is becoming complex.
 

>> Honestly, I really don't know what my final suggestion would be. Based on your domain, you'll have to choose one of these approaches. And for sure all of them have their downsides and upsides.
>
> I'll think more about it and experiment. If I find any interesting solution, I'll let you know...

Yep, I hope you'll be able to come up with a simpler solution.

> Thanks a lot for your answers! I really appreciate it.
>   Leo

You are welcome,

-fuad
