[erlang-questions] node to node message passing


Morten Krogh

Sep 12, 2010, 6:48:38 AM
to erlang-q...@erlang.org
Hi Erlangers.

During some tests of node-to-node communication, I sent a large binary
from a process on node A
to a process on another node, node B. I also sent some smaller messages
from other processes on node A to other
processes on node B. It turned out that the large message blocked the
later messages. Furthermore, it even blocked
the net tick communication, so node A and B disconnected from each other
even though the large message was being transferred!

After looking around a bit, I have come to the understanding that Erlang
uses one TCP connection between two nodes, and messages are sent
sequentially from the sending node A to the receiving node.

If that is correct, I think some improvements are needed.

The problem to solve is basically that small messages, including the net
tick, should get through more or less independently of
the presence of large messages.

The simplest would be to have several connections, but that doesn't
fully solve the problem. A large message will still take up
a lot of the hardware bandwidth even on another tcp connection.

My suggestion is something like the following.

For communication between node A and node B, there is a process (send
process) on each node, that coordinates all messages. The send process
keeps queues of different priorities around, e.g., a high priority,
medium priority and low priority. Messages are split up into fragments of
a maximum size. The receiver(node B) send process assembles the
fragments into the original message and delivers it locally to the
right process. The fragments ensure that no single transfer will occupy
the connection for very long.
There will be a function send_priority where the user can specify a
priority. The usual send will default to medium, say.
Net tick will use high priority, of course. Small messages that are
needed to produce a web application response can have high priority.
File transfers
for backup purposes can have low priority.
The send process then switches between the queues in some way, that
could be very similar to context switching priorities.
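
A minimal sketch of the two building blocks described above, fragmenting and priority queues. This is an illustration only, not the module mentioned later in this message: the module name prio_frag, its function names, and the {Index, Total, Chunk} fragment layout are all hypothetical, and it uses maps, which postdate this thread.

```erlang
-module(prio_frag).
-export([split/2, join/1, new/0, enqueue/3, next/1]).

%% Split a binary into fragments of at most Max bytes.
%% Each fragment is {Index, Total, Chunk} so the receiver can reassemble.
split(Bin, Max) when is_binary(Bin), is_integer(Max), Max > 0 ->
    Chunks = chunks(Bin, Max),
    Total = length(Chunks),
    lists:zip3(lists:seq(1, Total), lists:duplicate(Total, Total), Chunks).

chunks(Bin, Max) when byte_size(Bin) =< Max ->
    [Bin];
chunks(Bin, Max) ->
    <<Chunk:Max/binary, Rest/binary>> = Bin,
    [Chunk | chunks(Rest, Max)].

%% Reassemble fragments, in whatever order they arrived.
join(Fragments) ->
    Sorted = lists:keysort(1, Fragments),
    iolist_to_binary([Chunk || {_Index, _Total, Chunk} <- Sorted]).

%% Three FIFO queues, one per priority level.
new() ->
    #{high => queue:new(), medium => queue:new(), low => queue:new()}.

%% Fails with badkey for anything other than high, medium or low.
enqueue(Prio, Item, Queues) ->
    maps:update_with(Prio, fun(Q) -> queue:in(Item, Q) end, Queues).

%% Take the next item from the highest non-empty queue (strict priority).
next(Queues) ->
    next([high, medium, low], Queues).

next([], _Queues) ->
    empty;
next([Prio | Rest], Queues) ->
    case queue:out(maps:get(Prio, Queues)) of
        {{value, Item}, Q} -> {Item, Queues#{Prio := Q}};
        {empty, _} -> next(Rest, Queues)
    end.
```

A send process could enqueue the fragments of a large message at low priority and a tick at high priority; next/1 would then hand out the tick first. Strict priority is the simplest choice here, and it can starve the lower queues under a steady stream of high-priority traffic.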

More advanced, the send processes could occasionally probe the
connection with packets to estimate latency and bandwidth. Those figures
could then be used
to calculate fragment sizes. High bandwidth and high latency would call for
large fragments; low bandwidth and low latency for small fragments, for instance.
There could even be a function send_estimated_transfer_time that sends a
message and has a return value of estimated transfer time, which could
be used in
a timeout in a receive loop.
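
One way to turn such estimates into a fragment size is the bandwidth-delay product, which is large for high bandwidth and high latency and small for low bandwidth and low latency. This is only a sketch of that idea, not something proposed in the message: the module name frag_size, the function from_estimates/2, and the 1 kB and 1 MB clamping limits are assumptions chosen for illustration.

```erlang
-module(frag_size).
-export([from_estimates/2]).

%% Hypothetical limits so the result stays in a sane range.
-define(MIN_BYTES, 1024).
-define(MAX_BYTES, 1048576).

%% BytesPerSec: estimated bandwidth in bytes per second.
%% RttMs: estimated round-trip latency in milliseconds.
%% Returns a fragment size in bytes: the bandwidth-delay product,
%% clamped to [?MIN_BYTES, ?MAX_BYTES].
from_estimates(BytesPerSec, RttMs)
  when is_integer(BytesPerSec), BytesPerSec > 0,
       is_integer(RttMs), RttMs >= 0 ->
    Bdp = (BytesPerSec * RttMs) div 1000,
    max(?MIN_BYTES, min(?MAX_BYTES, Bdp)).
```

For a link of about 400 kB/s with an assumed 50 ms round trip, frag_size:from_estimates(400000, 50) gives 20000 bytes.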


I have actually implemented my own small module for splitting messages
into fragments, and it solves the issues; net tick goes through, and small
messages can overtake large ones.

There is of course an issue when several messages share the same sending
and receiving process. Either the guaranteed message order
should be given up, or the
coordinators should keep track of that as well. Personally, I think
guaranteed message order should be given up. Erlang should model the
real world as
much as possible, and learn from it. In the real world, two letters
going from person A to person B, can definitely arrive in the opposite
order
of the one in which they were sent. And as node to node communication
will be over larger and larger distances, it is totally unnatural to
require
a certain order.

I am relatively new to Erlang and I really enjoy it. Kudos to all involved!

Cheers,

Morten Krogh.


________________________________________________________________
erlang-questions (at) erlang.org mailing list.
See http://www.erlang.org/faq.html
To unsubscribe; mailto:erlang-questio...@erlang.org

Jan Huwald

Sep 12, 2010, 1:51:17 PM
to erlang-q...@erlang.org

On Sunday 12 September 2010 12:48:38, Morten Krogh wrote:
> Hi Erlangers.
>
> During some test with node to node communication, I sent a large binary
> from a process on node A
> to a process on another node, node B. I also sent some smaller messages
> from other processes on node A to other
> processes on node B. It turned out that the large message blocked the
> later messages. Furthermore, it even blocked
> the net tick communication, so node A and B disconnected from each other
> even though the large message was being transferred!
>
> After looking a bit around, I have come to the understanding that Erlang
> uses one tcp connection between two nodes, and messages are sent
> sequentially from the sending node A to the receiving node.
>
> If that is correct, I think some improvements are needed.

IMO the programmer has to take precautions on their own if they expect to handle
large messages. This is analogous to the large memory requirements of a single
process. Erlang is not natively well suited to either, which is good
because it keeps things simple.

In the case of large messages (or large process heaps), tradeoffs have to be made.
Your proposed solution is an example of (a) how these tradeoffs can be kept
out of the language core and (b) tradeoffs that not everybody
agrees on.

Actively probing seems to me like a recipe for far more failures than it
offers benefit. Passively probing might be ok, if the run-time overhead is
small. But an actual implementation, which does not get biased by
nonstationary activity of the application, seems quite complex to me.


> I have actually implemented my own small module for splitting messages
> into fragments, and it solves the issues; net tick goes through, and small
> messages can overtake large ones.
>
> There is of course an issue when the sending and receiving process is
> the same for several messages. Either the guaranteed message order
> should be given up, or the
> coordinators should keep track of that as well. Personally, I think
> guaranteed message order should be given up. Erlang should model the
> real world as
> much as possible, and learn from it. In the real world, two letters
> going from person A to person B, can definitely arrive in the opposite
> order
> of the one in which they were sent. And as node to node communication
> will be over larger and larger distances, it is totally unnatural to
> require
> a certain order.
>
> I am relatively new to Erlang and I really enjoy it. Kudos to all involved!
>
> Cheers,
>
> Morten Krogh.

Regards,
Jan

Morten Krogh

Sep 13, 2010, 3:40:30 PM
to erlang-q...@erlang.org
Hi Jan

Thanks for your answer. I guess we really disagree on this:)

I was stunned when I saw the nodes disconnecting in the middle of a
large message passing,
and I think this must be improved at low level in the VM. I see it as
similar to context switching.

Let me explain why I do that. But first, I will comment on the claim,
that you and others make, that Erlang is not suited for large amounts of
data.
Why not? Erlang is implemented in C. Binaries can be stored as
efficiently as in any other language. A binary can be sent to a socket
using C functions.
Where is the fundamental problem?
Is this claim of Erlang being unsuited for large messages not just
because people represent data with a space-inefficient data structure,
e.g., using a list of 4-byte integers instead of
a binary?

Back to message passing. A cluster of Erlang nodes needs to solve many
tasks simultaneously. A task could, in Erlang style, be solved by many
cooperating processes, which use
message passing to communicate. There are many "tasks" being solved
simultaneously, and they can have vastly different time profiles, and
priorities. One task could be a fast
response to a web application, and it might be implemented as a
json/html process that communicates with several data base processes,
maybe using a security process as well.
They will communicate mostly with small messages, say under 100 bytes.
At the same time, there could be huge file transfers or backups of the
data base processes going on.
Any example suffices. But the point is that the small task should finish
rather fast independently of when it is started, and independently of
what the large task is doing.
This is basically a rationale for multi tasking and context switching.
And the Erlang vm does this for processes, and everybody agrees it is a
good thing.
Now multitasking only works if all aspects of the task (computation)
can go ahead without being blocked. You can context switch as much as
you want, but it doesn't help if
the fast task gets stuck waiting for a message pass. So it is essential
that the large messages can be preempted, and that the bandwidth can be
assigned to the fast task.
It is probably obvious what I am saying. All parts of the computation
must switch between tasks including the message passing between nodes on
distinct computers.
Otherwise you get a bottleneck.

It would be like context switching in the registers and CPU, but the
memory bus saying, "sorry CPU, you cannot get the data you are
requesting now, because I am still transferring data for the previous
process, the one you just preempted".


Cheers,

Morten.

Scott Lystig Fritchie

Sep 13, 2010, 3:51:26 PM
to Morten Krogh, erlang-q...@erlang.org
Morten Krogh <m...@amberbio.com> wrote:

mk> I was stunned when I saw the nodes disconnecting in the middle of a
mk> large message passing, and I think this must be improved at low
mk> level in the VM. I see it as similar to context switching.

Morten, if you had some runnable code that demonstrated the behavior, we
would all have a much more precise description of what's happening. :-)
And a description of the networking environment, e.g. 10 Gbit local
Ethernet vs. 9.6 Kbps to lunar orbit. (Running in virtual machine
environments (Xen, VMware, ...) can also play tricky games.)

If I recall correctly ... the net_kernel doesn't rely solely on the tick
messages between nodes: any message from node A -> B should reset B's
tick timeout counter.

-Scott

Joe Armstrong

Sep 13, 2010, 3:55:57 PM
to Morten Krogh, erlang-q...@erlang.org
On Sun, Sep 12, 2010 at 12:48 PM, Morten Krogh <m...@amberbio.com> wrote:
>  Hi Erlangers.
>
> During some test with node to node communication, I sent a large binary from
> a process on node A
> to a process on another node, node B. I also sent some smaller messages from
> other processes on node A to other
> processes on node B. It turned out that the large message blocked the later
> messages. Furthermore, it even blocked
> the net tick communication, so node A and B disconnected from each other
> even though the large message was being transferred!

Just for clarification could you say what you mean by "large", "small"
etc. - I have no idea what this means - it might mean 10's of MBytes
it might mean GBytes - without knowing I have no idea as to how
realistic your expectations are.

/Joe

Morten Krogh

Sep 13, 2010, 4:41:42 PM
to erlang-q...@erlang.org
Hi Scott and Joe

Sure, let us start with the disconnection, even though there is more to it than that.

Two Mac computers on a local wireless network. The connection is slow: in the Mac activity monitor I see a transfer speed of around 400 kB/s, or roughly 3 Mb/s.
But hang on, this is a fundamental issue, I believe. It is just clearer on slow connections.


Start two nodes, one on each computer

erl -name 'no...@10.0.1.2'
erl -name 'no...@10.0.1.10'

connect them with net_adm:ping. nodes() gives the right result.

Create 50MB binary.

(no...@10.0.1.2)7> B = binary:copy(<<"b">>, 50000000).

Send the binary to the registered shell at the other node.
The shell is registered as ten.

(no...@10.0.1.2)7> {ten, 'no...@10.0.1.10'} ! B.

Watch the network and see transfers of around 300-440 kB/s.

On the other node

(no...@10.0.1.10)12> now().
{1284,408613,791811}

(no...@10.0.1.10)13>
=ERROR REPORT==== 13-Sep-2010::22:11:27 ===
** Node 'no...@10.0.1.2' not responding **
** Removing (timedout) connection **

(no...@10.0.1.10)13> now().
{1284,408693,688267}

The disconnect happened roughly a minute in (the two now() calls above are about 80 seconds apart), which matches the default net_ticktime of 60 seconds.
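
A quick cross-check of these figures, as a sketch added for illustration. The module and function names are hypothetical, and the 400 kB/s rate is the approximate transfer speed quoted above, not a measured constant.

```erlang
-module(tick_math).
-export([elapsed_seconds/2, transfer_seconds/2]).

%% Seconds between two now()-style timestamps {MegaSecs, Secs, MicroSecs}.
elapsed_seconds(T0, T1) ->
    timer:now_diff(T1, T0) / 1000000.

%% Time on the wire for Bytes at a steady rate of BytesPerSec.
transfer_seconds(Bytes, BytesPerSec) ->
    Bytes / BytesPerSec.
```

With the two timestamps from the transcript, tick_math:elapsed_seconds({1284,408613,791811}, {1284,408693,688267}) comes to about 79.9 seconds. At roughly 400 kB/s, tick_math:transfer_seconds(50000000, 400000) is 125.0 seconds, so a tick queued behind the 50 MB binary would wait well past a 60-second net_ticktime.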

Let us check this assumption.

(no...@10.0.1.10)14> net_kernel:set_net_ticktime(5).
change_initiated
(no...@10.0.1.10)15> net_kernel:get_net_ticktime().
{ongoing_change_to,5}
(no...@10.0.1.10)16> net_kernel:get_net_ticktime().
{ongoing_change_to,5}
(no...@10.0.1.10)17> net_kernel:get_net_ticktime().
5

The net_ticktime is now 5 seconds.


(no...@10.0.1.10)18> now().
{1284,408904,738430}
(no...@10.0.1.10)19>
=ERROR REPORT==== 13-Sep-2010::22:15:11 ===
** Node 'no...@10.0.1.2' not responding **
** Removing (timedout) connection **

(no...@10.0.1.10)19> now().
{1284,408913,335812}

8 seconds, but I was very slow at typing now().
Try again.

(no...@10.0.1.10)20> now().
{1284,408927,35639}
(no...@10.0.1.10)21> now().
=ERROR REPORT==== 13-Sep-2010::22:15:32 ===
** Node 'no...@10.0.1.2' not responding **
** Removing (timedout) connection **

=ERROR REPORT==== 13-Sep-2010::22:15:32 ===
The global_name_server locker process received an unexpected message:
{{#Ref<0.0.0.155>,'no...@10.0.1.2'},true}

{1284,408932,778710}

5 seconds as expected. Anyway, net_tick is not precise. It could have been 4-6.

The file was 50MB, but I could have made it smaller, obviously.

But the real point is not just the disconnect. The real point is that messages block others.

I am not giving the code here, but you can easily reproduce it, if it is reproducible :)

Start two processes on 'no...@10.0.1.2' say two1 and two2.
Start two processes on 'no...@10.0.1.10', say ten1 and ten2.


Set net_ticktime to a very high number.

Send the 50 MB binary or so from two1 to ten1.
Shortly after send a 10 byte message from two2 to ten2.

Let ten1 and ten2 write to io:format when they receive a message.

Nothing happens for a long time, the network is busy, and then both ten1 and ten2 write their output.
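
The test described above could be reproduced along the following lines. This is a hedged sketch rather than the code that was actually run: the module name block_demo and its functions are hypothetical, the senders are anonymous processes standing in for two1 and two2, and the module has to be loaded on both nodes.

```erlang
-module(block_demo).
-export([start_receiver/2, run/2]).

%% Run on the receiving node: register a process under Name that prints
%% the size and arrival time of each binary it receives, and also
%% forwards that information to the Report pid.
start_receiver(Name, Report) ->
    Pid = spawn(fun() -> receiver_loop(Name, Report) end),
    true = register(Name, Pid),
    Pid.

receiver_loop(Name, Report) ->
    receive
        Msg when is_binary(Msg) ->
            Now = erlang:monotonic_time(millisecond),
            io:format("~p got ~p bytes at ~p ms~n",
                      [Name, byte_size(Msg), Now]),
            Report ! {Name, byte_size(Msg), Now},
            receiver_loop(Name, Report)
    end.

%% Run on the sending node: one process sends a large binary to ten1,
%% and a second process sends a 10-byte binary to ten2 shortly afterwards.
run(Node, LargeBytes) ->
    Large = binary:copy(<<"b">>, LargeBytes),
    Small = <<"0123456789">>,
    spawn(fun() -> {ten1, Node} ! Large end),
    spawn(fun() -> timer:sleep(100), {ten2, Node} ! Small end),
    ok.
```

On the receiving node, call block_demo:start_receiver(ten1, self()) and block_demo:start_receiver(ten2, self()); on the sending node, call block_demo:run(Node, 50000000) with the receiving node's name. Comparing the two printed arrival times shows whether the small message had to wait for the large one.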


But does anyone actually know? Do messages queue up like this test seems to indicate?


Cheers,

Morten.

Tony Rogvall

Sep 13, 2010, 5:38:39 PM
to Morten Krogh, erlang-q...@erlang.org
Hi Morten!

On 12 sep 2010, at 12.48, Morten Krogh wrote:

> Hi Erlangers.
>
> During some test with node to node communication, I sent a large binary from a process on node A
> to a process on another node, node B. I also sent some smaller messages from other processes on node A to other
> processes on node B. It turned out that the large message blocked the later messages. Furthermore, it even blocked
> the net tick communication, so node A and B disconnected from each other even though the large message was being transferred!
>

This is one of the things that should have been fixed a long time ago (I think).
Maybe I am also the one that should have done it while I had the chance ;-)


> After looking a bit around, I have come to the understanding that Erlang uses one tcp connection between two nodes, and messages are sent
> sequentially from the sending node A to the receiving node.
>
> If that is correct, I think some improvements are needed.
>

I can only agree here.

> The problem to solve is basically that small messages, including the net tick, should get through more or less independently of
> the presence of large messages.
>
> The simplest would be to have several connections, but that doesn't fully solve the problem. A large message will still take up
> a lot of the hardware bandwidth even on another tcp connection.
>
> My suggestion is something like the following.
>
> For communication between node A and node B, there is a process (send process) on each node, that coordinates all messages. The send process
> keeps queues of different priorities around, e.g., a high priority, medium priority and low priority. Messages are split up into fragments of
> a maximum size. The receiver(node B) send process assembles the fragments into the original message and delivers it locally to the
> right process. The fragments ensure that no single transfer will occupy the connection for very long.
> There will be a function send_priority where the user can specify a priority. The usual send will default to medium, say.
> Net tick will use high priority, of course. Small messages that are needed to produce a web application response can have high priority. File transfers
> for backup purposes can have low priority.
> The send process then switches between the queues in some way, that could be very similar to context switching priorities.

Yes, multiplexing over one TCP channel I think is a reasonable way to go.

>
> More advanced, the send processes could occasionally probe the connection with packets to estimate latency and bandwidth. Those figures could then be used
> to calculate fragment sizes. High bandwidth, high latency would require large fragments. Low bandwidth, low latency small fragments for instance.
> There could even be a function send_estimated_transfer_time that sends a message and has a return value of estimated transfer time, which could be used in
> a timeout in a receive loop.
>

Why not take one step at a time? Erlang/OTP is and should be a process of improvements. Trying to do too much
is just going to delay the implementation finding its way into the main branch.

>
> I have actually implemented my own small module for splitting messages into fragments, and it solves the issues; net tick goes through, and small
> messages can overtake large ones.
>

Can't wait to see the implementation. Do you have a git repository somewhere?

> There is of course an issue when the sending and receiving process is the same for several messages. Either the guaranteed message order should be given up, or the
> coordinators should keep track of that as well. Personally, I think guaranteed message order should be given up. Erlang should model the real world as
> much as possible, and learn from it. In the real world, two letters going from person A to person B, can definitely arrive in the opposite order
> of the one in which they were sent. And as node to node communication will be over larger and larger distances, it is totally unnatural to require
> a certain order.

Let's try to stick to the existing "semantics" here, then the implementation will have a chance to get realized.

>
> I am relatively new to Erlang and I really enjoy it. Kudos to all involved!
>

Well you certainly stumbled into the right place ;-)

Regards

/Tony

Joe Armstrong

Sep 14, 2010, 5:04:28 AM
to Tony Rogvall, Morten Krogh, erlang-q...@erlang.org
Interesting - I think what you want is a message passing layer that "just works":
it should survive virtually all failures. What happens if you send a
large message
between A and B and B is "totally destroyed" during the sending?

Think B is a laptop and has just been crushed with a sledge hammer.

You buy a new B - reinstall the software and ... should the data
transfer proceed?

I think you need an introspection layer for things like this - you
need to start the
transfer, then go back later and ask how it is doing? - you might get the answer
"B is broken - waiting to be repaired" - but having said "transfer X
from A to B"
the system should "try forever" to do this.

You might want "transfer X from A to B and tell Y if you can't do this
within time T"
which is completely different.

Exactly what is "built-in" to the standard inter-process message
passing semantics
is a moot point. I think what you want belongs to a separate library function.

Actually I have > 1 macs at home - with bad wireless connectivity between
certain rooms - so this is a not uninteresting problem.

Some kind of "bit-torrent" algorithm might be better - suppose
connectivity between A and
C is bad - but between A and B and B and C is good - what then?

A very interesting problem ... some kind of gossip algorithm with
bit-torrent type
fragment sharing seems in order. Since it's your own network, bit-torrent type
fairness is not relevant ...

You have twitched my programming nerve ...

/Joe

Jesper Louis Andersen

Sep 14, 2010, 6:41:40 AM
to Joe Armstrong, Tony Rogvall, Morten Krogh, erlang-q...@erlang.org
On Tue, Sep 14, 2010 at 11:04 AM, Joe Armstrong <erl...@gmail.com> wrote:
>
> Some kind of "bit-torrent" algorithm might be better - suppose
> connectivity between A and
> C is bad - but between A and B and B and C is good - what then?

Bittorrent does nothing to protect against a killed TCP connection. If
the protocol gets a disconnect, it will simply grab another peer from
a pool of pending peers and try to connect to it. It may be the same
peer, or it may not. If the line is merely slow, the fragmentation of
the data ensures a grab eventually where the faster connections pass
more data around.

> Since it's your own network, bit-torrent type fairness is not relevant ...

Most of the bittorrent protocol is there to solve a specific problem:
most people are on slow, copper based xDSL type lines. These are
asymmetric and congest easily in the upstream path. Hence, a lot of
the algorithm is devoted to meticulously managing the limited upstream
bandwidth to balance the somewhat unlimited downstream bandwidth and
maximize it (until the file has been downloaded in which case we
change strategy). The algorithm is not that much about fairness. It
doesn't really care if anybody gets a "fair" share of the file. Rather
it tries to optimize the downstream bandwidth for the client - but to
do that the protocol is designed such that you must send data back.

If you ignore the sending, you will eventually get your file. The
point is just that you will get it later than everybody else. Another
interesting property is that clients "sort themselves" into classes
where a class is determined by its upload speed. Thus the fast peers
tend to get the file fast and then distribute it to other peers.

The brilliance of the protocol lies in swarm intelligence. Each peer
knows nothing but the information it has gathered itself. Each peer
uses no historic data older than 10 seconds. Yet, data is transferred
at speeds that rival or surpass any other P2P network of critical mass
on the internet.

--
J.

Jesper Louis Andersen

Sep 14, 2010, 6:45:12 AM
to Morten Krogh, erlang-q...@erlang.org
On Mon, Sep 13, 2010 at 10:41 PM, Morten Krogh <m...@amberbio.com> wrote:
>
> But the real point is not just the disconnect. The real point is that messages block others.
>

The advantage of a serial stream is probably that you don't need an ID
on each message to order them. If messages are allowed to be
multiplexed, you need the ordering.
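
To make the cost of multiplexing concrete, here is a minimal sketch of the bookkeeping a receiver would need once messages carry sequence numbers and may arrive out of order. It is an assumption-laden illustration, not how Erlang distribution works: the module name reseq, the numbering from 1, and the {NextSeq, Pending} state are all hypothetical.

```erlang
-module(reseq).
-export([new/0, accept/3]).

%% State is {NextSeq, Pending}: the next sequence number to deliver and
%% a map of messages that arrived early, keyed by sequence number.
new() ->
    {1, #{}}.

%% Accept message Msg carrying sequence number Seq.
%% Returns {Deliverable, NewState}, where Deliverable is the list of
%% messages that can now be handed over in their original order.
accept(Seq, Msg, {Next, Pending}) when Seq >= Next ->
    drain(Next, Pending#{Seq => Msg}, []).

drain(Next, Pending, Acc) ->
    case maps:take(Next, Pending) of
        {Msg, Rest} -> drain(Next + 1, Rest, [Msg | Acc]);
        error -> {lists:reverse(Acc), {Next, Pending}}
    end.
```

Messages 2 and 3 arriving before message 1 are held back, and all three are released in order once 1 shows up. A single serial stream gets this ordering for free.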

As an aside: SCTP is the protocol you want for this.

--
J.

Ryan Zezeski

Sep 18, 2010, 12:51:15 AM
to Morten Krogh, erlang-q...@erlang.org
On Sun, Sep 12, 2010 at 6:48 AM, Morten Krogh <m...@amberbio.com> wrote:

>
> For communication between node A and node B, there is a process (send
> process) on each node, that coordinates all messages. The send process
> keeps queues of different priorities around, e.g., a high priority, medium
> priority and low priority. Messages are split up into fragments of
> a maximum size. The receiver(node B) send process assembles the fragments
> into the original message and delivers it locally to the
> right process. The fragments ensure that no single transfer will occupy the
> connection for very long.
> There will be a function send_priority where the user can specify a
> priority. The usual send will default to medium, say.
> Net tick will use high priority, of course. Small messages that are needed
> to produce a web application response can have high priority. File transfers
> for backup purposes can have low priority.
> The send process then switches between the queues in some way, that could
> be very similar to context switching priorities.
>
>

What happens to the low priority, large binary msg when the system becomes
loaded with a constant stream of smaller, high priority messages? It seems
the low priority msg would become neglected and then you'll be complaining
how the msg takes too long to arrive. I think in all but the most trivial
cases you would have to understand the inner workings of the priority
scheduler, if I may call it that, to understand how your messages will be
delivered. To me, this sounds like a problem to be handled by the
application.

Another thought, why are you trying to pass such a large amount of data
anyways? If you are truly constrained by such a slow network then why not
do everything in your power to avoid passing large amounts of data? I'm
thinking of Hadoop here--move the process to the data, not the other way
around.

-Ryan

Claes Wikstrom

Sep 24, 2010, 8:20:54 PM
to erlang-q...@erlang.org
On 09/13/2010 09:40 PM, Morten Krogh wrote:
> Hi Jan
>
> Thanks for your answer. I guess we really disagree on this:)
>
> I was stunned when I saw the nodes disconnecting in the middle of a large message passing,

If this really is the case, which I doubt, it's a bug.

/klacke
