Process Migration at Network Layer


Pankaj More

Feb 9, 2013, 11:31:57 PM
to parallel...@googlegroups.com
While exploring live server upgrades in cloud-haskell, I found that process migration (aka failover) would be a major issue. Even if we might be able to extend safecopy to do type versioning and migration in a distributed fashion, the problem is that ProcessId is serializable. So the failover would only work if ProcessId is opaque enough that, when other processes send messages to the migrating process, the messages reach the new node to which the process has migrated.

One possible way to make the endpoints transparent to migration, as Tim suggested, is to run a port mapper daemon and address a node by name rather than by ip+port. It could work like a DNS service that maps endpoint names to their physical locations (ip+port). Such a port mapper daemon would need to support all possible transport backends, and it would be better to have it at the network layer rather than higher up.
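
A minimal sketch of the name-to-location mapping described above, assuming a plain in-memory map. The names `EndpointName`, `PhysicalAddr`, `register` and `resolve` are hypothetical and are not part of network-transport; a real daemon would also have to cover non-IP transports.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical types for illustration; none of these exist in network-transport.
newtype EndpointName = EndpointName String deriving (Eq, Ord, Show)

data PhysicalAddr = PhysicalAddr
  { addrHost :: String
  , addrPort :: Int
  } deriving (Eq, Show)

-- The mapper's whole state: logical name -> current physical location.
type Registry = Map.Map EndpointName PhysicalAddr

-- Register an endpoint, or re-register it after it has migrated.
register :: EndpointName -> PhysicalAddr -> Registry -> Registry
register = Map.insert

-- Senders resolve the name on every (re)connect instead of caching ip+port.
resolve :: EndpointName -> Registry -> Maybe PhysicalAddr
resolve = Map.lookup
```

Because senders hold only the name, a migration is a single `register` call with the new address; nothing held by the senders has to change.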

During my discussion with Tim, he suggested that it might present itself as a transparent proxy. What we are not sure about is whether doing that for all transports would be possible at the network layer.

Tim Watson

Feb 10, 2013, 6:07:18 AM
to panka...@gmail.com, parallel...@googlegroups.com
On 10 Feb 2013, at 04:31, Pankaj More <panka...@gmail.com> wrote:

> While exploring live server upgrades in cloud-haskell, I found that process migration (aka failover) would be a major issue. Even if we might be able to extend safecopy to do type versioning and migration in a distributed fashion, the problem is that ProcessId is serializable. So the failover would only work if ProcessId is opaque enough that, when other processes send messages to the migrating process, the messages reach the new node to which the process has migrated.


Indeed, the complexities involved made me think we should try to use hs-plugins to load a new set of definitions and replace the node's remote table with the newly loaded one. That avoids the need to migrate processes from one node to another, and lets us fix up the ProcessId changes on the local node as each process's state is moved to the newly loaded function.

The problem with this solution is that if we ever move static support into ghc proper, then the remote table will be baked into the executable and we won't be able to change it at runtime.

> One possible way to make the endpoints transparent to migration, as Tim suggested, is to run a port mapper daemon and address a node by name rather than by ip+port. It could work like a DNS service that maps endpoint names to their physical locations (ip+port). Such a port mapper daemon would need to support all possible transport backends, and it would be better to have it at the network layer rather than higher up.


At least that's what I'm guessing. We're typically very reluctant to change network-transport at all, because of the complexity involved. But doing this at the CH layer with a port/node mapper daemon would be difficult otherwise I think.

> During my discussion with Tim, he suggested that it might present itself as a transparent proxy. What we are not sure about is whether doing that for all transports would be possible at the network layer.

Or if there is a better way. I'd prefer to update the running executable itself and I'd thought that is possible by loading a new remote table and a set of upgrade instructions via hs-plugins. Certainly today we can replace a remote table and migrate to a new set of definitions, though without safecopy we're unable to migrate between versions of data types. And support for making static values using hs-plugins eval has been demonstrated by Edsko and will be rolled into the next distributed-static release.

But the issue is that if static support makes it into ghc then this will not work, because the 'resolveStatic' function will only look at the data section (presumably) so if hs-plugins uses dlopen/LoadLibrary what will the effect be on static values?


Alberto G. Corona

Feb 10, 2013, 10:06:42 AM
to Tim Watson, panka...@gmail.com, parallel...@googlegroups.com
Why not define it in a higher layer? I think that this is an application service (at the monitoring level, so to speak), not a network service. My proposal was as follows:

Once a process needs to migrate (or if it is monitored and has failed), the monitor or the self-monitor creates a recovered copy of it on another node. The local process then just receives the pending calls and notifies the senders of a failure plus the new address. The calling node then reconnects to this new address. If the process has died, the monitor creates both processes: the local redirecting one and the remote one. Once all pending connections have been handled or have timed out, the local process dies.

Alternatively, instead of a redirecting process that notifies senders about the move, the monitoring service can track moved processes, so that clients with failed connections can consult the node's monitor (which would be a sort of higher-level portmapper). In the end, though, the number of reconnections needed for connection recovery is the same or more, and the software would be more complex and failure-prone, since everything would depend on the monitor service (or the portmapper).

With the first option, only an extended version of connection error and connection recovery is necessary (maybe).
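
The first option can be sketched as a pure simulation, assuming that a moved process leaves a redirector at its old address and that the sender retries at the address it is told about. All names here (`Slot`, `SendResult`, `sendWithRedirect`) are hypothetical, and the hop limit is an added assumption to guard against redirect loops.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical address type; a real system would use ProcessId or similar.
type Addr = String

-- What lives at an address: a working process, or a short-lived
-- redirector left behind by a process that has moved.
data Slot
  = Live
  | MovedTo Addr
  deriving (Eq, Show)

type Cluster = Map.Map Addr Slot

data SendResult
  = Delivered Addr      -- message accepted at this address
  | Unreachable Addr    -- nothing answers there (no process, no redirector)
  | TooManyHops         -- gave up following redirects
  deriving (Eq, Show)

-- Try to deliver; on a "moved" failure, reconnect to the new address.
sendWithRedirect :: Int -> Cluster -> Addr -> SendResult
sendWithRedirect hops cluster addr
  | hops < 0 = TooManyHops
  | otherwise =
      case Map.lookup addr cluster of
        Nothing          -> Unreachable addr
        Just Live        -> Delivered addr
        Just (MovedTo a) -> sendWithRedirect (hops - 1) cluster a
```

Once the redirector has been removed (the "local process dies" step), the old address becomes `Unreachable`, which is the case where a sender would have to fall back on the monitor.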


--
Alberto.

Alexander Kjeldaas

Feb 10, 2013, 2:46:50 PM
to agoc...@gmail.com, Tim Watson, panka...@gmail.com, parallel...@googlegroups.com
TL;DR - Process migration is not a relevant paradigm anymore.

Process migration itself is a slightly narrow focus.

Imagine a "large" set of data, 100 records, served by 4 copies of a process where each process holds 50 records, so that each record is duplicated in two places. 

Imagine the following problems:
1. Restarting a process.
2. Resizing this set of servers in response to increased load (to 5 or 6 servers).
3. Optimal routing of a single request to a given server.
    a) In the case where it is a read-only query
    b) In the case where it is a mutation of a record.

Doing 1 efficiently involves either using the 3 other processes to prime the new process, or having distributed stable storage (though that might be no faster than shipping the in-memory data from the 3 other processes).

While the process restarts, clients should not in general go to a backup node, but the queries that normally would go to node 4 should instead be evenly distributed among the other 3.  In general, the routing of requests from N clients to a service implemented by M machines would change.

Doing 2 efficiently involves doing an optimal shuffle of the data on all servers, so that typically 10 records from each of the 4 servers are used to prime a new server.  Additionally, traffic might need to be duplicated to the new server for a while to prime caches.
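
The figure of 10 records per server follows from simple arithmetic, sketched here under the assumption that records are spread evenly both before and after the resize. The function names are made up for illustration.

```haskell
-- Records each server holds when `records` records, each stored on
-- `copies` servers, are spread evenly over `servers` servers.
perServer :: Int -> Int -> Int -> Int
perServer records copies servers = (records * copies) `div` servers

-- Records each existing server hands to one newly added server,
-- assuming an even spread before and after.
shippedPerOldServer :: Int -> Int -> Int -> Int
shippedPerOldServer records copies oldServers =
  perServer records copies (oldServers + 1) `div` oldServers
```

With 100 records held twice, `perServer 100 2 4` is 50 and `perServer 100 2 5` is 40, so the fifth server needs 40 records, which is 10 from each of the 4 existing servers.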

Doing 3 efficiently means having the IP of the server that should be contacted be a function of the request itself. For example, if the records are sharded based on a unique key that is part of the query, then the high-level sharding information should be known to the client doing the request, or a proxy that has this knowledge should be used. In case 3a), either of the two servers that hold the required data could be queried based on a round-robin scheme, a least-loaded scheme, or some other application-specific load-balancing scheme. In case 3b), a more rigorous scheme might be needed, such as always writing to the "primary server" among the two, and only writing to the backup once consensus is reached that the primary server is down.
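
A rough sketch of routing as a function of the request, assuming one particular layout of the example (record k lives on server k mod 4 as primary and on the next server as backup). The layout, the request counter, and the `agreedDown` predicate standing in for the consensus view are all assumptions made for illustration.

```haskell
-- Hypothetical layout matching the example: 4 servers (0..3); record k is
-- held by server (k mod 4) as primary and by the next server as backup.
type Key = Int
type Server = Int

numServers :: Int
numServers = 4

replicas :: Key -> (Server, Server)
replicas k = (p, (p + 1) `mod` numServers)
  where p = k `mod` numServers

-- Case 3a: a read may go to either replica; alternate on a request counter.
routeRead :: Int -> Key -> Server
routeRead requestNo k
  | even requestNo = primary
  | otherwise      = backup
  where (primary, backup) = replicas k

-- Case 3b: a write goes to the primary, and to the backup only once the
-- cluster has agreed that the primary is down.
routeWrite :: (Server -> Bool) -> Key -> Server
routeWrite agreedDown k
  | agreedDown primary = backup
  | otherwise          = primary
  where (primary, backup) = replicas k
```

Under this layout each server is primary for 25 of the 100 records and backup for another 25, which gives the 50 records per process in the example.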

All of the above problems can be solved by getting consensus on 1) How things are sharded 2) Where things are located 3) What things are live.

Thus I think the basic service that is needed is distributed consensus. I think this is a service that logically (although not implementation-wise) sits right above the transport layer.

On top of distributed consensus it is possible to build stable names, and efficient request routing where all clients and all servers have the same view of what the service looks like and how it should be interacted with.

I think this is a better foundation than even the "linked process" model in Erlang, because the "linked process" model still fails in many scenarios, and distributed consensus solves a much larger set of problems.

Whether a process is live or dead can be solved by consensus, and it doesn't have split-brain issues, and can deal with the failure of multiple processes.  Stable name-lookup, that is service name to set of IPs, and a representation for how queries should be routed can be dealt with by distributed consensus.

How I would like to program distributed processes is to separate the state into classes based on availability, consistency, cost, and other factors. State that needs to be highly available and consistent, such as which servers are alive and where my data resides, would be shared by distributed consensus through Paxos.

State that needs to be consistent could be sharded with replicas, but with a stable access pattern (first go to primary, then secondary, etc). Then there would be mostly-consistent data that would be spread around in caches etc.

So like in functional programming, I think it is important to look at the data, not the processes.   Nail consistency, load-balancing, sharding etc. of classes of data, and the processes become somewhat invisible.


Alexander

Tim Watson

Feb 10, 2013, 7:42:37 PM
to Alexander Kjeldaas, agoc...@gmail.com, panka...@gmail.com, parallel...@googlegroups.com
Alexander,

I really don't see how this solves the problem at hand. Let's boil it down to the simplest example.

I have code running in two nodes, node1 and node2. There are only three ways that processes in node1 can communicate with processes in node2:

1. sending a message to a ProcessId
2. sending a message to a SendPort
3. sending a message to a named process with `nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()`

I have made some bug fixes to my code, and I want to deploy them. I shut down node2 and upgrade, then bring it back online. All instances of ProcessId and SendPort in node1 are now invalid. How exactly does Paxos help me solve this problem? It seems to me that you're suggesting I should use a different, higher level protocol to solve the problem of inter node communication. Is that right?
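
To make the failure concrete, here is a toy model of the scenario. It assumes that an identifier embeds the node's address plus a value that changes on every restart; `Pid`, `Node`, `boot` and the two `canSend...` functions are simplified stand-ins, not the real distributed-process types or API.

```haskell
import qualified Data.Map.Strict as Map

-- Simplified stand-ins, NOT the real distributed-process types.
data Pid = Pid
  { pidNode        :: String  -- node address
  , pidIncarnation :: Int     -- assumed to change every time the node restarts
  , pidCounter     :: Int     -- per-node process counter
  } deriving (Eq, Ord, Show)

data Node = Node
  { nodeAddr        :: String
  , nodeIncarnation :: Int
  , nodeRegistry    :: Map.Map String Pid  -- name -> currently registered process
  } deriving (Show)

-- Start (or restart) a node and register one process under a name.
boot :: String -> Int -> String -> Node
boot addr incarnation name =
  Node addr incarnation (Map.singleton name (Pid addr incarnation 1))

-- A direct send succeeds only if the Pid belongs to the running incarnation.
canSendTo :: Node -> Pid -> Bool
canSendTo node pid =
  pidNode pid == nodeAddr node && pidIncarnation pid == nodeIncarnation node

-- A name-based send is resolved on the receiving node at delivery time.
canSendToName :: Node -> String -> Bool
canSendToName node name = Map.member name (nodeRegistry node)
```

In this model a `Pid` obtained before the restart fails `canSendTo` against the restarted node, while `canSendToName` still succeeds, provided the upgraded node registers the same name again.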

I'm not talking about finding a specific bit of data in a cluster of nodes. The ProcessId/SendPort is the public interface processes use to send data to one another. If this interface is being used, and we want to change the code that is running on one of the nodes, how do we continue to address the process from the other nodes? Are you suggesting that quorum algorithms are the solution to this? I can't see how. Group services, yes, and these can of course be built on consensus protocols, though there are other ways too.

It seems to me that we're talking about introducing architectural complexity that belongs in the application space - i.e., is appropriate for some applications, but not all - but not solving the fundamental problem. Using a consensus/quorum algorithm seems like overkill: why not just communicate over HTTP and use a load balancer, then take the backend nodes offline one at a time for upgrade? Both amount to the same answer: use a different protocol for inter-node communication. That's not what we're trying to achieve and it's a pretty broad stroke.

We *do* have open bugs for looking at distributed algorithms including...

1. group services (process groups, global naming, routing, cluster management, etc)
2. atomic broadcast (think zab, rabbitmq's gm and similar)
3. distributed transactions/locking (we already have Jeff's distributed-process-global, we'd like a Paxos-esque set of consensus protocols too)

I'm not looking at any of these at the moment, as there are many far more mundane but equally important tickets that require my attention. If you feel like adding one or more of them to distributed-process-platform though, we'll *very* gladly take a pull request! ;)

> I think this is a better foundation than even the "linked process" model in Erlang, because the "linked process" model still fails in many scenarios, and distributed consensus solves a much larger set of problems.

These are different tools for different problems, operating at a completely different architectural level of detail. Paxos fails in a number of scenarios too, which is why we are interested in leaderless protocols as well as quorum protocols that depend on distributed leader election (which can deadlock).

> Imagine the following problems:
> 1. Restarting a process.

[snip]
> While the process restarts, clients should not in general go to a backup node, but the queries that normally would go to node 4 should instead be evenly distributed among the other 3.


You're assuming a homogeneous cluster. That's fine - and in general, distributed systems *are* much easier to design if 'all nodes are equal' as it were - but it's an assumption that a low level framework cannot make. Besides, even in a homogeneous cluster, what happens when nodes 1-3 have references to ProcessId/SendPort for node 4 which has been restarted!? You've not solved that problem at all, because you're just looking at it from the 'external client' point of view. On the other hand 'internal clients' (i.e., the other nodes) still need to communicate with one another, and can no longer do so using ProcessId/SendPort if the node has been restarted. Using `nsend` *does* alleviate this ill to some extent, but it's not a panacea.

> All of the above problems can be solved by getting consensus on 1) How things are sharded 2) Where things are located 3) What things are live.
>
> Thus I think the basic service that is needed is distributed consensus.
[snip]
> On top of distributed consensus it is possible to build stable names, and efficient request routing where all clients and all servers have the same view of how the service looks like and how it should be interacted with.

That still doesn't solve the problem outlined above, unless all nodes interact with one another using the consensus protocol and never use ProcessId/SendPort to communicate with one another!

> Whether a process is live or dead can be solved by consensus, and it doesn't have split-brain issues, and can deal with the failure of multiple processes. Stable name-lookup, that is service name to set of IPs, and a representation for how queries should be routed can be dealt with by distributed consensus.

Right. So this *is* a useful thing to have if you're happy to program the cluster at this level and not to use low level primitives like ProcessId/SendPort. That's fine and as I say, we'd be delighted to offer an API at this level of abstraction on top of the existing infrastructure.

> State that need to be consistent could be sharded with replicas, but with a stable access pattern (first go to primary, then secondary, etc). Then there would be mostly-consistent data that would be spread around in caches etc.

Right. But again, you're assuming that the ProcessId/SendPort is not part of that data and that all inter-node communication is done via group services. Let me tell you now, that no matter how tight your implementation - and e.g., `fast-paxos` is *very* quick - there is significant overhead in making all internode communication go this route compared to sending direct to the target process. Sometimes you actually want to just send a message to a process, without going around the houses.

> TL;DR - Process migration is not a relevant paradigm anymore.

I didn't realise it was considered a paradigm. Erlang doesn't *do* process migration in order to change the code at runtime. You just give the code server the new compiled modules at runtime and the recursive tail call for the servers enters the new version of `loop(State) -> ...` and it's job done. Well, that's the *short* version - it's a bit more involved in practice of course. It's also a good bit more involved doing this in Haskell, but we're exploring whether or not such a facility might be useful and how we might implement it. The idea of using 'process migration' was one of several suggestions about how to solve the problem that you might want to replace version 1 of the *code* with version 2 without suffering downtime and in a manner transparent to the other nodes which are sending you messages.
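
The shape of that loop can be imitated in plain Haskell if the new code arrives as an ordinary function value. This is only a toy sketch under that assumption: it sidesteps the hard part discussed in this thread (loading new definitions and static values at runtime), and `Msg`, `Handler` and `serverLoop` are invented names.

```haskell
-- A handler takes the current state and a request and returns the new state.
type Handler = Int -> Int -> Int

data Msg
  = Request Int        -- ordinary work
  | Upgrade Handler    -- swap in new code; the state is carried over

-- The loop re-enters itself with whatever handler is current, much as the
-- tail call to loop(State) in an Erlang server picks up the new version.
serverLoop :: Handler -> Int -> [Msg] -> Int
serverLoop _       state []                 = state
serverLoop handler state (Request n : rest) = serverLoop handler (handler state n) rest
serverLoop _       state (Upgrade h : rest) = serverLoop h state rest

-- Version 1 adds each request to the state; version 2 counts it twice.
v1, v2 :: Handler
v1 state n = state + n
v2 state n = state + 2 * n
```

For example, `serverLoop v1 0 [Request 1, Request 2, Upgrade v2, Request 3]` evaluates to 9: the state built up by version 1 (3) is handed unchanged to version 2, and the senders never see a different address.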

Like I said, I'm not going to be working on anything like this for *ages* as there are far too many other things to do (i.e., optimisations to message passing, tracing, supervision trees, etc) so if you know what you want this to look like and are willing to contribute, that would be fantastic!

Cheers,
Tim


Alexander Kjeldaas

Feb 11, 2013, 4:20:48 AM
to Tim Watson, agocorona, panka...@gmail.com, parallel...@googlegroups.com
On Mon, Feb 11, 2013 at 1:42 AM, Tim Watson <watson....@gmail.com> wrote:

>> Whether a process is live or dead can be solved by consensus, and it doesn't have split-brain issues, and can deal with the failure of multiple processes. Stable name-lookup, that is service name to set of IPs, and a representation for how queries should be routed can be dealt with by distributed consensus.

> Right. So this *is* a useful thing to have if you're happy to program the cluster at this level and not to use low level primitives like ProcessId/SendPort. That's fine and as I say, we'd be delighted to offer an API at this level of abstraction on top of the existing infrastructure.

>> State that need to be consistent could be sharded with replicas, but with a stable access pattern (first go to primary, then secondary, etc). Then there would be mostly-consistent data that would be spread around in caches etc.

> Right. But again, you're assuming that the ProcessId/SendPort is not part of that data and that all inter-node communication is done via group services.

Yes, the mapping from a global name to a ProcessId/SendPortId is part of the consistent data set.
 
> Let me tell you now, that no matter how tight your implementation - and e.g., `fast-paxos` is *very* quick - there is significant overhead in making all internode communication go this route compared to sending direct to the target process. Sometimes you actually want to just send a message to a process, without going around the houses.


Paxos is only involved when there is a change to the consistent data set, i.e. when a node goes down and there needs to be a decision on whether the node is down or not, and thus a change to the mapping from a global name to a ProcessId/SendPort.  There is no overhead in the fast path.

 
>> TL;DR - Process migration is not a relevant paradigm anymore.

> I didn't realise it was considered a paradigm.

I mean the idea that a new process is started from some copy of the state in the old process.  I do not think that is a good technique.

In the examples I gave, the state required to start the new process is distributed across many other processes, thus there is not one place to go to get it.

 
> Erlang doesn't *do* process migration in order to change the code at runtime. You just give the code server the new compiled modules at runtime and the recursive tail call for the servers enters the new version of `loop(State) -> ...` and it's job done. Well, that's the *short* version - it's a bit more involved in practice of course. It's also a good bit more involved doing this in Haskell, but we're exploring whether or not such a facility might be useful and how we might implement it. The idea of using 'process migration' was one of several suggestions about how to solve the problem that you might want to replace version 1 of the *code* with version 2 without suffering downtime and in a manner transparent to the other nodes which are sending you messages.


Ok, let me moderate myself.  If there is a way to completely replace the process and take over the TCP connections in an atomic manner, invisible to clients, then that is a useful thing to have.  But I think this is an optimization that focuses on just one operational issue, fast upgrades.  Other operational issues like crash recovery or re-sharding are not handled.  However if the steps are:

1. Start a new process with new code.
2. Let new process join process group (group of servers responsible for a set of data) with responsibility for a data subset (paxos, sharding).
3. Migrate/copy/prime data.
4. Release old process from process group.

Then slight modifications cover crash-recovery, re-sharding, hardware migration, replication, upgrades, rollbacks, and possibly other scenarios.  The key building block is distributed consensus on the process group, mapping from logical name to host:port, what function to use for sharding etc.  Thus the distributed consensus requirement is at a very low level in the architecture.
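
The four steps can be sketched as operations on the agreed group membership. This sketch assumes the membership is simply a map from process to the shards it serves, and it leaves out both the consensus round that would publish each change and step 3 (priming the data). The names `Group`, `joinGroup`, `release` and `upgrade` are hypothetical.

```haskell
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

type Proc  = String
type Shard = Int

-- The agreed-upon view: which processes serve which shards.
type Group = Map.Map Proc (Set.Set Shard)

-- Step 2: the new process joins with responsibility for a subset of the data.
joinGroup :: Proc -> Set.Set Shard -> Group -> Group
joinGroup = Map.insert

-- Shards that would have no owner left if `p` were released now.
orphanedBy :: Proc -> Group -> Set.Set Shard
orphanedBy p g =
  Map.findWithDefault Set.empty p g `Set.difference` Set.unions (Map.elems (Map.delete p g))

-- Step 4: release a process, but only if every shard it serves has another owner.
release :: Proc -> Group -> Either String Group
release p g
  | Set.null (orphanedBy p g) = Right (Map.delete p g)
  | otherwise                 = Left ("shards would be orphaned by releasing " ++ p)

-- An upgrade: the replacement takes over the old process's shards, then the
-- old process is released (data priming is assumed to happen in between).
upgrade :: Proc -> Proc -> Group -> Either String Group
upgrade old new g = release old (joinGroup new (Map.findWithDefault Set.empty old g) g)
```

Crash recovery and re-sharding would differ only in which shards the joining process is given and in whether the old process is still around to be released.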

If we look at the cluster level, before any workload is started, distributed consensus should be available. Either start three nodes to run Paxos or get a ZooKeeper installation up and running or something like that. Then for all other processes, assume distributed consensus is available and build the rest on top of that.

I think for example adding ZooKeeper as a low-level requirement for getting higher-level functionality would be okay for now, since the distributed consensus abstraction should be what the above functionality is built on, and ZooKeeper is one implementation of that.

None of the consistent data should impact the fast path, except that there needs to be a way to signal that change has happened.  But that is required in any case.

Alexander

Alexander Kjeldaas

Feb 11, 2013, 5:53:55 AM
to Tim Watson, Alberto Gómez Corona, panka...@gmail.com, parallel...@googlegroups.com
On Mon, Feb 11, 2013 at 1:42 AM, Tim Watson <watson....@gmail.com> wrote:
> Alexander,
>
> I really don't see how this solves the problem at hand. Let's boil it down to the simplest example.
>
> I have code running in two nodes, node1 and node2. There are only three ways that processes in node1 can communicate with processes in node2:
>
> 1. sending a message to a ProcessId
> 2. sending a message to a SendPort
> 3. sending a message to a named process with `nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()`
>
> I have made some bug fixes to my code, and I want to deploy them. I shut down node2 and upgrade, then bring it back online. All instances of ProcessId and SendPort in node1 are now invalid. How exactly does Paxos help me solve this problem? It seems to me that you're suggesting I should use a different, higher level protocol to solve the problem of inter node communication. Is that right?
>
> I'm not talking about finding a specific bit of data in a cluster of nodes. The ProcessId/SendPort is the public interface processes use to send data to one another. If this interface is being used, and we want to change the code that is running on one of the nodes, how do we continue to address the process from the other nodes?

I think that in all cases I have come across, there is always some piece of data you want to either reach or operate on when sending a message to node2. Like a primary key in a table, a session id, a url, or something like that.

There are specific exceptions like ping/pong/alive? messages, but they typically somehow deal with the cluster state.  Is this node alive?  What is the load? Give me some statistics etc.  In these cases we can still imagine that the NodeId itself is the key to the data.

Given the above, I am suggesting that there should be a mapping layer that takes care of your upgrade scenario.  It goes like this:

focusKey :: Key a => SomeFocusData a -> a -> Shard a

This function concentrates the keyspace down to the shard space.  This is done to have distributed consistency on less data.  It also covers strategies such as distributed hash table, HBase (shard by string key) and simple hashing.

Then the mapping from Shard to ProcessId is N:M.

mapShard :: RequestStrategy b => SomeShardData a -> Shard a -> ([(ProcessId, ShardState)], b)

The set of ProcessIds is the set of processes that are relevant to contact regarding Shard a. The ShardState says whether that particular Shard is in the process of shutting down, restarting, coming up, etc. RequestStrategy is various other load-balancing or consistency information, such as whether access should be round-robin or least-loaded, or whether the ProcessIds should be contacted in order (primary, secondary, ...). It could be anything, and which ProcessId to contact would be a function of this data and of the type of request (read-only or mutation, how up to date the data must be, etc.).

"SomeFocusData a" and "SomeShardData a" are known to all clients through distributed consensus, and can change.

Optionally, SomeFocusData a is fixed for the application since it typically does not change much, as sharding strategies are pretty much set in stone for a given application.  SomeShardData a will change from time to time.
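
A simplified, monomorphic sketch of the two signatures above, assuming string keys, a fixed shard count as the focus data, and a toy character-sum hash. The concrete types (`FocusData`, `ShardData`, the `ShardState` and `RequestStrategy` constructors) are invented for illustration, and `ProcessId` here is a placeholder string rather than the distributed-process type.

```haskell
import qualified Data.Map.Strict as Map

-- Simplified, monomorphic stand-ins for the types in the signatures above.
type Key       = String
type Shard     = Int
type ProcessId = String   -- placeholder, not the distributed-process type

data ShardState = Up | ComingUp | ShuttingDown deriving (Eq, Show)
data RequestStrategy = RoundRobin | InOrder deriving (Eq, Show)

-- "SomeFocusData": here just the number of shards (assumed positive).
newtype FocusData = FocusData Int

-- "SomeShardData": the agreed view of who serves each shard, and how to ask.
data ShardData = ShardData
  { owners   :: Map.Map Shard [(ProcessId, ShardState)]
  , strategy :: RequestStrategy
  }

-- Concentrate the key space down to the shard space by simple hashing.
focusKey :: FocusData -> Key -> Shard
focusKey (FocusData n) key = sum (map fromEnum key) `mod` n

-- Look up the processes relevant to a shard; an unknown shard has none.
mapShard :: ShardData -> Shard -> ([(ProcessId, ShardState)], RequestStrategy)
mapShard sd shard = (Map.findWithDefault [] shard (owners sd), strategy sd)
```

A client would compute `mapShard shardData (focusKey focusData key)` for every request, and only `shardData` needs to be replaced when the consensus value changes.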

To model your example, we assume that Node2 only holds one piece of data, so the Key only has one value, and there is only one shard.  focusKey basically does nothing.

When you shut down node2, the link from node2 to the distributed consensus system (say ZooKeeper) will be broken, which leads to a new version of "SomeShardData a" being distributed to clients. Alternatively, node1, when its TCP connection is broken, will get distributed consensus on a new "SomeShardData a" value that indicates that node2 is down.

Anyways, the result is that "SomeShardData a" should change its distributed consensus value, all clients of "Key a" will get new "SomeShardData a", and they will see that mapShard returns the empty list.

In this system, processes are not migrated, data is.  A new process could for example look at the shard mapping and decide to replicate the shards that have the currently weakest replication.  In such a strategy you could just add nodes and they would figure out what to do themselves, even in your upgrade scenario.

The upgrade scenario could be further supported by a RequestStrategy that prefers to send requests to the ProcessId with the newest build.  Then, in a cloud deployment you could simply start a new copy of node2, and clients would automatically use the new copy based on the RequestStrategy.  No need to take down the old node2, and if the new node2 doesn't work, just kill it and the old node2 will take over.  No downtime.
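
One way such a strategy might look, assuming each candidate process advertises an integer build number through the shard data. `Candidate` and `preferNewest` are hypothetical names.

```haskell
import Data.List (maximumBy)
import Data.Ord (comparing)

-- Hypothetical record: a candidate process and the build it runs.
data Candidate = Candidate
  { candProc  :: String
  , candBuild :: Int     -- higher means newer
  } deriving (Eq, Show)

-- Prefer the process running the newest build; Nothing if no process is up.
preferNewest :: [Candidate] -> Maybe Candidate
preferNewest [] = Nothing
preferNewest cs = Just (maximumBy (comparing candBuild) cs)
```

If the new node2 is killed and drops out of the candidate list, the same function falls back to the old node2 without any change on the client side.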

Another RequestStrategy would send *your* personal requests to the new node2, but other production requests to the old node2.  This would give you the option of testing your new code in a complex production setup without affecting customer requests.  It would give you the option of running regression tests on partially live data.

The actual request strategy could be extended to be a function of both the Shard *and* the Key, in which case it is possible to move a shard gradually from one node to another node.  Basically, a given Shard is split between new and old based on key-space percentage.
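
A gradual move by key-space percentage could be sketched like this, assuming keys are hashed into 100 buckets with a toy character-sum hash. `bucket`, `Target` and `routeDuringMove` are invented for illustration.

```haskell
-- Map a key to one of 100 buckets (0..99) with a toy hash over its characters.
bucket :: String -> Int
bucket key = sum (map fromEnum key) `mod` 100

data Target = OldNode | NewNode deriving (Eq, Show)

-- Route a key while `percentMoved` percent of the shard's key space
-- has been handed over to the new node.
routeDuringMove :: Int -> String -> Target
routeDuringMove percentMoved key
  | bucket key < percentMoved = NewNode
  | otherwise                 = OldNode
```

Raising `percentMoved` from 0 to 100 in steps moves the shard across without ever changing how an individual key is hashed.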

There are lots and lots of complex operational issues that are outside the scope of a simple upgrade.

Alexander

Tim Watson

Feb 11, 2013, 6:07:21 AM
to Alexander Kjeldaas, agocorona, panka...@gmail.com, parallel...@googlegroups.com
Hi Alexander,

On 11 Feb 2013, at 09:20, Alexander Kjeldaas wrote:
> >> State that need to be consistent could be sharded with replicas, but with a stable access pattern (first go to primary, then secondary, etc). Then there would be mostly-consistent data that would be spread around in caches etc.
>
> > Right. But again, you're assuming that the ProcessId/SendPort is not part of that data and that all inter-node communication is done via group services.
>
> Yes, the mapping from a global name to a ProcessId/SendPortId is part of the consistent data set.
>

Yes I understood that part.

> > Let me tell you now, that no matter how tight your implementation - and e.g., `fast-paxos` is *very* quick - there is significant overhead in making all internode communication go this route compared to sending direct to the target process. Sometimes you actually want to just send a message to a process, without going around the houses.
>
> Paxos is only involved when there is a change to the consistent data set, i.e. when a node goes down and there needs to be a decision on whether the node is down or not, and thus a change to the mapping from a global name to a ProcessId/SendPort. There is no overhead in the fast path.
>

Sure, that's fine, but what it means is that anyone wanting to use the mechanism has to use Paxos, which *can* fail in some circumstances. I'm not opposed to providing a consensus layer in distributed-process-platform - in fact I think it's a wonderful idea. But I think you should be able to choose the quorum algorithm that fits your needs - I'm a fan of Paxos, but it's not the only game in town. I'm also of the opinion that most Paxos implementations - and I've worked on one written in Erlang, so I do have some background in this - are horribly slow and very difficult to get right in the edge cases. Fast-paxos, which sits over IP multicast, is a particularly neat variant that performs very nicely in the majority of cases. Unfortunately this is completely decoupled from the network-transport layer that Cloud Haskell uses.

>
> Ok, let me moderate myself. If there is a way to completely replace the process and take over the TCP connections in an atomic manner, invisible to clients, then that is a useful thing to have.

Exactly. In fact, what I've been trying to say in previous threads is that if there is a way to have a running process call some *new* function with its current state, then you don't even need to replace the TCP connections - or other kinds of connection, remember that Cloud Haskell is designed to run over multiple network-transport backends, so whatever schemes we come up with would need to work for an HPC cluster using CCI or local unix domain sockets, or whatever, not just work with TCP.

If we solved the 'merging/handling different sets of static values loaded at runtime' problem, what we're talking about is a very low level primitive, that replaces the code that a single thread is using at runtime. No addressing problems occur, because we've upgraded our code without worrying about whether or not we're even running a distributed application. You could use that to upgrade, for example, a solitary node that is not even in a cluster.

> But I think this is an optimization that focuses on just one operational issue, fast upgrades.

Yes that's true, it is a narrow focus. I think that's partly because we've been thinking about this in terms of what Erlang does and whether or not it makes sense for Cloud Haskell. From the OP's point of view, he's looking at master's thesis topics, so something innovative but narrow, like solving the 'hot/live upgrade problem', is quite compelling.

> Other operational issues like crash recovery or re-sharding are not handled. However if the steps are:
>
> 1. Start a new process with new code.
> 2. Let new process join process group (group of servers responsible for a set of data) with responsibility for a data subset (paxos, sharding).
> 3. Migrate/copy/prime data.
> 4. Release old process from process group.
>
> Then slight modifications cover crash-recovery, re-sharding, hardware migration, replication, upgrades, rollbacks, and possibly other scenarios.
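
A minimal sketch of the four steps above as a pure model of group membership. All names here (`Member`, `Group`, `joinGroup`, `releaseMember`, `upgrade`) are hypothetical and are not part of any Cloud Haskell API; step 3 (migrating or priming data) is deliberately left out.

```haskell
import Data.List (delete, nub)

-- Hypothetical names throughout; this is a pure model, not a Cloud Haskell API.
type Member = String   -- stands in for a ProcessId
type Shard  = Int

-- A process group: which members are responsible for which shards.
type Group = [(Shard, [Member])]

-- Step 2: the new process joins the group for a subset of the shards.
joinGroup :: Member -> [Shard] -> Group -> Group
joinGroup m shards g =
  [ (s, if s `elem` shards then nub (ms ++ [m]) else ms) | (s, ms) <- g ]

-- Step 4: the old process is released from every shard it served.
releaseMember :: Member -> Group -> Group
releaseMember m g = [ (s, delete m ms) | (s, ms) <- g ]

-- Which shards a member is currently responsible for.
shardsOf :: Member -> Group -> [Shard]
shardsOf m g = [ s | (s, ms) <- g, m `elem` ms ]

-- Steps 2 and 4 combined for an upgrade: the new member takes over
-- exactly the shards the old member had, then the old member leaves.
upgrade :: Member -> Member -> Group -> Group
upgrade old new g = releaseMember old (joinGroup new (shardsOf old g) g)
```

Under this model, crash recovery would be `releaseMember` followed by a `joinGroup` for a replacement, and re-sharding would be a `joinGroup` with a different shard list, which is roughly the "slight modifications" point made above.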

Yes, process groups *do* solve some of these problems. Now, as you intimate below, consensus is just a mechanism that the process group uses to provide 'group services' to the cluster members. In fact, we've had open tickets to work on process groups and consensus protocols for some time, e.g.,

https://cloud-haskell.atlassian.net/browse/DPP-3
https://cloud-haskell.atlassian.net/browse/DPP-47

All that we're lacking is a spare pair of hands!

> If we look at the cluster level, before any workload is started, distributed consensus should be available. Either start three nodes to run Paxos or get a Zookeeper installation up and running or something like that. Then for all other processes, assume distributed consensus is available and build the rest on top of that.
>

In theory you simply send all messages to a named process via the process group. Then if an upgrade (or some other cluster wide operation) is taking place, the process group blocks the sender (or buffers the message) until the global transaction completes. There is still the need to acquire a read lock on the local routing tables for each send, which *does* mean that the 'happy/fast path' is still slower than an ordinary send. That's a trade off that users can choose to make though.

> I think for example adding Zookeeper as a low-level requirement to getting higher-level functionality would be okay for now, since the distributed consensus abstraction should be what the above functionality is built on, and zookeeper is one implementation of that.
>

Are we suggesting to use a different transport layer technology to handle the consensus protocol? I would prefer to build process groups without throwing the network-transport capabilities out with the bath water. And to do that, you've *got* to solve the lower level issue of addressing *without* distributed consensus, because you haven't built it yet!

You want to build a consensus protocol using Cloud Haskell primitives, and you've got to address the peers in your process group using their ProcessId. Now the problem of how to change that process without downtime becomes pressing again. Either you say this isn't possible and that upgrades rely completely on communication going via the process group only, or you address the OP's question (and my comments) about whether proxying or hot code loading is useful etc. This is a matter of opinion. Erlang's opinion is that live upgrades are such a pressing need that they need to be implemented at a very low level in the architecture, so that for example the code which is performing paxos (the proposers and learners) can itself be upgraded at runtime! The philosophy that you're describing is that upgrades rely on paxos, which is quite a different beast. I'm not suggesting one or the other is right, just pointing out the different perspectives here.

I have *no objections* to implementing consensus - as you can see from Jira, we've had these issues on our minds for many months now, but there is a high volume of other (less exciting) work that needs to be done first. And it is *much* easier to say 'upgrades (and other nice features) are only available if you build on top of process groups'.

> None of the consistent data should impact the fast path, except that there needs to be a way to signal that change has happened. But that is required in any case.
>

For the narrow focus of 'changing the code that a thread is running' it's only necessary if you use 'process migration' instead of hot code loading/changing. But yes, I take your point about the 'fast path' when using a quorum algorithm to handle the routing tables in a cluster. As I mentioned earlier though, there *is* still the cost of a local read on the routing tables, which means either

(a) all sends go via a single thread/process which performs the actual routing
(b) all sends require a lookup in the calling thread/process that uses STM to read the routing table

So the happy path is not cost free. Not unless you're putting this routing information right inside the guts of the node controller, and that would mean implementing consensus as part of the distribution protocol, which I think is the wrong thing to do. That's too big an architectural decision to force on every user, when only some users care about it.
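
To make option (b) concrete, here is a rough sketch of a per-send lookup against a shared routing table held in a `TVar`. The names `RoutingTable`, `resolve` and `rebind` are assumptions for illustration only, and addresses are plain strings rather than real `ProcessId` values.

```haskell
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, writeTVar)

-- Hypothetical stand-ins; a real table would hold ProcessId values.
type Name    = String
type Address = String

type RoutingTable = TVar [(Name, Address)]

-- Option (b): every sender resolves the name itself with a transactional read.
resolve :: RoutingTable -> Name -> IO (Maybe Address)
resolve table name = atomically (lookup name <$> readTVar table)

-- A cluster-wide change (e.g. after a consensus round) swaps the mapping atomically.
rebind :: RoutingTable -> Name -> Address -> IO ()
rebind table name addr = atomically $ do
  entries <- readTVar table
  writeTVar table ((name, addr) : filter ((/= name) . fst) entries)

-- Small demonstration: bind a name, move it, and resolve it again.
demo :: IO (Maybe Address)
demo = do
  table <- newTVarIO [("worker", "node1")]
  rebind table "worker" "node2"
  resolve table "worker"
```

The read in `resolve` is cheap, but it is still an extra step on every send compared with sending straight to a known `ProcessId`, which is the cost being described here.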

Cheers,
Tim

Tim Watson

Feb 11, 2013, 6:38:03 AM2/11/13
to Alexander Kjeldaas, Alberto Gómez Corona, panka...@gmail.com, parallel...@googlegroups.com
Hi Alexander

On 11 Feb 2013, at 10:53, Alexander Kjeldaas wrote:
>
> I'm not talking about finding a specific bit of data in a cluster of nodes. The ProcessId/SendPort is the public interface processes use to send data to one another. If this interface is being used, and we want to change the code that is running on one of the nodes, how do we continue to address the process from the other nodes?
>
> I think that in all cases I have come across, there is always some piece of data you want to either reach or operate on when sending a message to node2. Like a primary key in a table, a session id, a url, or something like that.
>
> There are specific exceptions like ping/pong/alive? messages, but they typically somehow deal with the cluster state. Is this node alive? What is the load? Give me some statistics etc. In these cases we can still imagine that the NodeId itself is the key to the data.
>

This is a very narrow view. You can use Cloud Haskell to build anything, so we have no idea what the process states are or whether message passing is query/call style or cast, what kinds of response we're after, etc. We have no idea if the data even has a total ordering, let alone whether sharding makes sense.

> I think that in all cases I have come across, there is always some piece of data you want to either reach or operate on when sending a message to node2.

How does that not violate encapsulation? Doesn't that mean I need to know about a process' internal state in order to communicate with it? What if the process state is neither serializable nor totally ordered?

> In this system, processes are not migrated, data is.

This goes to my previous point and the first one too. We don't *know* anything about the code that people will write with Cloud Haskell, so how can we know whether migrating data makes any sense whatsoever for someone else's application!?

Having said that, as a generally useful mechanism, what you're proposing sounds great and I'm sure we'll find a myriad of uses for it.

> The actual request strategy could be extended to be a function of both the Shard *and* the Key, in which case it is possible to move a shard gradually from one node to another node. Basically, a given Shard is split between new and old based on key-space percentage.
>

I've seen some pretty cool applications of this in production systems - kafka uses a similar strategy.
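
A rough sketch of the gradual move described in the quoted paragraph, where the request strategy depends on both the shard's migration state and the key. Everything here (`Migration`, `bucket`, `route`) is hypothetical, and the hash is a toy one chosen only to keep the example self-contained.

```haskell
import Data.Char (ord)

type Key  = String
type Node = String

-- A shard that is part-way through a move from oldNode to newNode.
data Migration = Migration
  { oldNode  :: Node
  , newNode  :: Node
  , movedPct :: Int   -- percentage of the key space already served by newNode
  }

-- Toy hash into 0..99; a real system would use a proper hash function.
bucket :: Key -> Int
bucket k = sum (map ord k) `mod` 100

-- The request strategy depends on the shard's migration state *and* the key.
route :: Migration -> Key -> Node
route m k
  | bucket k < movedPct m = newNode m
  | otherwise             = oldNode m
```

Raising `movedPct` from 0 to 100 in steps moves the shard across a slice of the key space at a time, so at any moment each key has exactly one responsible node.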

> There are lots and lots of complex operational issues that are outside the scope of a simple upgrade.
>

Sure, I'm fully aware of that. RabbitMQ doesn't use Erlang's hot-code-loading mechanism because doing so imposes too great a burden on the team to maintain upgrade functions for every possible change set. When our users want to upgrade major versions of a cluster, they have to take the whole cluster down to do the upgrade. I'd be fascinated to see how this proposed 'sharding' approach could be applied to a production system like Rabbit. We'd have to deal with all the queue processes; the vhost, queue, exchange and binding data; the persistent message stores and the in-memory queue index. All the mirrored/ha queues would need to handle this transparently with regard to the rest of their multicast groups. And of course the *data* has to be maintained in FIFO (insertion) order. There is no concept of sharding the data, nor is any ordering possible on it: the AMQP messages are just opaque binary blobs, and the only ordered key is the server-generated 'message sequence id'. You couldn't 'shard' that without making the entire cluster handle each write to every queue as a single transaction, and I guess I don't need to point out why that's a bad idea.

Cheers
Tim


Alexander Kjeldaas

Feb 11, 2013, 6:55:28 AM2/11/13
to Tim Watson, agocorona, panka...@gmail.com, parallel...@googlegroups.com
Only temporarily while there is no Haskell implementation.  I imagine creating a pretty narrow API that can be backed by two implementations.  First by replicated Zookeeper, and secondly by a singleton Cloud Haskell server (I think there is a "name server" in a CH example somewhere that does exactly/mostly this).  That gives a production-ready version (replicated Zookeeper), and a pure Cloud Haskell one.  The pure CH one will not be production ready because it isn't replicated.
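
One way to picture the "narrow API backed by two implementations" is a record of functions. The sketch below is an assumption, not an existing interface: `NameService`, `lookupName`, `bindName` and `newLocalService` are invented names, and only the singleton, non-replicated backend is shown. A Zookeeper-backed value of the same type would be the replicated alternative.

```haskell
import Data.IORef (newIORef, readIORef, atomicModifyIORef')

type Name    = String
type Address = String   -- stands in for a ProcessId/SendPort

-- Hypothetical narrow interface; the thread does not define one.
data NameService = NameService
  { lookupName :: Name -> IO (Maybe Address)
  , bindName   :: Name -> Address -> IO ()
  }

-- Singleton, non-replicated backend: fine for tests, not production ready.
newLocalService :: IO NameService
newLocalService = do
  ref <- newIORef []
  return NameService
    { lookupName = \n -> lookup n <$> readIORef ref
    , bindName   = \n a -> atomicModifyIORef' ref
        (\tbl -> ((n, a) : filter ((/= n) . fst) tbl, ()))
    }
```

Client code written against `NameService` would not need to change when the backend is swapped, which seems to be the point of keeping the API narrow.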
 
I would prefer to build process groups without throwing the network-transport capabilities out with the bath water. And to do that, you've *got* to solve the lower level issue of addressing *without* distributed consensus, because you haven't built it yet!


When a process starts, it only needs one "low-level" address for one or a few of the nodes providing the consensus service.  For the servers that are in the quorum, these low-level addresses should survive a restart, but that doesn't have to be harder than having a configuration file or passing a command line parameter.

Zookeeper can be upgraded in a rolling upgrade.  The addresses of the other servers are in a configuration file.

Alexander

Alberto G. Corona

Feb 11, 2013, 8:10:33 AM2/11/13
to Alexander Kjeldaas, Tim Watson, panka...@gmail.com, parallel...@googlegroups.com
I see these protocols as too tightly coupled and centralized. I prefer a more decoupled model based on a market, because that is the paradigm that supports flexibility and growth, maintaining the intelligence in each of the nodes.

In the sharding example, why must each shard be the same size? One partition of the database could receive many more queries than another. The node that receives the requests is the one that should decide whether to split in two, and how. It can duplicate itself or partition the data into two shards. For reconnection problems, it is better not to reconnect at all, but to forward requests. The node that has split forwards the request to the new node in the way that I mentioned above. A process that does not receive requests could die (or can fail). Upon the next request for its data, the originating process can detect the condition and re-spawn it.

That way, by successive automatic splits, we obtain a distributed fault tolerant database starting from a single node. Each node has the same software and intelligence, and it is application dependent, programmed by the application programmer, but based on some basic primitives for monitoring, error detection, load management, state migration, execution etc.


2013/2/11 Alexander Kjeldaas <alexander...@gmail.com>



--
Alberto.

Alberto G. Corona

Feb 11, 2013, 8:27:36 AM2/11/13
to Alexander Kjeldaas, Tim Watson, panka...@gmail.com, parallel...@googlegroups.com
Forwarding and reconnection can be complementary, and can be used as alternatives.

For example, take this scenario:

n1 asks n2, which is carrying too much load, so n2 forwards the request to n3, which n2 created to do part of its job.

In the response, n1 sees that n3 is the node that answered the request.
After that, n1 can choose either to keep asking n2 or to connect to n3 so that it can ask n3 directly.

In the first case there is nothing more to say. In the second case all goes well until n3 goes away, for example because n2 no longer has too much load and has terminated n3, or because n3 has failed. Then n1 can ask n2 again as before, and n2 can answer the request, doing whatever is necessary.

An intelligent invocation primitive above the transport layer can do this forwarding, reconnection and backtracking transparently for the programmer, so n1 simply asks "the cloud".


2013/2/11 Alberto G. Corona <agoc...@gmail.com>



--
Alberto.

Alexander Kjeldaas

Feb 11, 2013, 8:33:03 AM2/11/13
to Tim Watson, Alberto Gómez Corona, panka...@gmail.com, parallel...@googlegroups.com
On Mon, Feb 11, 2013 at 12:38 PM, Tim Watson <watson....@gmail.com> wrote:
Hi Alexander

On 11 Feb 2013, at 10:53, Alexander Kjeldaas wrote:
>
> I'm not talking about finding a specific bit of data in a cluster of nodes. The ProcessId/SendPort is the public interface processes use to send data to one another. If this interface is being used, and we want to change the code that is running on one of the nodes, how do we continue to address the process from the other nodes?
>
> I think that in all cases I have come across, there is always some piece of data you want to either reach or operate on when sending a message to node2. Like a primary key in a table, a session id, a url, or something like that.
>
> There are specific exceptions like ping/pong/alive? messages, but they typically somehow deal with the cluster state.  Is this node alive?  What is the load? Give me some statistics etc.  In these cases we can still imagine that the NodeId itself is the key to the data.
>

This is a very narrow view. You can use Cloud Haskell to build anything, so we have no idea what the process states are or whether message passing is query/call style or cast, what kinds of response we're after, etc. We have no idea if the data even has a total ordering, let alone whether sharding makes sense.


Even for data that does not have a total ordering, graph data etc, there needs to be a way to serialize and "linearize" the data.  Think of it as creating a database for the data.  There needs to be a way to map it to a simpler form in order to get it down to disk or into memory.
 
> I think that in all cases I have come across, there is always some piece of data you want to either reach or operate on when sending a message to node2.

How does that not violate encapsulation? Doesn't that mean I need to know about a process' internal state in order to communicate with it? What if the process state is neither serializable nor totally ordered?


It does not violate encapsulation more than exposing ProcessIds or even channels.  If there is no other identifier, then the ProcessId *is* the identifier for the data.  If you make no assumptions you cannot know which channel or ProcessId to communicate with. What I am saying is that this is typically a low-level identifier, and there are more natural identifiers that are associated with the data itself.  That identifier can then be sharded.
 
> In this system, processes are not migrated, data is.

This goes to my previous point and the first one too. We don't *know* anything about the code that people will write with Cloud Haskell, so how can we know whether migrating data makes any sense whatsoever for someone else's application!?


There is always data.  Unless there isn't :-).  There are also "pure processes", which are just a way to wrap some other resource such as CPU or network reachability (a way to get out of a private cloud).  So you send a piece of data to be worked on without querying or updating any state.  All you are interested in is the resource, i.e. executing the function.  This is the case in your 'node1, node2' example.  You specified nothing about what node1 wanted from node2.  This can be modeled by having a key and shard space that contains a single item, and the CPU resources are handled by "mapShard".

 
Having said that, as a generally useful mechanism, what you're proposing sounds great and I'm sure we'll find a myriad of uses for it.

> The actual request strategy could be extended to be a function of both the Shard *and* the Key, in which case it is possible to move a shard gradually from one node to another node.  Basically, a given Shard is split between new and old based on key-space percentage.
>

I've seen some pretty cool applications of this in production systems - kafka uses a similar strategy.

> There are lots and lots of complex operational issues that are outside the scope of a simple upgrade.
>

Sure I'm fully aware of that. RabbitMQ doesn't use Erlang's hot-code-loading mechanism because doing so imposes too great a burden on the team to maintain upgrade functions for every possible change set. When our users want to upgrade major versions of a cluster, they have to take the whole cluster down to do the upgrade. I'd be fascinated to see how this proposed 'sharding' approach could be applied to a production system like Rabbit.

In the RabbitMQ system, each queue would probably be mapped to a separate shard, as long as the number of queues is reasonable.  If there are millions of queues, then some compression of the key space would be needed.

According to:

RabbitMQ does not support sharding of a single queue, so the Shard=Queue design is probably correct in this case.

What happens in RabbitMQ if there are billions of queues and they should all be mirrored?  Will the synchronization traffic for the directory kill the system?  In what I describe this is handled by handling multiple queues in a given shard.  Think of RabbitMQ implemented as an API on top of Bigtable.

Scaling a single queue across multiple Shards is also possible by only exposing two shards per queue, the producer shard and the consumer shard, and a fallthrough "focus" function since we're mapping keys and shards 1:1.

data QueueKey = PushAt String | ConsumeAt String
data QueueShard = PushAtShard String | ConsumeAtShard String

So we have consensus on where data for a given queue name should be pushed and where it should be consumed.  For internal consistency, assuming a data item gets some sort of mostly monotonic ticket id, there would be another name-space that records which ticket number lies at the boundary of a given internal storage shard.  What I am describing here as the internal storage is basically bigtable-like.

 
We'd have to deal with all the queue processes; the vhost, queue, exchange and binding data; the persistent message stores and the in-memory queue index. All the mirrored/ha queues would need to handle this transparently with regard to the rest of their multicast groups. And of course the *data* has to be maintained in FIFO (insertion) order. There is no concept of sharding the data, nor is any ordering possible on it: the AMQP messages are just opaque binary blobs, and the only ordered key is the server-generated 'message sequence id'. You couldn't 'shard' that without making the entire cluster handle each write to every queue as a single transaction, and I guess I don't need to point out why that's a bad idea.


I'm far from being familiar with RabbitMQ, but I would expect vhosts to be part of the distributed consensus data.  Implementing mirroring of queues as RabbitMQ does can be done by a RequestStrategy that mirrors data.

For RabbitMQ to be able to scale a queue across multiple servers, I think the "message sequence id" needs to be partially exposed.  It could consist of a rough timestamp that doesn't need to be synchronized, and that would make it possible to shard a message queue across multiple nodes.  The exposed shards could be "hours" while the internal message sequence ids could be "milliseconds".  Thus the internal key for a queue would be something like

type QueueName = String
type PushTime = Int
data InternalQueueKey = QueueItem QueueName PushTime
type PushHour = Int
data InternalQueueShard = QueueShard QueueName PushHour

and the "focus" function truncates the PushTime to hours.  This does not give shards of equal size so another option is to have
data InternalQueueShard = QueueShard QueueName ShardId

and have the "focus" function be a range-table that maps ranges of PushTime to ShardId. But anyways, this is not the time to rearchitect RabbitMQ.
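
The two "focus" variants described above could look roughly like this. It is only a sketch under stated assumptions: `PushTime` is taken to be in milliseconds, `ShardId` is assumed to be an `Int`, and the two shard types are renamed `HourShard` and `RangeShard` (hypothetical names) so that both variants can live in one module.

```haskell
type QueueName = String
type PushTime  = Int   -- assumed to be milliseconds since some epoch
type PushHour  = Int
type ShardId   = Int   -- assumption; the thread leaves this type undefined

data InternalQueueKey = QueueItem QueueName PushTime

data HourShard  = HourShard QueueName PushHour deriving (Eq, Show)
data RangeShard = RangeShard QueueName ShardId deriving (Eq, Show)

-- First variant: truncate the push time to whole hours.
focusHour :: InternalQueueKey -> HourShard
focusHour (QueueItem q t) = HourShard q (t `div` 3600000)

-- Second variant: a range table of (inclusive lower bound, shard),
-- sorted by ascending lower bound. The last matching entry wins.
focusRange :: [(PushTime, ShardId)] -> InternalQueueKey -> Maybe RangeShard
focusRange table (QueueItem q t) =
  case [ s | (lo, s) <- table, lo <= t ] of
    [] -> Nothing                        -- key is older than every range
    ss -> Just (RangeShard q (last ss))
```

The range table can be rebalanced by editing its boundaries, which is what lets shards end up closer to equal size than fixed one-hour buckets.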

Alexander

Tim Watson

Feb 11, 2013, 9:04:41 AM2/11/13
to Alexander Kjeldaas, Alberto Gómez Corona, panka...@gmail.com, parallel...@googlegroups.com
On 11 Feb 2013, at 13:33, Alexander Kjeldaas wrote:
>
> ... But anyways, this is not the time to rearchitect RabbitMQ.
>

Now you sound like my boss! :D

Alexander Kjeldaas

Feb 11, 2013, 9:11:28 AM2/11/13
to Alberto G. Corona, Tim Watson, panka...@gmail.com, parallel...@googlegroups.com
On Mon, Feb 11, 2013 at 2:10 PM, Alberto G. Corona <agoc...@gmail.com> wrote:
I see these protocols as too tightly coupled and centralized. I prefer a more decoupled model based on a market, because that is the paradigm that supports flexibility and growth, maintaining the intelligence in each of the nodes.

In the sharding example, why must each shard be the same size?

It doesn't need to be.  The only requirement is that there needs to be a function that can compress the key-space (of any size) to a smaller shard-space that is of "reasonable size".  The "reasonable size" requirement is there so that it is possible to distribute the shard-to-process-or-node information to lots of clients.
 
A partition in the database could receive many more queries. The node that receives the request is the one that should decide whether to split in two, and how. It can duplicate itself or partition the data into two shards.

This is within what I talk about.  I am not specifying *who* changes the sharding function, or *how* the sharding function can change.  
If a concept of "owner" of a shard exists, a node that "owns" a shard can split it and thus the "owner" would be the one that modifies (part of) the sharding function.
Some external monitor that looks at the whole system could do it.
Clients that detect that a node is down could change the map from shard to node/process to indicate that a process is dead.
The process itself that handles lots of shards can decide to let some of them go or request extra replication.
 
For reconnection problems, it is better not to reconnect at all, but to forward requests. The node that has split forwards the request to the new node in the way that I mentioned above.

I would file this under "RequestStrategy".  I'd say that in general, letting a node forward requests does not work.  Without a fine-tuned request flow-control system that can deal with such things as predicting the cost of a request, a node will be overloaded from time to time.  An overloaded node cannot be expected to give any QoS for forwarded requests.  Rather it must terminate requests and reject new clients.  An overloaded node is a node that tries to give certain QoS guarantees, and if a client is not in the QoS bucket, all bets are off.  Somewhere in the system unbounded queues will accumulate.  Rejected clients will have to find some other way of accessing the data, by trying some other ProcessId in the mapShard result.

Therefore, the RequestStrategy, and the set of ProcessIds that can be used to access the given data should be distributed not through the node itself, but through a different mechanism, and using distributed consensus ensures both that the data is available in the node overload scenario, as well as giving all clients a consistent view.

Well, I guess it could be distributed through the node itself, but tying those two together is less flexible than treating the information as coming from "somewhere else" where I suggest "somewhere else" being a distributed consensus service.

Actually, parts of the RequestStrategy such as load information do not need to be consistent, only "statistically significant" I guess :-).  They can be distributed more efficiently.

In my experience, there are two cases where forwarding requests works.  The first is processing real-time data.  In such systems there is typically lots of spare CPU.  The other example is a proxy/load-balancer process where the whole point is to forward requests.  However proxies and load-balancers, although practical, are not optimal from a performance or latency point of view.

One way of viewing what I have described is as a proxy or load-balancer that is replicated inside all clients.  The replication of the state that is held by the load-balancer is done by distributed consensus.


A process that does not receive requests could die (or can fail). Upon the next request for its data, the originating process can detect the condition and re-spawn it.


An overloaded process is in an in-between state where some requests should fail. 
 
That way, by successive automatic splits, we obtain a distributed fault tolerant database starting from a single node. Each node has the same software and intelligence, and it is application dependent, programmed by the application programmer, but based on some basic primitives for monitoring, error detection, load management, state migration, execution etc.


I think you are assuming a single owner of data, so that there is a 1:1 relationship between a shard and a process.  I have described an N:M relationship, but I guess a 1:M relationship could work given that processes are cheap to create.  In a 1:M relationship a shard is backed by a set of processes, but a single process is only associated with a single shard.  This might have some desirable properties such as giving a guarantee that if the process is reachable, the shard is reachable.

Alexander

Alberto G. Corona

Feb 12, 2013, 10:29:43 AM2/12/13
to Alexander Kjeldaas, Tim Watson, panka...@gmail.com, parallel...@googlegroups.com

No doubt the network layer, or better, a "resource distribution layer", should provide the basic infrastructure for whatever strategy is used to distribute resources, whether static or dynamic, with intelligence centralized or distributed. Some applications would prioritize flexibility over availability; others would prioritize replicability or some other criterion. I have a requirement for explicit avoidance of master servers and for verifiability and replicability of results, since I'm interested in a decentralized system for electronic democracy where anyone at home can connect their personal computer at any time and replicate the vote count process, in real time or afterwards, to avoid the danger of tampering with the software or the data in a central server.

It is clear that no preconceived solution can cover all the cases. But these different architectures will be specific to particular requirements, so they should be located in another layer closer to the concrete application, not in the network layer.

My wish is to have an architecture-independent resource distribution layer with services for node load information, failover, reconnection and perhaps some higher-level functionality, such as a protocol for load distribution and some primitives like the "ask to the cloud" one that I mentioned above.




2013/2/11 Alexander Kjeldaas <alexander...@gmail.com>



--
Alberto.

Tim Watson

Feb 12, 2013, 11:13:14 AM2/12/13
to Alberto G. Corona, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
Alberto,

On 12 Feb 2013, at 15:29, Alberto G. Corona wrote:
> It is clear that no preconceived solution can cover all the cases. But these different architectures will be specific to particular requirements, so they should be located in another layer closer to the concrete application, not in the network layer.
>

None of the things we've been discussing are going to end up in the network layer, that's for sure.

> My wish is to have an architecture-independent resource distribution layer with services for node load information

You might like to vote for some Jira issues to be prioritised then:

https://cloud-haskell.atlassian.net/browse/NT-3 - network-transport management API
https://cloud-haskell.atlassian.net/browse/NT-2 - network-transport stats API
https://cloud-haskell.atlassian.net/browse/DP-46 - process management API
https://cloud-haskell.atlassian.net/browse/DPP-33 - node monitoring/stats API
https://cloud-haskell.atlassian.net/browse/DPP-36 - node admin/management API
https://cloud-haskell.atlassian.net/browse/DPP-50 - SNMP agent

https://cloud-haskell.atlassian.net/browse/DPP-40 - distributed tx/consensus protocols
https://cloud-haskell.atlassian.net/browse/DPP-47 - group services
https://cloud-haskell.atlassian.net/browse/DPP-48 - atomic broadcast
https://cloud-haskell.atlassian.net/browse/DPP-66 - process groups

If nobody votes for features/bugs then I'll only ever prioritise them based on what I feel like working on! ;)


> failover

Would you and/or Alexander be willing to write up a high level ticket for this?

Cheers,
Tim

Tim Watson

Feb 12, 2013, 11:14:28 AM2/12/13
to Alexander Kjeldaas, Alberto Gómez Corona, panka...@gmail.com, parallel...@googlegroups.com
Ironically we're talking about adding support for sharded queues in RabbitMQ this year. It'll be interesting to look at how that gets implemented - I'll keep you posted! :)

Alberto G. Corona

Feb 13, 2013, 7:53:40 AM2/13/13
to Tim Watson, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
Tim, 
thanks

I would prefer that someone else create the Jira entry for failover. I don't have a broad enough perspective to know whether my approach is general enough and compatible with the current design. It is perhaps heterodox, but simple at first sight, because it does not need a special monitor or port mapper: it combines forwarding and reconnection depending on the application requirements. This is a description from a user's point of view:

In my proposal every process has a monitor, which is its parent process, except the first one, which may have many copies. The client always connects to one of these top-level processes (that's why it is "ask to the cloud"). The top level can delegate (this is application dependent) part of its work to other processes on other nodes, so it forwards the requests to the new processes. By looking at the response, the client learns the address of the process that answered the request, and can choose either to direct the next request to the old process (which will forward it again) or to reconnect to the new one. The programmer can choose either option, depending on the requirements.

That cover a lot of scenarios with some good properties for monitoring, fail-over, clustering, process move, and load balancing:

 If the client process do not reconnect to the new instance (and this may be application dependent) , the old node can act as a monitor of the spawned processes. it can detect failures on them and do the appropriate action for the application requirements. it can do load balancing too if the programmer decides to do so.

If the client process reconnect to the spawned process to avoid forwarding (this makes sense when a lot of requests will be sent to the same data partition) , once a request fails, it can connect (or reuse the active connection) to the parent process and re-send the request. this older process will do the appropriate to answer the request. This regression may proceed recursively, if there is a failure in the tree of processes. This kind of invocation with failover can be a primitive. 

Note that a failure in the request does not have to mean that the process has failed. It can mean that the process can not answer it (for example, his  partition has no such data). The parent process can redirect the request to the correct child process in that case. if it is due to a failure, the parent will  redirect to the same node, detect the failure  and will spawn a new one.

If the process just want to migrate, then it forwards all the requests to the new process. the clients detect the new process address in the responses and reconnect to this new address. Once all connections are redirected or timeout is reached, the old process will die, since  any process that do not receive requests after a timeout should die. If necessary, upon request, a parent process will re create it (since there is ever a parent)
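
A minimal sketch of the "invocation with failover" primitive described above, written as a pure Haskell model rather than against the distributed-process API. All names here (`Proc`, `Response`, `callWithFailover`, the demo addresses) are hypothetical, and the sketch assumes that a top-level process can answer any request; forwarding to children and respawning are not modelled.

```haskell
module Main where

import Data.List (find)

-- Hypothetical model types; none of this is the distributed-process API.
type Addr = String

data Proc = Proc
  { addr    :: Addr
  , parent  :: Maybe Addr    -- Nothing for a top-level process
  , alive   :: Bool
  , handles :: Int -> Bool   -- which request keys this process can answer
  }

-- A response carries the address of whoever answered, so the client can
-- choose to reconnect there for its next request.
data Response = Response { answeredBy :: Addr, payload :: String }
  deriving (Eq, Show)

lookupProc :: [Proc] -> Addr -> Maybe Proc
lookupProc ps a = find ((== a) . addr) ps

-- Try a single process; Nothing means "failed, or cannot answer this key".
tryProc :: Proc -> Int -> Maybe Response
tryProc p key
  | alive p && handles p key = Just (Response (addr p) ("value-" ++ show key))
  | otherwise                = Nothing

-- Invocation with failover: if the request fails, regress to the parent,
-- recursively, until some process answers or the top level is exhausted.
callWithFailover :: [Proc] -> Addr -> Int -> Maybe Response
callWithFailover ps a key = do
  p <- lookupProc ps a
  case tryProc p key of
    Just r  -> Just r
    Nothing -> parent p >>= \pa -> callWithFailover ps pa key

-- Demo tree: a top-level process and one crashed child.
demoProcs :: [Proc]
demoProcs =
  [ Proc "top"   Nothing      True  (const True)
  , Proc "child" (Just "top") False even
  ]

main :: IO ()
main = print (callWithFailover demoProcs "child" 4)
```

With the demo data, a request sent to the crashed child is answered by `top`, and the `answeredBy` field tells the client where the answer came from, so it can decide whether to reconnect.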


2013/2/12 Tim Watson <watson....@gmail.com>



--
Alberto.

Tim Watson

Feb 13, 2013, 8:26:36 AM
to Alberto G. Corona, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
Hi Alberto,

Much of what you've written below can be easily implemented with the ManagedProcess (gen_server) and supervisor APIs, so there's no need to write up a Jira issue. Once distributed-process-platform is on Hackage (soon) then I can show you how you can easily put together these features that you're after.

Cheers
Tim

Duncan Coutts

Feb 14, 2013, 12:40:39 PM
to watson....@gmail.com, Alberto G.Corona, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
On Wed, 2013-02-13 at 13:26 +0000, Tim Watson wrote:
> Hi Alberto,
>
> Much of what you've written below can be easily implemented with the
> ManagedProcess (gen_server) and supervisor APIs, so there's no need to
> write up a Jira issue. Once distributed-process-platform is on Hackage
> (soon) then I can show you how you can easily put together these
> features that you're after.

My 2p on this issue is that I think it's exceedingly hard to try to do
code upgrades within the OS process, or by switching transparently to a
new instance at the network layer.

I would suggest going for an approach higher up the stack (which might
be ManagedProcess). The general approach there would be to bring up the
new node, shut down the old and get all peers/clients to start talking to
the new one. So the peers/clients do have to know that there's a new
process id, and possibly have to deal with resetting states or whatever.
Or ideally just use the supervision tree to shut things down and restart
them. Point is, the higher level library can force its users to act in a
certain way (e.g. in their management of state) and so can deal with
restarts sanely.

I know Erlang can do the hot code upgrade and preserve process state
etc, but my vague understanding is that people do not use it much (you
say RabbitMQ doesn't). My (again vague) understanding is that people do
use higher level approaches, like treating a node upgrade just like any
other case where a node dies and restarts.

Now, even in that case: where a node dies and restarts with newer code,
we still have exciting problems if any of the message types change. Acid
state does it with one way data upgrade functions, but I don't think we
could do it just one-way in the CH context.
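
One way to read the last point, as a sketch only: while old and new nodes coexist, a new node
has to accept old messages (upgrade) and also produce messages that old peers can decode
(downgrade). The message types and function names below are hypothetical, and this is a pure
model, not the acid-state, safecopy or distributed-process API.

```haskell
module Main where

-- Hypothetical message types for two releases of the same service.
data PingV1 = PingV1 { v1Seq :: Int }
  deriving (Eq, Show)

data PingV2 = PingV2 { v2Seq :: Int, v2Origin :: String }
  deriving (Eq, Show)

-- One-way upgrade, in the style of a migration for data at rest.
upgrade :: PingV1 -> PingV2
upgrade (PingV1 n) = PingV2 n "unknown"

-- The extra direction a mixed-version cluster needs. It is lossy:
-- the origin field has no place in the old message.
downgrade :: PingV2 -> PingV1
downgrade (PingV2 n _) = PingV1 n

data Version = V1 | V2
  deriving (Eq, Show)

-- What actually travels between nodes in this model.
data Wire = WireV1 PingV1 | WireV2 PingV2
  deriving (Eq, Show)

-- A new node encodes according to the version the peer speaks.
encodeFor :: Version -> PingV2 -> Wire
encodeFor V1 = WireV1 . downgrade
encodeFor V2 = WireV2

-- A new node decodes whatever version arrives.
decode :: Wire -> PingV2
decode (WireV1 m) = upgrade m
decode (WireV2 m) = m

main :: IO ()
main = do
  let msg = PingV2 7 "node-b"
  print (encodeFor V1 msg)           -- what an old peer would receive
  print (decode (encodeFor V1 msg))  -- round trip through the old format
```

The round trip through the old format loses the origin field, which is the kind of
problem a purely one-way scheme never has to face.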

--
Duncan Coutts, Haskell Consultant
Well-Typed LLP, http://www.well-typed.com/

Alberto G. Corona

Feb 14, 2013, 12:51:10 PM
to Tim Watson, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
Tim,
Ok. Thanks.




2013/2/13 Tim Watson <watson....@gmail.com>



--
Alberto.

Tim Watson

Feb 14, 2013, 1:20:30 PM
to dun...@well-typed.com, Alberto G.Corona, Alexander Kjeldaas, panka...@gmail.com, parallel...@googlegroups.com
Hi Duncan,

On 14 Feb 2013, at 17:40, Duncan Coutts wrote:
> My 2p on this issue is that I think it's exceedingly hard to try to do
> code upgrades within the OS process, or by switching transparently to a
> new instance at the network layer.
>
> I would suggest going for an approach higher up the stack (which might
> be ManagedProcess).

I'm starting to lean towards this idea too.

> Point is, the higher level library can force its users to act in a
> certain way (e.g. in their management of state) and so can deal with
> restarts sanely.
>

This fits with Alberto's needs as well.

> I know Erlang can do the hot code upgrade and preserve process state
> etc, but my vague understanding is that people do not use it much (you
> say RabbitMQ doesn't). My (again vague) understanding is that people do
> use higher level approaches, like treating a node upgrade just like any
> other case where a node dies and restarts.
>

This is a pretty accurate picture of most sizeable Erlang projects
outside of Ericsson that I'm aware of, yeah.

> Now, even in that case: where a node dies and restarts with newer code,
> we still have exciting problems if any of the message types change. Acid
> state does it with one way data upgrade functions, but I don't think we
> could do it just one-way in the CH context.

That's food for thought too.

Cheers,
Tim



Alberto G.Corona

May 6, 2013, 5:35:09 AM
to parallel...@googlegroups.com, dun...@well-typed.com, Alberto G.Corona, Alexander Kjeldaas, panka...@gmail.com
Hi
 
It seems to me that code upgrades could be managed in the same way as migration. I do not like the idea of an ad-hoc protocol and new software for each purpose if this can be avoided.

According to my proposal, if the process just wants to migrate, the same failover-delegation protocol described above applies: the old process in this case forwards all requests to the new process. The clients detect the new process address (and possibly a flag that indicates migration mode) in the responses and reconnect to this new address. Once the old process detects that all connections are redirected, or a timeout is reached, the old process will die, since any process that does not receive requests within a timeout should die (it will be restarted by a supervisor higher in the tree if necessary). Thus, if I'm right, there is no need for a new ad-hoc protocol! Only different behaviours of the same basic protocol for migration, failover, clustering, monitoring and load balancing.
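
A minimal sketch of the old process in migration mode, as a pure state machine. The names (`Old`, `Event`, `Outcome`, `step`, `run`) and the tick-based timeout are assumptions made for illustration; this is not the ManagedProcess API.

```haskell
module Main where

import Data.List (delete)

-- Hypothetical model types for illustration only.
type Addr     = String
type ClientId = String

-- State of the old process while it is migrating.
data Old = Old
  { newAddr   :: Addr        -- where the process has moved to
  , pending   :: [ClientId]  -- clients that have not reconnected yet
  , idleTicks :: Int         -- ticks since the last request
  , timeoutAt :: Int         -- die after this many idle ticks
  } deriving (Eq, Show)

-- Replies carry the new address, which doubles as the migration flag.
data Reply = Reply { body :: String, movedTo :: Maybe Addr }
  deriving (Eq, Show)

data Event = Request ClientId String | Reconnected ClientId | Tick

data Outcome = Continue Old (Maybe Reply) | Retire
  deriving (Eq, Show)

-- Retire once every client has been redirected or the timeout is reached.
finish :: Old -> Outcome
finish o
  | null (pending o) || idleTicks o >= timeoutAt o = Retire
  | otherwise                                      = Continue o Nothing

step :: Old -> Event -> Outcome
step o (Request _ req) =
  Continue (o { idleTicks = 0 })
           (Just (Reply ("forwarded:" ++ req) (Just (newAddr o))))
step o (Reconnected c) = finish (o { pending = delete c (pending o) })
step o Tick            = finish (o { idleTicks = idleTicks o + 1 })

-- Feed a sequence of events to the old process, stopping when it retires.
run :: Old -> [Event] -> Outcome
run o []       = Continue o Nothing
run o (e : es) = case step o e of
  Retire        -> Retire
  Continue o' _ -> run o' es

demoOld :: Old
demoOld = Old "node-b" ["c1", "c2"] 0 3

main :: IO ()
main = do
  print (run demoOld [Request "c1" "get", Reconnected "c1", Tick, Reconnected "c2"])
  print (run demoOld [Tick, Tick, Tick])
```

Both demo runs end with the old process retiring: the first because every client has reconnected, the second because the idle timeout is reached.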

Tim Watson

May 6, 2013, 7:43:04 AM
to agoc...@gmail.com, parallel...@googlegroups.com, dun...@well-typed.com, Alberto G.Corona, AlexanderKjeldaas, panka...@gmail.com
Hi Alberto,

I must admit I struggle to understand how that would work in the general case, however... The ManagedProcess API is now stable, so feel free to send a patch or pull request. ;)

In terms of server code upgrade within the same executable (i.e., without code change but just swapping to an alternative handler, such as "forward everything to X"), there was a branch on which I got that working previously, though it's quite out of date now and the module structure will have changed quite a bit. See https://github.com/haskell-distributed/distributed-process-platform/tree/upgrades for details.
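
As a rough illustration of the handler-swapping idea (a pure model with hypothetical names, not the code on that branch and not the ManagedProcess API), a server loop can carry its current handler as state and replace it when it receives a control message:

```haskell
module Main where

-- Hypothetical model types for illustration only.
type Addr = String

data Action = ReplyWith String | ForwardTo Addr String
  deriving (Eq, Show)

-- A handler decides what to do with each request.
newtype Handler = Handler { runHandler :: String -> Action }

-- The server's mailbox holds ordinary requests and handler swaps.
data Msg = Req String | Swap Handler

echoHandler :: Handler
echoHandler = Handler (\r -> ReplyWith ("echo:" ++ r))

-- The "forward everything to X" handler.
forwardAll :: Addr -> Handler
forwardAll x = Handler (ForwardTo x)

-- The server loop: the current handler is the only state, and a Swap
-- message replaces it for all later requests.
serve :: Handler -> [Msg] -> [Action]
serve _ []              = []
serve h (Req r : ms)    = runHandler h r : serve h ms
serve _ (Swap h' : ms)  = serve h' ms

main :: IO ()
main = mapM_ print (serve echoHandler [Req "a", Swap (forwardAll "X"), Req "b"])
```

Requests that arrive before the swap are answered locally; requests after it are forwarded to `X`.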

Cheers,
Tim