Hi there,
We are trying to figure out the best way to address an issue we have found when using ReactiveMongo with a cross-datacentre cluster.
Each datacentre is connected via a VPN. We have recreated this issue locally with a three-node Vagrant cluster consisting of two active nodes and one arbiter, arranged so that they can only communicate while IPsec (strongSwan) is running.
Normally, when a primary node is lost and an election occurs, a ChannelClosed message is raised, which causes all awaitingResponses for that channel to be failed with a GenericDriverException("socket disconnected"). The node is then unauthenticated, leaving it in Unknown status with all connections for that channel removed.
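```scala
// Excerpt: handling a closed channel
channelUnavailable match {
  case _: ChannelClosed => updateNodeSet(
    nodeSet.updateNodeByChannelId(channelId) { node =>
      // keep only the connections that are NOT on the closed channel,
      // and unauthenticate the node (which leaves it in Unknown status)
      unauthenticate(node, node.connections.
        filter(_.channel.getId != channelId))
    })
}
```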
Once this is all finished, we see a log message like this:
ConnectAll Job running... Status: Node[xxxxx:27017: Unknown (0/10 available connections), latency=21], auth=Set() | Node[yyyyy:27017: Primary (10/10 available connections), latency=7], auth=Set() | Node[zzzzz:27017: NonQueryableUnknownStatus (10/10 available connections), latency=12], auth=Set()
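To make the normal path concrete, here is a minimal, self-contained model of what the ChannelClosed handling achieves. The types (Node, Connection, Awaiting, DriverException) and onChannelClosed are hypothetical stand-ins, not ReactiveMongo's real classes, and setting the status to Unknown stands in for the real unauthenticate step.

```scala
import scala.concurrent.Promise

// Hypothetical, simplified model; these are NOT ReactiveMongo's classes.
sealed trait NodeStatus
case object Primary extends NodeStatus
case object Unknown extends NodeStatus

final case class Connection(channelId: Int)
final case class Node(name: String, status: NodeStatus, connections: Vector[Connection])
final case class Awaiting(requestId: Int, channelId: Int, promise: Promise[String])

final class DriverException(msg: String) extends RuntimeException(msg)

object ChannelClosedModel {
  /** On ChannelClosed: fail every request pending on that channel, drop the
    * channel's connection and mark the owning node Unknown. Returns the
    * updated nodes and the requests that are still awaiting a response. */
  def onChannelClosed(
      channelId: Int,
      nodes: Vector[Node],
      awaiting: Map[Int, Awaiting]
  ): (Vector[Node], Map[Int, Awaiting]) = {
    val (failed, remaining) = awaiting.partition { case (_, a) => a.channelId == channelId }
    // Release every caller waiting on the dead channel.
    failed.values.foreach(_.promise.tryFailure(new DriverException("socket disconnected")))

    val updated = nodes.map { node =>
      if (node.connections.exists(_.channelId == channelId))
        node.copy(
          status = Unknown,
          connections = node.connections.filter(_.channelId != channelId))
      else node
    }
    (updated, remaining)
  }
}
```

The important property is that every pending promise on the closed channel is completed, so no caller is left waiting.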
Our problem arises when the connection to the primary is lost because of a VPN outage. In this case no ChannelClosed is raised (possibly because no TCP reset packet is ever received), so the node is not unauthenticated and the actor system is not aware that the connections are no longer working. A re-election does occur, but instead of the usual ConnectAll log message we see:
ConnectAll Job running... Status: Node[xxxxx:27017: Primary (10/10 available connections), latency=21], auth=Set() | Node[yyyyy:27017: Primary (10/10 available connections), latency=7], auth=Set() | Node[zzzzz:27017: NonQueryableUnknownStatus (10/10 available connections), latency=7], auth=Set()
Two primaries?? That can't be right! What has happened is that an isMaster response has come back from the failover node, and so the nodeSet has been updated:
```scala
// isMaster response
case response: Response if RequestId.isMaster accepts response => {
  val nodeSetWasReachable = nodeSet.isReachable
  val primaryWasAvailable = nodeSet.primary.isDefined
  // ... (rest of the handler omitted)
}
```
But nothing in the actor system has told ReactiveMongo that the old primary is unreachable; as far as nodeSet.primary.isDefined is concerned, it is still there. The code that marks a node as Unknown only runs as part of unauthenticate or onPrimaryUnavailable. We've discussed the former already, and the latter is only invoked in the fallback handler, in response to an error.
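One possible direction, shown only as a hypothetical sketch over a simplified model (Status, Member and onIsMasterPrimary are illustrative names, not ReactiveMongo's NodeSet API): when an isMaster reply identifies a node as primary, demote any other node still marked Primary to Unknown, so the node set can never report two primaries.

```scala
// Hypothetical simplified model, not ReactiveMongo's NodeSet API.
sealed trait Status
case object Primary extends Status
case object Secondary extends Status
case object Unknown extends Status

final case class Member(name: String, status: Status)

object PrimaryReconciliation {
  /** Apply an isMaster reply in which `reporter` claims to be primary.
    * Any other member still marked Primary is demoted to Unknown; a later
    * successful isMaster from that member (if it is reachable) can then
    * restore its real status. */
  def onIsMasterPrimary(reporter: String, members: Vector[Member]): Vector[Member] =
    members.map {
      case m if m.name == reporter  => m.copy(status = Primary)
      case m if m.status == Primary => m.copy(status = Unknown)
      case m                        => m
    }
}
```

This only corrects the status; requests already pending on the demoted node's connections would still need to be failed, as happens on the ChannelClosed path.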
The only thing left that can update the nodeSet is the raw Netty socket timeout, which seems to default to 15 minutes (!). The result is a bunch of zombie connections whose futures in awaitingResponses simply never complete, and we have to wait 15 minutes for things to return to normal:
"registering awaiting response for requestID 6831, awaitingResponses: Map(6747 -> AwaitingResponse(6747,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@2bfb3113,false,false), 6748 -> AwaitingResponse(6748,709965763,scala.concurrent.impl.Promise$DefaultPromise@4b552299,false,false), 6749 -> AwaitingResponse(6749,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@5837cee8,false,false), 6750 -> AwaitingResponse(6750,-889200509,scala.concurrent.impl.Promise$DefaultPromise@8e97859,false,false), 6751 -> AwaitingResponse(6751,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7ae8a3fe,false,false), 6752 -> AwaitingResponse(6752,-295274850,scala.concurrent.impl.Promise$DefaultPromise@184b07d0,false,false), 6753 -> AwaitingResponse(6753,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7e6ddabf,false,false), 6754 -> AwaitingResponse(6754,1399428204,scala.concurrent.impl.Promise$DefaultPromise@3d055e9b,false,false), 6755 -> AwaitingResponse(6755,-65195976,scala.concurrent.impl.Promise$DefaultPromise@2cded96,false,false), 6756 -> AwaitingResponse(6756,929324070,scala.concurrent.impl.Promise$DefaultPromise@42780f98,false,false), 6757 -> AwaitingResponse(6757,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@52509d4a,false,false), 6758 -> AwaitingResponse(6758,709965763,scala.concurrent.impl.Promise$DefaultPromise@4536f01d,false,false), 6759 -> AwaitingResponse(6759,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@287a0470,false,false), 6760 -> AwaitingResponse(6760,-889200509,scala.concurrent.impl.Promise$DefaultPromise@2afcda28,false,false), 6761 -> AwaitingResponse(6761,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7c10a8f0,false,false), 6762 -> AwaitingResponse(6762,-295274850,scala.concurrent.impl.Promise$DefaultPromise@e540977,false,false), 6763 -> AwaitingResponse(6763,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7675c549,false,false), 6764 -> 
AwaitingResponse(6764,1399428204,scala.concurrent.impl.Promise$DefaultPromise@3a946c35,false,false), 6765 -> AwaitingResponse(6765,-65195976,scala.concurrent.impl.Promise$DefaultPromise@433a14bf,false,false), 6766 -> AwaitingResponse(6766,929324070,scala.concurrent.impl.Promise$DefaultPromise@6d0ba21e,false,false), 6767 -> AwaitingResponse(6767,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@350c6d7b,false,false), 6768 -> AwaitingResponse(6768,709965763,scala.concurrent.impl.Promise$DefaultPromise@2c604a2b,false,false), 6769 -> AwaitingResponse(6769,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@3fec3cf4,false,false), 6770 -> AwaitingResponse(6770,-889200509,scala.concurrent.impl.Promise$DefaultPromise@f477245,false,false), 6771 -> AwaitingResponse(6771,-246203378,scala.concurrent.impl.Promise$DefaultPromise@59f6b163,false,false), 6772 -> AwaitingResponse(6772,-295274850,scala.concurrent.impl.Promise$DefaultPromise@77e99964,false,false), 6773 -> AwaitingResponse(6773,-147087934,scala.concurrent.impl.Promise$DefaultPromise@29bc00f1,false,false), 6774 -> AwaitingResponse(6774,1399428204,scala.concurrent.impl.Promise$DefaultPromise@1ddd8ad9,false,false), 6775 -> AwaitingResponse(6775,-65195976,scala.concurrent.impl.Promise$DefaultPromise@714c4da8,false,false), 6776 -> AwaitingResponse(6776,929324070,scala.concurrent.impl.Promise$DefaultPromise@4a55d85c,false,false), 6777 -> AwaitingResponse(6777,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@327e26f,false,false), 6778 -> AwaitingResponse(6778,709965763,scala.concurrent.impl.Promise$DefaultPromise@72dd3b90,false,false), 6779 -> AwaitingResponse(6779,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@61ef096e,false,false), 6780 -> AwaitingResponse(6780,-889200509,scala.concurrent.impl.Promise$DefaultPromise@59cba37,false,false), 6781 -> AwaitingResponse(6781,-246203378,scala.concurrent.impl.Promise$DefaultPromise@39141471,false,false), 6782 -> 
AwaitingResponse(6782,-295274850,scala.concurrent.impl.Promise$DefaultPromise@220eb978,false,false), 6783 -> AwaitingResponse(6783,-147087934,scala.concurrent.impl.Promise$DefaultPromise@3b33702e,false,false), 6784 -> AwaitingResponse(6784,1399428204,scala.concurrent.impl.Promise$DefaultPromise@5b9e387d,false,false), 6785 -> AwaitingResponse(6785,-65195976,scala.concurrent.impl.Promise$DefaultPromise@21ee4e07,false,false), 6786 -> AwaitingResponse(6786,929324070,scala.concurrent.impl.Promise$DefaultPromise@34afc765,false,false), 6787 -> AwaitingResponse(6787,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@57565cef,false,false), 6788 -> AwaitingResponse(6788,709965763,scala.concurrent.impl.Promise$DefaultPromise@3955c7d0,false,false), 6789 -> AwaitingResponse(6789,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@6a98288b,false,false), 6790 -> AwaitingResponse(6790,-889200509,scala.concurrent.impl.Promise$DefaultPromise@20465e52,false,false), 6791 -> AwaitingResponse(6791,-246203378,scala.concurrent.impl.Promise$DefaultPromise@4592cf91,false,false), 6792 -> AwaitingResponse(6792,-295274850,scala.concurrent.impl.Promise$DefaultPromise@526eebb4,false,false), 6793 -> AwaitingResponse(6793,-147087934,scala.concurrent.impl.Promise$DefaultPromise@79195d02,false,false), 6794 -> AwaitingResponse(6794,1399428204,scala.concurrent.impl.Promise$DefaultPromise@6bbf28bd,false,false), 6795 -> AwaitingResponse(6795,-65195976,scala.concurrent.impl.Promise$DefaultPromise@38828c1c,false,false), 6796 -> AwaitingResponse(6796,929324070,scala.concurrent.impl.Promise$DefaultPromise@1d14e3f3,false,false), 6797 -> AwaitingResponse(6797,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@2e565c1e,false,false), 6798 -> AwaitingResponse(6798,709965763,scala.concurrent.impl.Promise$DefaultPromise@3c3e73a9,false,false), 6799 -> AwaitingResponse(6799,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@1070fa5e,false,false), 6800 -> 
AwaitingResponse(6800,-889200509,scala.concurrent.impl.Promise$DefaultPromise@2ac198d8,false,false), 6801 -> AwaitingResponse(6801,-246203378,scala.concurrent.impl.Promise$DefaultPromise@366b8732,false,false), 6802 -> AwaitingResponse(6802,-295274850,scala.concurrent.impl.Promise$DefaultPromise@7907196d,false,false), 6803 -> AwaitingResponse(6803,-147087934,scala.concurrent.impl.Promise$DefaultPromise@6eb2ae19,false,false), 6804 -> AwaitingResponse(6804,1399428204,scala.concurrent.impl.Promise$DefaultPromise@4893ec4f,false,false), 6805 -> AwaitingResponse(6805,-65195976,scala.concurrent.impl.Promise$DefaultPromise@22abb078,false,false), 6806 -> AwaitingResponse(6806,929324070,scala.concurrent.impl.Promise$DefaultPromise@63a61931,false,false), 6807 -> AwaitingResponse(6807,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@181fb13a,false,false), 6808 -> AwaitingResponse(6808,709965763,scala.concurrent.impl.Promise$DefaultPromise@4fb9d7e2,false,false), 6809 -> AwaitingResponse(6809,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@324e7e46,false,false), 6810 -> AwaitingResponse(6810,-889200509,scala.concurrent.impl.Promise$DefaultPromise@4bf443c1,false,false), 6811 -> AwaitingResponse(6811,-246203378,scala.concurrent.impl.Promise$DefaultPromise@618efdb6,false,false), 6812 -> AwaitingResponse(6812,-295274850,scala.concurrent.impl.Promise$DefaultPromise@4640b65b,false,false), 6813 -> AwaitingResponse(6813,-147087934,scala.concurrent.impl.Promise$DefaultPromise@7bd4c9b,false,false), 6814 -> AwaitingResponse(6814,1399428204,scala.concurrent.impl.Promise$DefaultPromise@6ebeede6,false,false), 6815 -> AwaitingResponse(6815,-65195976,scala.concurrent.impl.Promise$DefaultPromise@784bdeca,false,false), 6816 -> AwaitingResponse(6816,929324070,scala.concurrent.impl.Promise$DefaultPromise@3bfa4df6,false,false), 6817 -> AwaitingResponse(6817,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@56ce0786,false,false), 6818 -> 
AwaitingResponse(6818,709965763,scala.concurrent.impl.Promise$DefaultPromise@306e861d,false,false), 6819 -> AwaitingResponse(6819,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@63526eb0,false,false), 6820 -> AwaitingResponse(6820,-889200509,scala.concurrent.impl.Promise$DefaultPromise@6218581,false,false), 6821 -> AwaitingResponse(6821,-246203378,scala.concurrent.impl.Promise$DefaultPromise@7a64460,false,false), 6822 -> AwaitingResponse(6822,-295274850,scala.concurrent.impl.Promise$DefaultPromise@433c64e7,false,false), 6823 -> AwaitingResponse(6823,-147087934,scala.concurrent.impl.Promise$DefaultPromise@336ba5de,false,false), 6824 -> AwaitingResponse(6824,1399428204,scala.concurrent.impl.Promise$DefaultPromise@396c23bf,false,false), 6825 -> AwaitingResponse(6825,-65195976,scala.concurrent.impl.Promise$DefaultPromise@c5ce5b6,false,false), 6826 -> AwaitingResponse(6826,929324070,scala.concurrent.impl.Promise$DefaultPromise@677b1747,false,false), 6827 -> AwaitingResponse(6827,-1409711785,scala.concurrent.impl.Promise$DefaultPromise@4af0fafc,false,false), 6828 -> AwaitingResponse(6828,709965763,scala.concurrent.impl.Promise$DefaultPromise@5071c80,false,false), 6829 -> AwaitingResponse(6829,-1939591216,scala.concurrent.impl.Promise$DefaultPromise@50c33837,false,false), 6830 -> AwaitingResponse(6830,-889200509,scala.concurrent.impl.Promise$DefaultPromise@6f3ddc47,false,false), 6831 -> AwaitingResponse(6831,-246203378,scala.concurrent.impl.Promise$DefaultPromise@4f061164,false,false))
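Independently of how the node set is repaired, the zombie futures above could be bounded by a per-request deadline. A minimal sketch, assuming each awaiting response holds a scala.concurrent.Promise (as the log suggests); RequestTimeout and withTimeout are hypothetical names, not part of ReactiveMongo.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}

// Hypothetical helper, not part of ReactiveMongo's API.
object RequestTimeout {
  /** Arms a timer for an awaiting response: if `p` is still pending after
    * `timeoutMillis`, it is failed with a TimeoutException so the caller is
    * released instead of waiting for the socket timeout. */
  def withTimeout[T](
      p: Promise[T],
      timeoutMillis: Long,
      scheduler: ScheduledExecutorService
  ): Future[T] = {
    val timer = scheduler.schedule(new Runnable {
      // tryFailure is a no-op if the real reply already completed `p`.
      def run(): Unit = {
        p.tryFailure(new TimeoutException(s"no response after $timeoutMillis ms"))
        ()
      }
    }, timeoutMillis, TimeUnit.MILLISECONDS)

    // Cancel the timer as soon as `p` completes, whichever way.
    p.future.onComplete(_ => timer.cancel(false))(ExecutionContext.global)
    p.future
  }
}
```

A timeout on its own only releases callers; the driver would presumably also want to remove the entry from awaitingResponses and treat the node as suspect.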
Fun times. So our question right now is, how best to fix this?
As far as we can tell, we have a few options:
- Implement some kind of per-node heartbeat, similar to the one in the official Java driver (see heartbeatFrequency in http://api.mongodb.org/java/current/com/mongodb/MongoClientOptions.html)
- Improve nodeSet handling so that it copes with a new primary arriving before the driver is aware of a problem with the old one. (I'm not sure how we'd deal with this, though, other than by simply setting the old primary to 'unknown'.)
- Something else entirely?
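As a rough sketch of the first option, assuming a ping function per node that sends a lightweight command such as isMaster on its own connection: HeartbeatMonitor, checkOnce, start and onUnreachable are hypothetical names, and a real implementation would live inside the driver's actor rather than block a thread as this one does.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

// Hypothetical sketch; none of these names exist in ReactiveMongo.
final class HeartbeatMonitor(
    pings: Map[String, () => Future[Unit]], // node name -> sends one heartbeat (e.g. isMaster)
    timeout: FiniteDuration,
    onUnreachable: String => Unit           // e.g. mark the node Unknown, fail its pending requests
) {
  /** One heartbeat round: send all pings first, then wait for each in turn.
    * A node whose ping throws, fails, or does not answer within `timeout`
    * is reported. Waits are sequential, so a round can take up to
    * (number of unreachable nodes) * timeout. */
  def checkOnce(): Unit = {
    val inFlight = pings.map { case (name, ping) => name -> Try(ping()) }
    inFlight.foreach { case (name, sent) =>
      val answered = sent.flatMap(f => Try(Await.result(f, timeout))).isSuccess
      if (!answered) onUnreachable(name)
    }
  }

  /** Repeats `checkOnce` with a fixed delay between rounds, so rounds never overlap. */
  def start(scheduler: ScheduledExecutorService, interval: FiniteDuration): Unit = {
    scheduler.scheduleWithFixedDelay(
      new Runnable { def run(): Unit = checkOnce() },
      0L, interval.toMillis, TimeUnit.MILLISECONDS)
    ()
  }
}
```

Usage would be along the lines of new HeartbeatMonitor(pings, timeout, markUnknown).start(scheduler, interval), where the interval and timeout play roughly the role of the Java driver's heartbeat frequency and heartbeat socket timeout. Because the heartbeat does not depend on a TCP reset, a silent VPN drop would be noticed after about one interval plus the timeout.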
Given the effect this is having on us, we are prepared to do the work and send a PR, but we want to make sure the approach is aligned with the contributors' thinking.
Thanks in advance, Pete