Does distributed-process leak connections?

52 views
Skip to first unread message

Facundo Domínguez

unread,
May 6, 2013, 5:07:47 PM5/6/13
to parallel...@googlegroups.com
Dear list,
I've found recently a program that seems to leak connections. Can
you spot anything wrong with it?
This program implements a ping server and clients. As time passes,
it prints in the standard output the amount of open connections.

If the program is fine, then any proposals on how to fix the leak are welcome.

Thanks,
Facundo

import Control.Concurrent
import Control.Distributed.Process as P
import Control.Distributed.Process.Node
import Control.Monad ( replicateM_, forever, void )
import Data.IORef
import Network.Transport
import Network.Transport.TCP

main :: IO ()
main = do
Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters
c <- newIORef 0
lnid <- newLocalNode (wrapTransport c transport) initRemoteTable
runProcess lnid $ do
self <- getSelfPid
void $ spawnLocal $ forever $ do
spawnLocal $ do
P.send self self
expect :: Process ()
void $ receiveTimeout 100000 []
forever $ do
replicateM_ 20 $ expect >>= flip P.send ()
liftIO $ readIORef c >>= print

wrapTransport :: IORef Int -> Transport -> Transport
wrapTransport c (Transport ne ct) = Transport (fmap (fmap $ wrapEP c) ne) ct

wrapEP :: IORef Int -> EndPoint -> EndPoint
wrapEP c e =
e { connect = \x y z -> fmap (fmap $ wrapConnection c) $ do
atomicModifyIORef' c $ \i -> (i+1,())
connect e x y z
}

wrapConnection :: IORef Int -> Connection -> Connection
wrapConnection c (Connection s closeC) = Connection s $ do
atomicModifyIORef' c $ \i -> (i-1,())
closeC

Edsko de Vries

unread,
May 6, 2013, 5:27:03 PM5/6/13
to facundo....@parsci.com, parallel...@googlegroups.com
Without having studied your email in great depth, do the comments at http://www.well-typed.com/blog/72 help at all?

Edsko

--
You received this message because you are subscribed to the Google Groups "parallel-haskell" group.
To unsubscribe from this group and stop receiving emails from it, send an email to parallel-haske...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.




Tim Watson

unread,
May 6, 2013, 6:28:51 PM5/6/13
to Edsko de Vries, facundo....@parsci.com, parallel...@googlegroups.com
On 6 May 2013, at 22:27, Edsko de Vries wrote:

Without having studied your email in great depth, do the comments at http://www.well-typed.com/blog/72 help at all?

We really should think about how to handle that automatically, if possible. I suppose ticket https://cloud-haskell.atlassian.net/browse/NT-4 deals with that in part, at least when we lose contact with a whole other node, but it would be good to be a bit more pro-active in the case of a single (multiplexed) connection too.

These are spawnLocal processes though, so if Facundo tries this with the development branch of distributed-process, I suspect won't happen at all, because communication between processes on the same logical node won't involve the network-transport layer at all. Not that this is a good thing - it's irritating that the space leak will only show up in the inter-node messaging case, but that's a side effect of the changes in d9fcd8d.

Given that I'm unlikely to have a fix for this (in the remote case) prior to releasing 0.5.0 - not least because I'm not sure what the right solution would be - perhaps we should document this on the cloud haskell wiki (and getting started guide?) a bit more clearly? Or add a 'troubleshooting' page, which is something I've been thinking about since there are a few common gotcha's I've helped folks with repeatedly now anyway.

Cheers,
Tim

Facundo Domínguez

unread,
May 7, 2013, 7:31:15 AM5/7/13
to Tim Watson, Edsko de Vries, parallel...@googlegroups.com
> Without having studied your email in great depth, do the comments at
> http://www.well-typed.com/blog/72 help at all?

Probably reconnect will do. Thanks.

> We really should think about how to handle that automatically, if possible.

Indeed. As connections are hidden from the user, making a reconnect
call is both surprising and easy to forget.

> it's irritating that the space leak will only show up in the inter-node
> messaging case, but that's a side effect of the changes in d9fcd8d.

