Hi,
I am facing a rather strange problem with concurrent updates of a document. I'm using play2-reactivemongo_2.11 (0.12.3) and my MongoDB server is configured as a replica set. I have the following JSON document:
{
"recordKey": "FOO",
"channels": [{
"id": "CH1",
"blocks": []
}, {
"id": "CH2",
"blocks": []
}]
}
In the context of my app, I can have multiple updates of the same document: I can add new blocks to the channel called CH1 or CH2. To achieve that, I wrote the following code:
case class MongoIO
(
  mongoConnection : Try[MongoConnection],
  mongoDBName : String,
  mongoRecordsCollectionName : String,
  mongoEventsCollectionName : String
) extends MetadataStorage with LazyLogging {

  def futureDB =
    Database.getFutureDatabaseOpt(mongoConnection, mongoDBName) match {
      case None => throw new Exception("Cannot get a Future DB")
      case Some(futureDB) => futureDB
    }

  def futureRecordsCollection =
    Database.getFutureCollection(futureDB, mongoRecordsCollectionName)

  def futureEventsCollection =
    Database.getFutureCollection(futureDB, mongoEventsCollectionName)

  override def addChannelChunks
  (
    recordKey: String,
    newChannelChunks: Map[String, List[BFFBlock]]
  ): Future[Boolean] = {
    val futurePushStatus =
      newChannelChunks.map { case (channelId, newChunks) =>
        this.pushNewChunks(recordKey, channelId, newChunks)
      }
    Future
      .sequence(futurePushStatus)
      .map { updateResults =>
        logger.debug(
          s"""
             | ******************************************************************
             | Update results : ${updateResults.mkString("---")}
             | ******************************************************************
           """.stripMargin)
        updateResults.forall(identity)
      }
  }

  private def pushNewChunks
  (
    recordKey : String,
    channelId : String,
    chunks : List[BFFBlock]
  ) : Future[Boolean] = {
    this
      .futureRecordsCollection
      .flatMap { collection =>
        logger.debug(
          s"""
             | ******************************************************************
             | Will push blocks : ${chunks} for record $recordKey and channel $channelId
             | ******************************************************************
           """.stripMargin)
        val recordKeyMatchRequest = Json.obj("recordKey" -> recordKey)
        val idMatchRequest = Json.obj("channels.id" -> channelId)
        val andRequest =
          Json.obj("$and" -> List(recordKeyMatchRequest, idMatchRequest))
        val addChunkRequest =
          Json.obj("$addToSet" ->
            Json.obj("channels.$.blocks" ->
              Json.obj("$each" -> chunks)))
        collection
          .findAndUpdate(andRequest, addChunkRequest, fetchNewObject = true)
          .map { updateResult =>
            updateResult.result[BFFRecord] match {
              case None =>
                logger.error(
                  s"""
                     | **********************************************************
                     | Error when updating blocks index of channel $channelId of record $recordKey
                     | Trace : ${updateResult.lastError}
                     | **********************************************************
                   """.stripMargin)
                false
              case Some(_) => true
            }
          }
      }
  }
}
Reading the MongoDB documentation about atomic updates, using the $addToSet operator should be enough to end up with a final document that contains all the updates I requested.
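To illustrate what I expect: $addToSet with $each should behave like a set union on the blocks array, whatever the interleaving of pushes. A minimal plain-Scala sketch of that semantics (the block values are made up for illustration):

```scala
// Sketch of the set semantics I expect from $addToSet with $each:
// each pushed value that is not already present gets added, like a set union.
val existingBlocks = Set("block-1", "block-2")  // blocks already in the array
val pushedBatch    = List("block-2", "block-3") // the $each batch being pushed

// $addToSet { $each: pushedBatch } ~ set union on the server side
val afterUpdate = existingBlocks ++ pushedBatch
```

With that model, two concurrent pushes of disjoint batches should both end up in the array, which is why I am surprised to see blocks missing.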
The code above is covered by tests and the tests do not fail. This code is part of a library that I import into another project, and it is in that project that I am hitting the concurrent update issues.
This code is used in the context of an Akka Cluster: I have two nodes instantiating this class (MongoIO) and using it to update documents.
When the addChannelChunks function has finished updating the document, every update result returns the updated object (reading the code on GitHub, this means the update was successful), but when I read back the JSON document, it is not consistent.
Some blocks are missing from the JSON (it is never the same blocks that are missing). We used Wireshark to make sure the request is sent to the server, and it is. So I have the following questions:
- Is it safe, as in my code, to run parallel updates using Future.sequence?
- I get a successful response from the server but an inconsistent document; could my MongoDB configuration be the cause?
- Do you have any particular advice for making parallel updates to a single document?
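For context, if parallel updates turn out to be unsafe, the fallback I have in mind is to chain the per-channel pushes one after another instead of firing them all and combining with Future.sequence. A rough sketch (pushSequentially is a hypothetical helper, not part of my current code):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical fallback: run the pushes sequentially by chaining each Future
// onto the previous one, so this node never has two in-flight findAndUpdate
// calls targeting the same document at the same time.
def pushSequentially[A](items: List[A])(push: A => Future[Boolean])
                       (implicit ec: ExecutionContext): Future[Boolean] =
  items.foldLeft(Future.successful(true)) { (acc, item) =>
    acc.flatMap(previousOk => push(item).map(_ && previousOk))
  }
```

Compared with Future.sequence this trades latency for the guarantee that updates issued from one node never overlap, but it obviously does not serialize updates across the two cluster nodes.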
Many thanks in advance for your responses!