/** Stores a single data message for an account as one BSON document.
  *
  * The document holds the account number, the message's recorded time and one
  * `header` / `data` sub-document per header-data pair.
  *
  * @param accountNumber account the message is stored under
  * @param data          message to persist
  * @return a future with `Right(data)` when the write result reports `ok`,
  *         otherwise `Left` carrying the write result's error message
  */
override def storeData(accountNumber: Int, data: DataMessage): Future[Either[String, DataMessage]] = {
  val headerDataPairs = data.headerDataPairs.map { hdp =>
    BSONDocument("header" -> hdp.header, "data" -> hdp.values)
  }
  val document = BSONDocument(
    "accountNumber" -> accountNumber,
    "recordedTime" -> data.recordedTime,
    "headerDataPairs" -> headerDataPairs)

  val inserted: Future[WriteResult] = collection.insert(document)
  inserted.map { writeResult =>
    if (writeResult.ok) Right(data)
    // errmsg is optional: use a fallback text instead of `.get`, which would
    // fail the future with NoSuchElementException when no message is present.
    else Left(writeResult.errmsg.getOrElse("Insert failed without an error message"))
  }
}
/** Stores every data message of a container with one unordered bulk insert.
  *
  * Each message becomes a BSON document holding the account number, the
  * message's recorded time and one `header` / `data` sub-document per
  * header-data pair.
  *
  * @param accountNumber account the messages are stored under
  * @param container     container whose `data` messages are persisted
  * @return a future with `Right(container)` when the bulk write result reports
  *         `ok`, otherwise `Left` carrying the result's error message
  */
override def storeData(accountNumber: Int, container: DataMessageContainer): Future[Either[String, DataMessageContainer]] = {
  val documents: List[BSONDocument] = container.data.map { dataMessage =>
    val headerDataPairs = dataMessage.headerDataPairs.map { hdp =>
      BSONDocument("header" -> hdp.header, "data" -> hdp.values)
    }
    BSONDocument(
      "accountNumber" -> accountNumber,
      "recordedTime" -> dataMessage.recordedTime,
      "headerDataPairs" -> headerDataPairs)
  }

  val inserted: Future[MultiBulkWriteResult] =
    collection.bulkInsert(documents.toStream, ordered = false)
  inserted.map { writeResult =>
    if (writeResult.ok) Right(container)
    // errmsg is optional: use a fallback text instead of `.get`, which would
    // fail the future with NoSuchElementException when no message is present.
    else Left(writeResult.errmsg.getOrElse("Bulk insert failed without an error message"))
  }
}
Are operations other than bulkInsert working? Is there any difference in the connection management in this case?
// Collection handle, obtained by calling `connect` when this member is initialised.
val collection = connect

/** Opens a connection to the node "localhost" and returns the collection
  * "acoll" of the database "plugin".
  */
def connect = {
  // Instantiating the driver creates an actor system.
  val mongoDriver = new MongoDriver
  val localConnection = mongoDriver.connection(List("localhost"))
  // Reference to the database "plugin".
  val pluginDb = localConnection("plugin")
  // Reference to the collection "acoll"; a BSONCollection by default.
  pluginDb("acoll")
}
// Runs through Failover2 on the database's connection: yields the metadata when
// it is already known, otherwise a failed future with MissingMetadata.
val havingMetadata = Failover2(db.connection, failoverStrategy) { () =>
  metadata match {
    case Some(known) => Future.successful(known)
    case None        => Future.failed(ConnectionNotInitialized.MissingMetadata)
  }
}
$ mkdir data
$ /usr/bin/mongod --dbpath data
// Collection handle, obtained by calling `connect` when this member is initialised.
val collection = connect

/** Connects to MongoDB at localhost:27017, drops the database "newcoll" and
  * returns its collection "acollection".
  *
  * SSL (with invalid certificates allowed) is enabled when the system property
  * `test.enableSSL` equals "true"; otherwise the default connection options are used.
  */
def connect = {
  // Method-call form instead of the postfix operator `10 seconds`, which
  // needs the `scala.language.postfixOps` feature import.
  val timeout = 10.seconds

  val baseOptions = MongoConnectionOptions()
  val sslRequested = Option(System.getProperty("test.enableSSL")).exists(_ == "true")
  val options =
    if (sslRequested) baseOptions.copy(sslEnabled = true, sslAllowsInvalidCert = true)
    else baseOptions

  // Plain vals: both values are used right below, so laziness bought nothing.
  val driver = new MongoDriver
  val connection = driver.connection(List("localhost:27017"), options)

  val db = connection("newcoll")
  // Wait (up to `timeout`) for the drop to complete before handing out the
  // collection. Await.ready only waits; it does not rethrow a failed drop.
  Await.ready(db.drop, timeout)

  db.collection("acollection")
}
/** BSON writer for an (account number, data message) pair. */
object DataMessageWriter extends BSONDocumentWriter[(Int, DataMessage)] {

  /** Builds the document for one message: the account number, the message's
    * recorded time and one `header` / `data` sub-document per header-data pair.
    */
  def write(p: (Int, DataMessage)): BSONDocument = {
    val (accountNumber, message) = p
    val pairDocuments = message.headerDataPairs.map { pair =>
      BSONDocument("header" -> pair.header, "data" -> pair.values)
    }
    BSONDocument(
      "accountNumber" -> accountNumber,
      "recordedTime" -> message.recordedTime,
      "headerDataPairs" -> pairDocuments)
  }
}
/** Stores every data message of a container with one ordered bulk insert,
  * serialising each (account number, message) pair through `DataMessageWriter`.
  *
  * @param accountNumber account the messages are stored under
  * @param container     container whose `data` messages are persisted
  * @return a future with `Right(container)` when the bulk write result reports
  *         `ok`, otherwise `Left` carrying the result's error message
  */
override def storeData(accountNumber: Int, container: DataMessageContainer): Future[Either[String, DataMessageContainer]] = {
  // Writer picked up by the implicit document-producer conversion below.
  implicit val dataMessageWriter = DataMessageWriter

  val documents = container.data
    .map((accountNumber, _))
    .map(implicitly[collection.ImplicitlyDocumentProducer](_))

  val inserted: Future[MultiBulkWriteResult] =
    collection.bulkInsert(ordered = true)(documents: _*)
  inserted.map { writeResult =>
    if (writeResult.ok) Right(container)
    // errmsg is optional: use a fallback text instead of `.get`, which would
    // fail the future with NoSuchElementException when no message is present.
    else Left(writeResult.errmsg.getOrElse("Bulk insert failed without an error message"))
  }
}
// General shape: give Failover2 the database's connection, a failover strategy
// and a thunk (body elided here), then read `.future` off the result.
Failover2(db.connection, failoverStrategy) { () => /* ... */ }.future
// Same shape applied to an unordered bulk insert of `documents`, using a
// default-constructed FailoverStrategy; presumably `bulkRes` is the future of
// the bulk write result — confirm against the Failover2 definition.
val bulkRes = Failover2(db.connection, FailoverStrategy()) { () => collection.bulkInsert(ordered = false)(documents: _*) }.future