hm, this will make some issues trickier to find. Perhaps some
compilation flags could be provided so the current behavior is
available for debugging?

Regards,
Facundo

Tim Watson

unread,
May 7, 2013, 10:13:00 AM5/7/13
to Facundo Domínguez, Edsko de Vries, parallel...@googlegroups.com
On 7 May 2013, at 12:31, Facundo Domínguez wrote:

>> We really should think about how to handle that automatically, if possible.
>
> Indeed. As connections are hidden from the user, making a reconnect
> call is both surprising and easy to forget.
>

Yes, though solving this problem in the general case is quite tricky. That isn't to say we should do it, just that we need to define a solution first.

>> it's irritating that the space leak will only show up in the inter-node
>> messaging case, but that's a side effect of the changes in d9fcd8d.
>
> hm, this will make some issues trickier to find. Perhaps some
> compilation flags could be provided so the current behavior is
> available for debugging?
>

I don't think that's really plausible, since the changes are quite significant - having two code paths would be really ugly. It's not difficult to revert to the old behaviour in a single application/test for debugging anyway - just start two different local nodes in the same program instead of using spawnLocal.

Cheers,
Tim

AlanKim Zimmerman

unread,
May 7, 2013, 10:47:37 AM5/7/13
to Tim Watson, Facundo Domínguez, Edsko de Vries, parallel-haskell
A left field thought. Would there be any benefit to managing the underlying connections via zeromq, which has pretty robust reconnection management?

Alan


Tim Watson

unread,
May 7, 2013, 12:55:40 PM5/7/13
to alan...@gmail.com, Facundo Domínguez, Edskode Vries, parallel-haskell
Cloud Haskell would still have to maintain some state, because we're talking about network-transport connections, which a like AMQP channels (I you like) that multiplex over a single network connection, not physical TCP connections. So I'm not sure that zero has anything to add here, apart from the performance drag of communications between bound and inbound threads due to context switching.

What we need to do is figure out a mechanism by which we can more readily clean up these 'lightweight connections' when the other end goes away. Quite what's involved in that I'm not sure, because you won't know about the peer disappearing until you actually try and send to it, at which point the destination node controller knows the target has died but you do not. I suppose we could send a signal back asynchronously to let the caller's node know they can clean up that connection, but that would quickly create a network storm - oops.

One way we might be able to readily solve this is to piggy back on the forthcoming support for inter node heartbeats - intended to provide timely notification that a peer node has gone off the grid. This is somewhat analogous to erlang's net_kernel tick time setting.

