Connection heartbeat at actor level


Pete Smith

Mar 21, 2016, 12:36:19 PM
to ReactiveMongo - http://reactivemongo.org
Hi there,

We are trying to figure out the best way to address an issue we have found when using ReactiveMongo with a cross-datacentre cluster. 

Each datacentre is connected via a VPN. We have recreated this issue locally via a three node vagrant cluster. This consists of two active nodes and one arbiter, arranged so that they can only communicate when ipsec (strongswan) is running.

Normally when a primary node is lost and an election occurs... a ChannelClosed message is raised, which results in all awaitingResponses for that channel being failed with a GenericDriverException("socket disconnected"). The node is then unauthenticated, leaving it in Unknown status with all connections for that channel removed.
     
channelUnavailable match {
  case _: ChannelClosed => updateNodeSet(
    nodeSet.updateNodeByChannelId(channelId) { node =>
      unauthenticate(node, node.connections.filter(_.channel.getId != channelId))
    })

Once this is all finished, we see a log message like this:

ConnectAll Job running... Status: Node[xxxxx:27017: Unknown (0/10 available connections), latency=21], auth=Set() | Node[yyyyy:27017: Primary (10/10 available connections), latency=7], auth=Set() | Node[zzzzz:27017: NonQueryableUnknownStatus (10/10 available connections), latency=12], auth=Set()


Our problem comes when the connection to the primary is lost because of a VPN outage. In this case, no ChannelClosed is raised (possibly because no reset packet is sent), so the node is not unauthenticated and the actor model is not aware that the connections are no longer working. A re-election does occur, but instead of the usual ConnectAll log message we see:

ConnectAll Job running... Status: Node[xxxxx:27017: Primary (10/10 available connections), latency=21], auth=Set() | Node[yyyyy:27017: Primary (10/10 available connections), latency=7], auth=Set() | Node[zzzzz:27017: NonQueryableUnknownStatus (10/10 available connections), latency=7], auth=Set()

Two primaries?? That can't be right! What has happened is that an isMaster response has come back from the failover node, and so the nodeSet has been updated:

// isMaster response
case response: Response if RequestId.isMaster accepts response => {
  val nodeSetWasReachable = nodeSet.isReachable
  val primaryWasAvailable = nodeSet.primary.isDefined

But nothing in the actor system has told ReactiveMongo that the old primary is not reachable... as far as nodeSet.primary.isDefined is concerned it's still there. The code that marks a node as Unknown only runs as part of unauthenticate or onPrimaryUnavailable. We've discussed the former already, and the latter is only invoked in the fallback handler, in response to an error.
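
To make the failure mode concrete, here is a toy model of the situation described above. All names in it (`Node`, `Status`, `naiveUpdate`, `demotingUpdate`) are hypothetical and are not ReactiveMongo's actual classes; it only illustrates how an isMaster reply from the new primary can leave two nodes marked Primary, and what demoting the old one would look like.

```scala
// Hypothetical, simplified model; these are NOT ReactiveMongo's real types.
object StalePrimarySketch {
  sealed trait Status
  case object Primary extends Status
  case object Secondary extends Status
  case object Unknown extends Status

  final case class Node(name: String, status: Status)

  // Apply an isMaster reply in which `from` reports itself as primary.
  // Naive version: only the replying node is touched, so a silent old
  // primary keeps its Primary status.
  def naiveUpdate(nodes: Vector[Node], from: String): Vector[Node] =
    nodes.map(n => if (n.name == from) n.copy(status = Primary) else n)

  // Alternative: a replica set has at most one primary, so any other node
  // still marked Primary is demoted to Unknown until it answers again.
  def demotingUpdate(nodes: Vector[Node], from: String): Vector[Node] =
    nodes.map {
      case n if n.name == from      => n.copy(status = Primary)
      case n if n.status == Primary => n.copy(status = Unknown)
      case n                        => n
    }
}
```

With `xxxxx` as the old primary and a reply arriving from `yyyyy`, `naiveUpdate` yields two primaries, while `demotingUpdate` leaves `xxxxx` as Unknown. Whether demotion is safe in the real driver is an open question, not something this sketch settles.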

The only thing left that can update the nodeset is the raw netty socket timeout, which seems to default to 15 minutes (!). The result is that we have a bunch of zombie connections for which the futures added to awaitingResponses simply never return, and we have to wait 15 minutes for things to return to normal:

"registering awaiting response for requestID 6831, awaitingResponses: Map(6747 -> AwaitingResponse(6747,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@2bfb3113,false,false), 6748 -> AwaitingResponse(6748,709965763,scala.concurrent.impl.Promise$DefaultPromise@4b552299,false,false), 6749 -> AwaitingResponse(6749,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@5837cee8,false,false), 6750 -> AwaitingResponse(6750,-889200509,scala.concurrent.impl.Promise$DefaultPromise@8e97859,false,false), 6751 -> AwaitingResponse(6751,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7ae8a3fe,false,false), 6752 -> AwaitingResponse(6752,-295274850,scala.concurrent.impl.Promise$DefaultPromise@184b07d0,false,false), 6753 -> AwaitingResponse(6753,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7e6ddabf,false,false), 6754 -> AwaitingResponse(6754,1399428204,scala.concurrent.impl.Promise$DefaultPromise@3d055e9b,false,false), 6755 -> AwaitingResponse(6755,-65195976,scala.concurrent.impl.Promise$DefaultPromise@2cded96,false,false), 6756 -> AwaitingResponse(6756,929324070,scala.concurrent.impl.Promise$DefaultPromise@42780f98,false,false), 6757 -> AwaitingResponse(6757,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@52509d4a,false,false), 6758 -> AwaitingResponse(6758,709965763,scala.concurrent.impl.Promise$DefaultPromise@4536f01d,false,false), 6759 -> AwaitingResponse(6759,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@287a0470,false,false), 6760 -> AwaitingResponse(6760,-889200509,scala.concurrent.impl.Promise$DefaultPromise@2afcda28,false,false), 6761 -> AwaitingResponse(6761,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7c10a8f0,false,false), 6762 -> AwaitingResponse(6762,-295274850,scala.concurrent.impl.Promise$DefaultPromise@e540977,false,false), 6763 -> AwaitingResponse(6763,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7675c549,false,false), 6764 -> 
AwaitingResponse(6764,1399428204,scala.concurrent.impl.Promise$DefaultPromise@3a946c35,false,false), 6765 -> AwaitingResponse(6765,-65195976,scala.concurrent.impl.Promise$DefaultPromise@433a14bf,false,false), 6766 -> AwaitingResponse(6766,929324070,scala.concurrent.impl.Promise$DefaultPromise@6d0ba21e,false,false), 6767 -> AwaitingResponse(6767,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@350c6d7b,false,false), 6768 -> AwaitingResponse(6768,709965763,scala.concurrent.impl.Promise$DefaultPromise@2c604a2b,false,false), 6769 -> AwaitingResponse(6769,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@3fec3cf4,false,false), 6770 -> AwaitingResponse(6770,-889200509,scala.concurrent.impl.Promise$DefaultPromise@f477245,false,false), 6771 -> AwaitingResponse(6771,-246203378,scala.concurrent.impl.Promise$DefaultPromise@59f6b163,false,false), 6772 -> AwaitingResponse(6772,-295274850,scala.concurrent.impl.Promise$DefaultPromise@77e99964,false,false), 6773 -> AwaitingResponse(6773,-147087934,scala.concurrent.impl.Promise$DefaultPromise@29bc00f1,false,false), 6774 -> AwaitingResponse(6774,1399428204,scala.concurrent.impl.Promise$DefaultPromise@1ddd8ad9,false,false), 6775 -> AwaitingResponse(6775,-65195976,scala.concurrent.impl.Promise$DefaultPromise@714c4da8,false,false), 6776 -> AwaitingResponse(6776,929324070,scala.concurrent.impl.Promise$DefaultPromise@4a55d85c,false,false), 6777 -> AwaitingResponse(6777,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@327e26f,false,false), 6778 -> AwaitingResponse(6778,709965763,scala.concurrent.impl.Promise$DefaultPromise@72dd3b90,false,false), 6779 -> AwaitingResponse(6779,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@61ef096e,false,false), 6780 -> AwaitingResponse(6780,-889200509,scala.concurrent.impl.Promise$DefaultPromise@59cba37,false,false), 6781 -> AwaitingResponse(6781,-246203378,scala.concurrent.impl.Promise$DefaultPromise@39141471,false,false), 6782 -> 
AwaitingResponse(6782,-295274850,scala.concurrent.impl.Promise$DefaultPromise@220eb978,false,false), 6783 -> AwaitingResponse(6783,-147087934,scala.concurrent.impl.Promise$DefaultPromise@3b33702e,false,false), 6784 -> AwaitingResponse(6784,1399428204,scala.concurrent.impl.Promise$DefaultPromise@5b9e387d,false,false), 6785 -> AwaitingResponse(6785,-65195976,scala.concurrent.impl.Promise$DefaultPromise@21ee4e07,false,false), 6786 -> AwaitingResponse(6786,929324070,scala.concurrent.impl.Promise$DefaultPromise@34afc765,false,false), 6787 -> AwaitingResponse(6787,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@57565cef,false,false), 6788 -> AwaitingResponse(6788,709965763,scala.concurrent.impl.Promise$DefaultPromise@3955c7d0,false,false), 6789 -> AwaitingResponse(6789,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@6a98288b,false,false), 6790 -> AwaitingResponse(6790,-889200509,scala.concurrent.impl.Promise$DefaultPromise@20465e52,false,false), 6791 -> AwaitingResponse(6791,-246203378,scala.concurrent.impl.Promise$DefaultPromise@4592cf91,false,false), 6792 -> AwaitingResponse(6792,-295274850,scala.concurrent.impl.Promise$DefaultPromise@526eebb4,false,false), 6793 -> AwaitingResponse(6793,-147087934,scala.concurrent.impl.Promise$DefaultPromise@79195d02,false,false), 6794 -> AwaitingResponse(6794,1399428204,scala.concurrent.impl.Promise$DefaultPromise@6bbf28bd,false,false), 6795 -> AwaitingResponse(6795,-65195976,scala.concurrent.impl.Promise$DefaultPromise@38828c1c,false,false), 6796 -> AwaitingResponse(6796,929324070,scala.concurrent.impl.Promise$DefaultPromise@1d14e3f3,false,false), 6797 -> AwaitingResponse(6797,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@2e565c1e,false,false), 6798 -> AwaitingResponse(6798,709965763,scala.concurrent.impl.Promise$DefaultPromise@3c3e73a9,false,false), 6799 -> AwaitingResponse(6799,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@1070fa5e,false,false), 6800 -> 
AwaitingResponse(6800,-889200509,scala.concurrent.impl.Promise$DefaultPromise@2ac198d8,false,false), 6801 -> AwaitingResponse(6801,-246203378,scala.concurrent.impl.Promise$DefaultPromise@366b8732,false,false), 6802 -> AwaitingResponse(6802,-295274850,scala.concurrent.impl.Promise$DefaultPromise@7907196d,false,false), 6803 -> AwaitingResponse(6803,-147087934,scala.concurrent.impl.Promise$DefaultPromise@6eb2ae19,false,false), 6804 -> AwaitingResponse(6804,1399428204,scala.concurrent.impl.Promise$DefaultPromise@4893ec4f,false,false), 6805 -> AwaitingResponse(6805,-65195976,scala.concurrent.impl.Promise$DefaultPromise@22abb078,false,false), 6806 -> AwaitingResponse(6806,929324070,scala.concurrent.impl.Promise$DefaultPromise@63a61931,false,false), 6807 -> AwaitingResponse(6807,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@181fb13a,false,false), 6808 -> AwaitingResponse(6808,709965763,scala.concurrent.impl.Promise$DefaultPromise@4fb9d7e2,false,false), 6809 -> AwaitingResponse(6809,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@324e7e46,false,false), 6810 -> AwaitingResponse(6810,-889200509,scala.concurrent.impl.Promise$DefaultPromise@4bf443c1,false,false), 6811 -> AwaitingResponse(6811,-246203378,scala.concurrent.impl.Promise$DefaultPromise@618efdb6,false,false), 6812 -> AwaitingResponse(6812,-295274850,scala.concurrent.impl.Promise$DefaultPromise@4640b65b,false,false), 6813 -> AwaitingResponse(6813,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7bd4c9b,false,false), 6814 -> AwaitingResponse(6814,1399428204,scala.concurrent.impl.Promise$DefaultPromise@6ebeede6,false,false), 6815 -> AwaitingResponse(6815,-65195976,scala.concurrent.impl.Promise$DefaultPromise@784bdeca,false,false), 6816 -> AwaitingResponse(6816,929324070,scala.concurrent.impl.Promise$DefaultPromise@3bfa4df6,false,false), 6817 -> AwaitingResponse(6817,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@56ce0786,false,false), 6818 -> 
AwaitingResponse(6818,709965763,scala.concurrent.impl.Promise$DefaultPromise@306e861d,false,false), 6819 -> AwaitingResponse(6819,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@63526eb0,false,false), 6820 -> AwaitingResponse(6820,-889200509,scala.concurrent.impl.Promise$DefaultPromise@6218581,false,false), 6821 -> AwaitingResponse(6821,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7a64460,false,false), 6822 -> AwaitingResponse(6822,-295274850,scala.concurrent.impl.Promise$DefaultPromise@433c64e7,false,false), 6823 -> AwaitingResponse(6823,-147087934,scala.concurrent.impl.Promise$DefaultPromise@336ba5de,false,false), 6824 -> AwaitingResponse(6824,1399428204,scala.concurrent.impl.Promise$DefaultPromise@396c23bf,false,false), 6825 -> AwaitingResponse(6825,-65195976,scala.concurrent.impl.Promise$DefaultPromise@c5ce5b6,false,false), 6826 -> AwaitingResponse(6826,929324070,scala.concurrent.impl.Promise$DefaultPromise@677b1747,false,false), 6827 -> AwaitingResponse(6827,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@4af0fafc,false,false), 6828 -> AwaitingResponse(6828,709965763,scala.concurrent.impl.Promise$DefaultPromise@5071c80,false,false), 6829 -> AwaitingResponse(6829,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@50c33837,false,false), 6830 -> AwaitingResponse(6830,-889200509,scala.concurrent.impl.Promise$DefaultPromise@6f3ddc47,false,false), 6831 -> AwaitingResponse(6831,-246203378,scala.concurrent.impl.Promise$DefaultPromise@4f061164,false,false))


Fun times. So our question right now is, how best to fix this? 

As far as we can tell we have a couple of options:
  • Implement some kind of node heartbeat similar to the official client http://api.mongodb.org/java/current/com/mongodb/MongoClientOptions.html
  • Improve nodeset handling so that it will deal with a new master arriving before it's aware of a problem with the old one. (I don't know how we'd deal with this though, without simply setting the old master to 'unknown')
  • Something else entirely?
We are prepared to do the work and send a PR given the effect it's having on us but want to make sure it's aligned with the thoughts of the contributors. 
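
As a rough illustration of the first option, the sketch below counts consecutive missed heartbeats per node and, once a node is declared dead, fails the requests still waiting on it so that their futures return instead of hanging. Everything here (`Pending`, `recordBeat`, `isDead`, `failPending`, the `maxMissed` threshold) is a hypothetical name made up for the example; it is not the driver's API, and a real implementation would live inside the actor.

```scala
import scala.concurrent.Promise

// Hypothetical heartbeat bookkeeping; NOT ReactiveMongo's implementation.
object HeartbeatSketch {
  // Stand-in for a pending request (the driver's AwaitingResponse differs).
  final case class Pending(requestId: Int, node: String, promise: Promise[String])

  // Count consecutive missed heartbeats; any reply resets the counter.
  def recordBeat(missed: Map[String, Int], node: String, replied: Boolean): Map[String, Int] =
    missed.updated(node, if (replied) 0 else missed.getOrElse(node, 0) + 1)

  // A node is considered dead after `maxMissed` consecutive misses.
  def isDead(missed: Map[String, Int], node: String, maxMissed: Int): Boolean =
    missed.getOrElse(node, 0) >= maxMissed

  // Fail every pending request bound to `deadNode`; return the survivors.
  def failPending(pending: Map[Int, Pending], deadNode: String): Map[Int, Pending] = {
    val (dead, alive) = pending.partition { case (_, p) => p.node == deadNode }
    dead.values.foreach(
      _.promise.tryFailure(new RuntimeException(s"heartbeat lost: $deadNode")))
    alive
  }
}
```

The design choice is to keep the decision (`isDead`) separate from the side effect (`failPending`), so the threshold can be tuned without touching how pending requests are failed.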

Thanks in advance, Pete
 

Cédric Chantepie

Mar 21, 2016, 5:15:38 PM
to ReactiveMongo - http://reactivemongo.org
 Hi,

First, please indicate which versions you are using and which options you have tried (e.g. keepAlive: http://reactivemongo.org/releases/0.11/documentation/tutorial/connect-database.html ).

Best regards

Pete Smith

Mar 22, 2016, 7:55:36 AM
to ReactiveMongo - http://reactivemongo.org
Hi again,

We've just tried setting rm.keepAlive=true on the connection string, but the behavior is the same. We're currently testing against 0.11.5.

Pete

Pete Smith

Mar 22, 2016, 7:57:00 AM
to ReactiveMongo - http://reactivemongo.org
Our connection string looks like this with the keepalive added:

"mongodb://xxxxx:27017,yyyyy:27017,zzzzz:27017/dbname?rm.keepAlive=true"

Cédric Chantepie

Mar 24, 2016, 4:26:33 AM
to ReactiveMongo - http://reactivemongo.org

Pete Smith

Mar 24, 2016, 5:06:33 AM
to ReactiveMongo - http://reactivemongo.org
Hi Cedric,

Actually this was the starting point for our investigations (Charles is my colleague here :) )... unfortunately the approach we took here had some faults, leading to frequent connection churn regardless of activity, and so we have gone back to the drawing board.

Which of the initially mentioned approaches would be better for us to spike out and send as a PR? Or do you have something entirely different you would like to try?


Pete

Cédric Chantepie

Mar 25, 2016, 10:39:56 AM
to ReactiveMongo - http://reactivemongo.org
Some expiration mechanism could be based on the pingInfo.lastIsMasterTime of the nodes, by checking it in response to the IsMaster sent regularly by the refreshAllJob.
If a node hasn't been refreshed for some time, it can be decided it is unavailable.

Some effort to set up a test env for such a case (using netem or vaurien) could help to validate any solution.
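
The expiration idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PingInfo` and `Node` here are hypothetical stand-ins, `lastIsMasterTime` is assumed to hold the epoch-millisecond time of the last isMaster reply, and `timeoutMs` is a made-up parameter. The function is meant to be called on each refresh tick.

```scala
// Hypothetical expiration check; NOT the driver's actual types or logic.
object ExpirationSketch {
  // Assumption: lastIsMasterTime is the epoch-ms time of the last reply.
  final case class PingInfo(lastIsMasterTime: Long)
  final case class Node(name: String, pingInfo: PingInfo, reachable: Boolean = true)

  // On each refresh tick, flag nodes whose last isMaster reply is older
  // than `timeoutMs` as unreachable; fresh nodes are returned unchanged.
  def expire(nodes: Seq[Node], nowMs: Long, timeoutMs: Long): Seq[Node] =
    nodes.map { n =>
      if (nowMs - n.pingInfo.lastIsMasterTime > timeoutMs) n.copy(reachable = false)
      else n
    }
}
```

Taking `nowMs` as a parameter instead of reading the clock inside the function keeps the check deterministic, which should make it easier to exercise in a test env with injected delays.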

Waldemar Wosiński

Jan 16, 2017, 10:36:57 AM
to ReactiveMongo - http://reactivemongo.org
Hi Cédric,

I have a concern about core/actors.scala:1155:

} else if (node.pingInfo.lastIsMasterId >= PingInfo.pingTimeout) {

The types do not match. On the left we have some kind of identity (0,999). On the right we have a measure of time (60 * 1000). This condition is always false.

How should this "if block" work?

Maybe it should be:
node.pingInfo.lastIsMasterTime instead of
node.pingInfo.lastIsMasterId?
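
A small check of the point above, using hypothetical helper names (`byId`, `byTime`) and the two ranges quoted in this message. The time-based variant assumes `lastIsMasterTime` is an epoch-millisecond timestamp and compares the elapsed time against the timeout; that reading is an assumption, not something confirmed from the driver source.

```scala
// Hypothetical helpers illustrating the comparison; NOT the driver's code.
object PingTimeoutCheck {
  val pingTimeout: Long = 60 * 1000L // milliseconds, as quoted above

  // Comparison as quoted: an id in 0..999 never reaches 60000.
  def byId(lastIsMasterId: Int): Boolean = lastIsMasterId >= pingTimeout

  // Assumed alternative: compare the time elapsed since the last isMaster.
  def byTime(lastIsMasterTime: Long, nowMs: Long): Boolean =
    nowMs - lastIsMasterTime >= pingTimeout
}
```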



Waldemar Wosiński