Concurrent updates of a single document


Kilic Ali-Firat

Jun 15, 2017, 3:29:12 AM
to ReactiveMongo - http://reactivemongo.org
Hi, 

I am facing a rather strange problem with concurrent updates of a document. I'm using play2-reactivemongo_2.11 (0.12.3) and my MongoDB server is configured as a replica set. I have the following JSON document:

{
  "recordKey": "FOO",
  "channels": [{
    "id": "CH1",
    "blocks": []
  }, {
    "id": "CH2",
    "blocks": []
  }]
}

In the context of my app, I can have multiple updates of the same document: I can add new blocks to the channels called CH1 or CH2. To achieve that, I wrote the following code:

import scala.concurrent.Future
import scala.util.Try
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.Json
import reactivemongo.api.MongoConnection
// Database, MetadataStorage, BFFBlock and BFFRecord are application-specific;
// an implicit ExecutionContext is assumed to be in scope.

case class MongoIO(
  mongoConnection: Try[MongoConnection],
  mongoDBName: String,
  mongoRecordsCollectionName: String,
  mongoEventsCollectionName: String
) extends MetadataStorage with LazyLogging {

  def futureDB =
    Database.getFutureDatabaseOpt(mongoConnection, mongoDBName) match {
      case None => throw new Exception("Cannot get a Future DB")
      case Some(futureDB) => futureDB
    }

  def futureRecordsCollection =
    Database.getFutureCollection(futureDB, mongoRecordsCollectionName)

  def futureEventsCollection =
    Database.getFutureCollection(futureDB, mongoEventsCollectionName)

  // Pushes the new chunks of every channel in parallel, then folds the
  // per-channel results into a single success flag.
  override def addChannelChunks(
    recordKey: String,
    newChannelChunks: Map[String, List[BFFBlock]]
  ): Future[Boolean] = {
    val futurePushStatus =
      newChannelChunks.map { case (channelId, newChunks) =>
        this.pushNewChunks(recordKey, channelId, newChunks)
      }
    Future
      .sequence(futurePushStatus)
      .map { updateResults =>
        logger.debug(
          s"""
             | ******************************************************************
             | Update results : ${updateResults.mkString("\n---")}
             | ******************************************************************
           """.stripMargin)
        updateResults.forall(identity)
      }
  }

  // Adds the given chunks to the `blocks` array of one channel, using
  // $addToSet/$each with the positional `$` operator.
  private def pushNewChunks(
    recordKey: String,
    channelId: String,
    chunks: List[BFFBlock]
  ): Future[Boolean] =
    this.futureRecordsCollection.flatMap { collection =>
      logger.debug(
        s"""
           | ******************************************************************
           | Will push blocks : ${chunks} for record $recordKey and channel $channelId
           | ******************************************************************
         """.stripMargin)
      val recordKeyMatchRequest = Json.obj("recordKey" -> recordKey)
      val idMatchRequest = Json.obj("channels.id" -> channelId)
      val andRequest =
        Json.obj("$and" -> List(recordKeyMatchRequest, idMatchRequest))
      val addChunkRequest =
        Json.obj("$addToSet" ->
          Json.obj("channels.$.blocks" ->
            Json.obj("$each" -> chunks)))

      collection
        .findAndUpdate(andRequest, addChunkRequest, fetchNewObject = true)
        .map { updateResult =>
          updateResult.result[BFFRecord] match {
            case None =>
              logger.error(
                s"""
                   | **********************************************************
                   | Error when updating the blocks of channel $channelId of record $recordKey
                   | Trace : ${updateResult.lastError}
                   | **********************************************************
                 """.stripMargin)
              false
            case Some(_) => true
          }
        }
    }
}

Reading the MongoDB documentation about atomic updates, using the $addToSet operator should be enough to end up with a final document that contains all the updates I requested.
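
As an editorial aside, here is the shape of each individual update that pushNewChunks sends, rebuilt with Play JSON as in the code above (the sample block value is hypothetical). Each such command is applied atomically to the single matched document. Note, however, that $addToSet only inserts values that are not already present in the array; an unconditional append would use $push with $each instead.

import play.api.libs.json.Json

// Selector: the document with this recordKey whose channels array
// contains the targeted channel id.
val selector = Json.obj("$and" -> List(
  Json.obj("recordKey" -> "FOO"),
  Json.obj("channels.id" -> "CH1")))

// Update: the positional `$` resolves to the channel matched by the
// selector; $each pushes several values in one atomic operation, and
// $addToSet silently skips any block that is already in the array.
val update = Json.obj("$addToSet" ->
  Json.obj("channels.$.blocks" ->
    Json.obj("$each" -> List(Json.obj("chunk" -> 1)))))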

The above code is tested and the tests didn't fail. Also, this code is part of a library that I import in another project, and it's in this project that I'm meeting the concurrent update issues.

This code is used in the context of an Akka Cluster. I have two nodes instantiating this class (MongoIO) and using it for document updates.

When the function addChannelChunks has finished updating the document, all the update results return the updated object (reading the code on GitHub, this means the update was successful), but when I read the JSON document afterwards, it is not consistent.

Some blocks are missing in the JSON (never the same blocks). We used Wireshark to make sure the requests are sent to the server, and they are. So I have the following questions:

  1. Is it safe, as in my code, to run parallel updates using Future.sequence?
  2. I get a successful response from the server but an inconsistent document; can the MongoDB configuration be the cause?
  3. Do you have particular advice for making parallel updates to a single document?
Many thanks for your future responses!


Cédric Chantepie

Jun 15, 2017, 3:51:17 AM
to ReactiveMongo - http://reactivemongo.org
Hi,

The driver doesn't add transaction/lock semantics on top of the MongoDB protocol, so if the app can raise concurrency issues, the app must manage them.
There is no generic solution as it depends on the model.

Best regards
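
Editorial note, not part of the original reply: within a single JVM, one common way to "manage it in the app" is to serialize all updates touching the same record key, so that two pushes on one document never run concurrently. Below is a minimal sketch under that assumption; the SerializedUpdates name is invented for illustration, and pushNewChunks refers to the method from the first message. This only helps inside one node: with two cluster nodes writing to the same document, something like an Akka cluster singleton or sharded entity owning each record would be needed on top.

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

final class SerializedUpdates(implicit ec: ExecutionContext) {
  // Tail of the queue per record key: the last Future enqueued for it.
  private val tails = mutable.Map.empty[String, Future[Any]]

  // Runs `op` only once every previously enqueued op for `recordKey` is done.
  def enqueue[T](recordKey: String)(op: () => Future[T]): Future[T] =
    synchronized {
      val prev = tails.getOrElse(recordKey, Future.successful(()))
      // Recover first, so one failed update does not block the whole queue.
      val next = prev.recover { case _ => () }.flatMap(_ => op())
      tails.update(recordKey, next)
      next
    }
}

// Usage sketch: instead of firing all pushes at once and gathering them
// with Future.sequence, enqueue them per record:
//   updates.enqueue(recordKey)(() => pushNewChunks(recordKey, channelId, chunks))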

Kilic Ali-Firat

Jun 15, 2017, 6:35:42 AM
to ReactiveMongo - http://reactivemongo.org
Maybe I found a solution, but I want to know whether the latest version of ReactiveMongo (0.12.3) supports the following functions:

  1. updateOne
  2. bulkWrite
I want to group all my push requests into one using the above functions.

Cédric Chantepie

Jun 15, 2017, 7:35:17 AM
to ReactiveMongo - http://reactivemongo.org
Those are shell methods, not MongoDB commands.
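
Editorial note: on the wire, the shell's bulkWrite of update operations maps onto the MongoDB update command, which can carry several update statements in its updates array (MongoDB 2.6+). The sketch below only shows the shape of that command as a Play JSON value, not a driver API call; the collection name "records" is assumed. Since the positional `$` operator can only target one matched array element per statement, the CH1 and CH2 pushes remain two statements even inside a single command.

import play.api.libs.json.Json

// Shape of the raw MongoDB `update` command: one command, two update
// statements, one per channel. With "ordered" -> true the server applies
// the statements one after the other.
val updateCommand = Json.obj(
  "update" -> "records", // assumed collection name
  "ordered" -> true,
  "updates" -> List(
    Json.obj(
      "q" -> Json.obj("recordKey" -> "FOO", "channels.id" -> "CH1"),
      "u" -> Json.obj("$addToSet" -> Json.obj("channels.$.blocks" ->
        Json.obj("$each" -> List(Json.obj("chunk" -> 1))))),
      "multi" -> false,
      "upsert" -> false),
    Json.obj(
      "q" -> Json.obj("recordKey" -> "FOO", "channels.id" -> "CH2"),
      "u" -> Json.obj("$addToSet" -> Json.obj("channels.$.blocks" ->
        Json.obj("$each" -> List(Json.obj("chunk" -> 2))))),
      "multi" -> false,
      "upsert" -> false)))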

Kilic Ali-Firat

Jun 15, 2017, 10:49:35 AM
to ReactiveMongo - http://reactivemongo.org
There is something really weird, and I'm not sure whether it is related to the ReactiveMongo driver or to Akka.

I tested my library with a simple Akka application (one node, no remote actors at all) and it works perfectly.

But in the context of an Akka Cluster (2 of my 3 nodes create a new MongoDriver to the same endpoint), I get inconsistent documents.

Cédric Chantepie

Jun 15, 2017, 12:36:14 PM
to ReactiveMongo - http://reactivemongo.org
ReactiveMongo uses its own (local) ActorSystem; it does not use remoting or Akka Cluster.