One way to implement OCC would be to have a version variable which is incremented on each update (as cited above) and to include the old version number in the update query. Either one record was updated or none. In the latter case we had a concurrent modification.
Currently the mongo record implementation throws away the write result that is returned from the underlying Java driver. I.e. we have no way to implement the OCC algorithm based on current functionality.
Below you'll find two traits (sorry for posting the code inline, but perhaps some of you might be interested):
- OptimisticVersionField should be mixed into your model class. It simply adds the version field.
- OptimisticOperations: This one copies some of the update-methods of MongoMetaRecord to be able to evaluate the WriteResult.
def optimisticUpdate(oid: String, occVersion: Long, updateOperations: JObject): Either[OccUpdateError, UpdatedRowsCount]
You can use your model object or you can express the desired database modifications as a json expression. Both methods return an Either. If it's a Left(error), the update failed. Otherwise we get a Right(1), because we only update the first row. There could be an switch or an additional method to update multiple records.
The code is not heavily tested, but it seems to work. Feel free to use the code, but don't blame me if you loose your data ;-)
import net.liftweb.json.JsonAST.JObject
import net.liftweb.json.JsonDSL._
import net.liftweb.mongodb._
import record.{MongoRecord, MongoMetaRecord}
import com.mongodb.{WriteConcern, WriteResult, DB, DBObject}
trait OptimisticOperations[BaseRecord <: MongoRecord[BaseRecord] with OptimisticVersionField[BaseRecord]] extends MongoMetaRecord[BaseRecord] {
self: BaseRecord =>
/*
* Update document with a DBObject query and write concern as defined in <pre>Globals.SafeWriteConcern</pre>.
*/
protected def optimisticUpdate(qry: DBObject, newobj: DBObject, opts: UpdateOption*): Either[OccUpdateError, UpdatedRowsCount] = {
val dboOpts = opts.toList
MongoDB.use(mongoIdentifier)(db => {
db.getCollection(collectionName).update(
qry,
newobj,
dboOpts.find(_ == Upsert).map(x => true).getOrElse(false),
dboOpts.find(_ == Multi).map(x => true).getOrElse(false),
WriteConcern.SAFE
)
} match {
case wr: WriteResult if (wr.getLastError.ok() && wr.getN == 1) => Right(UpdatedRowsCount(1))
case wr: WriteResult if (wr.getLastError.ok() && wr.getN == 0) => Left(ConcurrentModification)
case wr: WriteResult if !wr.getLastError.ok() => Left(WriteError(wr.getError))
case _ => Left(WriteError("unknown"))
})
}
/*
* Update document with a JObject query and write concern as defined in <pre>Globals.SafeWriteConcern</pre>..
*/
protected def optimisticUpdate(qry: JObject, newobj: JObject, opts: UpdateOption*): Either[OccUpdateError, UpdatedRowsCount] = {
optimisticUpdate(
JObjectParser.parse(qry),
JObjectParser.parse(newobj),
opts: _*
)
}
/**
* Updates the record identified by the given <pre>oid</pre> in the database only if the given <pre>occVersion</pre> is the same.
* <b>In memory copies of the updated record will be outdated after this operation. It is the responsibility of the caller to deal
* with this issue. I.e. the caller should reflect the changes (including an incremented occVersionNumber) after a successful update
* or the record should be reloaded from the database.</b>
*/
def optimisticUpdate(oid: String, occVersion: Long, updateOperations: JObject): Either[OccUpdateError, UpdatedRowsCount] = {
optimisticUpdate(
("_id" -> ("$oid" -> oid)) ~ (occVersionNumber.name -> occVersion),
("$set" -> updateOperations) ~ ("$inc" -> (occVersionNumber.name -> 1))
)
}
/**
* Updates the given record in the database only if the <pre>occVersionNumber</pre> is still the same. When the update was successful,
* the version is incremented and the given record reflects the new database state. <b>If the update fails, then the given record
* does not reflect the state in the database, i.e. you should not continue using that invalid record!</b>
*/
def optimisticUpdate(br: BaseRecord): Either[OccUpdateError, UpdatedRowsCount] = {
val oldVersion = br.occVersionNumber.get
br.occVersionNumber.set(oldVersion + 1)
optimisticUpdate(JObjectParser.parse(("_id" -> ("$oid" -> br.id.toString)) ~ (occVersionNumber.name -> oldVersion)), br.asDBObject)
}
}
abstract sealed class OccUpdateError
case object ConcurrentModification extends OccUpdateError
case class WriteError(msg: String) extends OccUpdateError
final case class UpdatedRowsCount(rows: Int)