Instead of just sending a ping to the heartbeat process on the other node every "tick time" seconds, we could also batch up information about any inbound connections we're aware of that reference an object known to have died. In case there's no interesting deaths, the heartbeat sent to that node would just be a 'ping' intended to stimulate a network transport error (if the peer doesn't respond in a timely fashion - as I understand it, we only need to send these pings not wait for a response, since the transport layer will generate an error for a missing peer all by itself - did I get that right Edsko?).

This will, of course, put addditional load on the network, but if the network's saturated and you're using heartbeats, the peers will disconnect anyway. Another thin we might do (to alleviate pressure on the node controller's listener thread) is use a separate, out f band channel for heartbeats and these control messages under discussion.

If that's a reasonable plan, I'll update the Jira ticket for heartbeats accordingly. But in that case, heartbeats just got way more complicated, so I'll push them back out after the 0.5.0 release - which is kind of nice, because that'll mean we get another release sooner as I'm nearly done bar a couple of other fairly minor tickets.

Cheers,
Tim

AlanKim Zimmerman

unread,
May 7, 2013, 1:17:42 PM5/7/13
to Tim Watson, Facundo Domínguez, Edskode Vries, parallel-haskell
Ok.  And for the record, here is a discussion around heartbeats.

http://zguide.zeromq.org/page:all#Heartbeating-for-Paranoid-Pirate
(I just happen to have been looking over this stuff recently, not really advocating anything)

Alan

Tim Watson

unread,
May 7, 2013, 1:40:24 PM5/7/13
to AlanKim Zimmerman, Facundo Domínguez, Edskode Vries, parallel-haskell
On 7 May 2013, at 18:17, AlanKim Zimmerman <alan...@gmail.com> wrote:

Ok.  And for the record, here is a discussion around heartbeats.


I'll take a look, but we need something that works for all transport backends not just one based on 0mq. That said, I know Pieter through my work at RabbitMQ and I'm a big fan of 0mq, it's just not the only thing we need to support is all.


. So I'm not sure that zero has anything to add here, apart from the performance drag of communications between bound and inbound threads due to context switching.

That was meant to be "bound and unbound" - damned phone. :)

Facundo Domínguez

unread,
May 7, 2013, 1:43:51 PM5/7/13
to AlanKim Zimmerman, Tim Watson, Edskode Vries, parallel-haskell
What if a node disconnects outgoing connections that have not been
used for a while? The timeout length would have to be application
dependent though.

Or this could also be done at the transport level. Whenever a
connection is inactive for a while, CH would receive an
EventConnectionLost event.

Facundo

Tim Watson

unread,
May 7, 2013, 2:04:51 PM5/7/13
to facundo....@parsci.com, AlanKim Zimmerman, Edskode Vries, parallel-haskell
On 7 May 2013, at 18:43, Facundo Domínguez <facundo....@parsci.com> wrote:

What if a node disconnects outgoing connections that have not been
used for a while? The timeout length would have to be application
dependent though.


But if you disconnect, you can break the ordering guarantees of message passing. Oops. This is a connection between each two peer processes, and we rely on that to preserve ordering semantics. If you disconnect the two peers then you'll quite possibly incur unexpected behaviour as a result.

Also, what is the right timeout, as you say, for each application. What about for different kinds of processes within an application? Timeouts are seldom a useful sticking plaster, even in networking applications where they're so commonly resorted to.

Or this could also be done at the transport level. Whenever a
connection is inactive for a while, CH would receive an
EventConnectionLost event.

Not sure how that's different from implementing heartbeats? And we can't do it at the transport level because all the transports are conceptually different under the covers. TCP connection handling is, for example, quite different than in-memory or CCI (?) or SCTP, etc. If we did this in each transport layer, the semantics of each transport technology would be exposed to the user differently - surely that's a leaky abstraction we want to avoid? And layering the heartbeats below the transport implementations would involve moving the policy based machinery down into what is currently just an API layer - we'd have to cope with all the idiosyncratic implementations details there and possibly keep changing them whenever  new transports arrive.

The CH level seems more like the right place to do heartbeats - the question is whether to overload that mechanism with 'control information' as well. If we did, then nodes could communicate stale connection info with one another seamlessly, using an application specific timeout (I.e., sent each tick time * heartbeat count based on an environment variable or config file) where appropriate.

Cheers,
Tim

Edsko de Vries

unread,
May 12, 2013, 6:37:30 AM5/12/13
to Tim Watson, alan...@gmail.com, Facundo Domínguez, parallel-haskell
So actually I think this is orthogonal to the issue of disconnect and heartbeats, and Tim's remark about ordering goes to the very core of this problem. In the case of of a disconnect between A and B (however it is detected), ordering of messages between A and B simply cannot be guaranteed anymore. Erlang would do an implicit reconnect at that point, but Cloud Haskell does not because implicit reconnects means that we can no longer provide strong ordering guarantees. Instead, we provide the "reconnect" primitive which, effectively, allows the programmer to say "I voluntarily give up some ordering guarantees; I'll deal with it".

The harder problem is to garbage collect connections that *haven't* broken, but are no longer in use (for some definition of "no longer in use"). We cannot simply disconnect, and forget everything to do with that (logical) connection, because then when a next message is sent Cloud Haskell must still be able to guarantee the same ordering guarantees. You need some sort of protocol that allows the two nodes to establish that all messages sent on the old (garbage collected) connection have been received, so that messages sent on the new connection are delivered in order. Whether or not you can do this without maintaining any state at all (i.e., in linear space) I'm not sure.

Edsko

Jeff Epstein

unread,
May 12, 2013, 7:27:24 AM5/12/13
to ed...@well-typed.com, Tim Watson, alan...@gmail.com, Facundo Domínguez, parallel-haskell
Maybe the application can decide when it's safe to forget connection
information between nodes. That is, there could be a "disconnect"
primitive analogous to the "reconnect."

This has the advantage of not breaking existing behavior, while
allowing applications to let CH clean up connections that aren't being
used any more.

Jeff

Edsko de Vries

unread,
May 12, 2013, 7:30:52 AM5/12/13
to Jeff Epstein, Tim Watson, alan...@gmail.com, Facundo Domínguez, parallel-haskell
Actually, reconnect can already be used for this purpose (and I recommend that it is). Perhaps that's a hack, but I don't really think so. "reconnect" really says: "It's *ok*' to reconnect now (but not necessary)". Perhaps it should be renamed.

Tim Watson

unread,
May 12, 2013, 9:07:45 AM5/12/13
to Edsko de Vries, Jeff Epstein, alan...@gmail.com, Facundo Domínguez, parallel-haskell
On 12 May 2013, at 12:30, Edsko de Vries wrote:

> Actually, reconnect can already be used for this purpose (and I recommend that it is). Perhaps that's a hack, but I don't really think so. "reconnect" really says: "It's *ok*' to reconnect now (but not necessary)". Perhaps it should be renamed.
>

In fact, the `reconnect' primitive in distributed-process calls a lower level primitive (exported from Messaging.hs) which is called 'disconnect' already. :)

Tim Watson

unread,
May 12, 2013, 2:21:27 PM5/12/13
to Edsko de Vries, alan...@gmail.com, Facundo Domínguez, parallel-haskell
On 12 May 2013, at 11:37, Edsko de Vries wrote:

> So actually I think this is orthogonal to the issue of disconnect and heartbeats, and Tim's remark about ordering goes to the very core of this problem
> The harder problem is to garbage collect connections that *haven't* broken, but are no longer in use (for some definition of "no longer in use").

Indeed. And just so that all the contributors to this thread have a clear picture of what's going on - right now, cloud haskell *will* close connections when one of the two processes dies. But so long as the processes remain alive, closing the connections would be wrong, as per the ordering issue.

> We cannot simply disconnect, and forget everything to do with that (logical) connection, because then when a next message is sent Cloud Haskell must still be able to guarantee the same ordering guarantees.
> You need some sort of protocol that allows the two nodes to establish that all messages sent on the old (garbage collected) connection have been received, so that messages sent on the new connection are delivered in order. Whether or not you can do this without maintaining any state at all (i.e., in linear space) I'm not sure.

What makes this tricky is the twofold problem that "no longer in use" is not only fuzzy but difficult to detect, and establishing "that all messages send on the old connection have been received" is theoretically impossible without the presence of infinite storage. This is perhaps a bit like the classic problem of "exactly once delivery" semantics in messaging systems, that lots of vendors promise without explaining the caveats properly. I won't bore the list with a diatribe about that, as it's pretty obvious where the gaps are if you think about trying to implement it properly.

Now keep-alive becomes useful is in the case where a remote endpoint disappears without getting through the shutdown protocol properly, due to a crash or network failure for example. But yes, that's an orthogonal issue.

Cheers,
Tim

Facundo Domínguez

unread,
May 12, 2013, 9:33:15 PM5/12/13
to Tim Watson, Edsko de Vries, alan...@gmail.com, parallel-haskell
> establishing "that all messages send on the old connection have been received" is theoretically impossible without the presence of infinite storage.

One way to address this is to have the process in the receiver side
complain if an earlier connection from the same sender is still open.
The sender would need to issue a reconnect call in such a case, and
the receiver would be told to ignore the old connection on the next
connection attempt. If no earlier connection is open, the receiver can
accept the new connection.

Facundo

Tim Watson

unread,
May 13, 2013, 5:39:54 AM5/13/13
to Facundo Domínguez, Edsko de Vries, alan...@gmail.com, parallel-haskell
On 13 May 2013, at 02:33, Facundo Domínguez wrote:

>> establishing "that all messages send on the old connection have been received" is theoretically impossible without the presence of infinite storage.
>
> One way to address this is to have the process in the receiver side
> complain if an earlier connection from the same sender is still open.
> The sender would need to issue a reconnect call in such a case, and
> the receiver would be told to ignore the old connection on the next
> connection attempt.

That's exactly what we're suggesting. There's no work to do here - this is already how both the network-transport and the distributed-process layers do things today. What I was driving at is that without that explicit call to 'reconnect', there is no way to do this without turning our lightweight asynchronous message passing semantics into a heavyweight, synchronous protocol requiring some degree of consensus.

Note that even in the case where the sender *does* explicitly call 'reconnect' - there is *still* no guarantee that the receiver will be able to release the connection in a timely fashion, because in the presence of communication failure, *ALL* methods for handling lost peers are highly timing dependant. For the simplest case, viz a lost TCP connection, take a look at `man tcp` and consider the function of tcp_retries1 (defaults to 3 on linux) and tcp_retries2, plus the configured RTO (and various other parameters that interact with this behaviour). This can mean delays of between 10 and 30 minutes before the OS networking layer "notices" that a peer socket is gone and returns ETIMEDOUT to the application layer. That's why I was talking about keep-alives, since they guarantee *some* timeliness in detecting that peers have disconnected.

We might handle this by writing an empty byte string on open connections every `n' seconds, thus stimulating a network-transport failure if the connection is down. This is *STILL* subject to the OS configuration however, since for TCP the various retry settings combined with the RTO are what determines when ETIMEDOUT is returned as I mentioned previously. One can use TCP keep-alives, but these are of course transport specific. I don't know how other transports (such as CCI) deal with this, but we need a mechanism that works for everyone. The AMQP protocol uses "active keep-alives", in other words the two peers agree on a timeout value and both transmit *and* receive 'heartbeats', considering the peer to be down if it does not "ping" without the configured delay. This increases load on the network, but guarantees we're not dependent on OS limits, because we handle the timeout ourselves without waiting for ETIMEDOUT. Of course, this means that if the network is saturated, we can loose connectivity even though the connection is still there, so there's a price to pay either way.

> If no earlier connection is open, the receiver can
> accept the new connection.

Again, there's nothing to do here, because when the sender calls 'reconnect' the underlying connection is closed and the local state pertaining to that connection is discarded. So all this business about 'ignore the old connection' is irrelevant - there is no more connection after you call reconnect/disconnect, network interruptions not withstanding.

Now in the absence of a call to reconnect/disconnect, if some sender P wants to connect to Q, it sends its own endpoint address to such that Q can reuse the connection. So a close from either end should do the trick. In context of your original question - about resource handling, viz leaking connections - the point you made, which I think has merit, is that it would be *nice* if we could detect that the *receiver* has gone away early, such that the connection (which we know is now useless) can be torn down early.

To simplify the common use case where the sender uses 'reconnect', we might add a primitive that sends a single message and then tears down the connection immediately. This guarantees that the connection is released quickly, and there is no further promise we can make about ordering between the two processes thereafter. We could leave implementing this to application developers (since it is ludicrously simple), but putting it in distributed-process allows us to document the semantics properly...

-- | Sends a message to @pid@. NB: <long dialogue about semantics>
sendOnce :: Serializable a => ProcessId -> a -> Process ()
sendOnce them msg = send them msg >> liftIO $ reconnect them

Perhaps that's worth putting into the base library, since it's pretty useful for inter-process communications that don't rely on the ordering of subsequent deliveries, and documenting the behaviour will probably reduce confusion for new users. I can also add a "performance + resource management" wiki page and an additional simple tutorial on the subject, prior to the next release. Does that sound useful?

Cheers,
Tim

Reply all
Reply to author
Forward
0 new messages