ConnectionNotInitialized whilst trying to perform bulkInsert (fine for normal inserts)


Duncan Watson

Sep 15, 2015, 2:39:20 AM
to ReactiveMongo - http://reactivemongo.org
Hi there,

I'd really appreciate some help if anyone has encountered this issue before.

I'm using version 0.11.7 of the reactivemongo library with Scala 2.11.7, and MongoDB version 3.0.6.

I've got a piece of code that creates a BSONDocument and inserts it with no problems whatsoever - it looks like this:

override def storeData(accountNumber: Int, data: DataMessage) = {
  val document = BSONDocument(
    "accountNumber" -> accountNumber,
    "recordedTime" -> data.recordedTime,
    "headerDataPairs" -> data.headerDataPairs.map(hdp =>
      BSONDocument("header" -> hdp.header, "data" -> hdp.values)))

  val future: Future[WriteResult] = collection.insert(document)

  val handlingError: Future[Either[String, DataMessage]] = future.map {
    case writeResult if writeResult.ok => Right(data)
    case writeResult => Left(writeResult.errmsg.get)
  }

  handlingError
}


However, when I change this code to create multiple BSONDocuments and bulkInsert them instead, I get the ConnectionNotInitialized exception from the thread title. The amended code looks like this:



override def storeData(accountNumber: Int, container: DataMessageContainer) = {
  val documents: List[BSONDocument] = container.data.map { dataMessage =>
    BSONDocument(
      "accountNumber" -> accountNumber,
      "recordedTime" -> dataMessage.recordedTime,
      "headerDataPairs" -> dataMessage.headerDataPairs.map(hdp =>
        BSONDocument("header" -> hdp.header, "data" -> hdp.values)))
  }

  val stream = documents.toStream

  val future: Future[MultiBulkWriteResult] = collection.bulkInsert(stream, ordered = false)

  val handlingError: Future[Either[String, DataMessageContainer]] = future.map {
    case writeResult if writeResult.ok => Right(container)
    case writeResult => Left(writeResult.errmsg.get)
  }

  handlingError
}


I've included a partial stacktrace below; you can see that the exception is thrown by the bulkInsert call in the code snippet above.

reactivemongo.core.errors.ConnectionNotInitialized: MongoError['Connection is missing metadata (like protocol version, etc.) The connection pool is probably being initialized.']
at reactivemongo.core.errors.ConnectionNotInitialized$.MissingMetadata(errors.scala:67)
at reactivemongo.api.collections.GenericCollection$$anonfun$bulkInsert$7.apply(genericcollection.scala:199)
at reactivemongo.api.collections.GenericCollection$$anonfun$bulkInsert$7.apply(genericcollection.scala:199)
at scala.Option.getOrElse(Option.scala:121)
at reactivemongo.api.collections.GenericCollection$class.bulkInsert(genericcollection.scala:199)
at reactivemongo.api.collections.bson.BSONCollection.bulkInsert(bsoncollection.scala:72)
at reactivemongo.api.collections.GenericCollection$class.bulkInsert(genericcollection.scala:194)
at reactivemongo.api.collections.bson.BSONCollection.bulkInsert(bsoncollection.scala:72)
at com.hiveit.test.manager.MongoDbDataManager.storeData(MongoDbDataManager.scala:61)

Many thanks for any help anyone can give me!
Duncan

Cédric Chantepie

Sep 15, 2015, 3:33:28 AM
to ReactiveMongo - http://reactivemongo.org
Hi,

Are operations other than bulkInsert working? Is there any difference in the connection management in this case?

Duncan Watson

Sep 15, 2015, 3:44:36 AM
to ReactiveMongo - http://reactivemongo.org
Hi Cédric,

Thanks very much for the quick reply.  I should have posted up the connection code in the first place - please find it below.  This is the same connection information that works with single inserts but does not work for bulk inserts. 

val collection = connect

def connect = {
  // (creates an actor system)
  val driver = new MongoDriver
  val connection = driver.connection(List("localhost"))

  // Gets a reference to the database "plugin"
  val db = connection("plugin")

  // Gets a reference to the collection "acoll"
  // By default, you get a BSONCollection.
  db("acoll")
}

So standard single inserts work with no problem, and reading BSONDocuments also works fine with this connection setup - it's just the bulk insert at the moment that's not working for me.
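
For reference, the kind of read that works here is roughly along these lines - a sketch only, with an illustrative selector rather than my exact query:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument

// Illustrative read - the selector value and use of a plain
// BSONDocument cursor are placeholders, not the real query.
val results: Future[List[BSONDocument]] =
  collection.find(BSONDocument("accountNumber" -> 42))
    .cursor[BSONDocument]()
    .collect[List]()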

Many thanks,
Duncan 

Duncan Watson

Sep 15, 2015, 5:32:53 AM
to ReactiveMongo - http://reactivemongo.org
Hi Cédric,

I'm assuming that the offending lines are here in reactivemongo.api.collections.GenericCollection.bulkInsert():

val havingMetadata = Failover2(db.connection, failoverStrategy) { () =>
  metadata.map(Future.successful).getOrElse(
    Future.failed(ConnectionNotInitialized.MissingMetadata))
}

I'm assuming, then, that I potentially need to specify a particular FailoverStrategy when creating the connection in the first place?
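
Something along these lines is what I had in mind - a sketch only, assuming the db/collection resolution accepts a FailoverStrategy, and with retry values that are pure guesses:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import reactivemongo.api.{ FailoverStrategy, MongoDriver }

// Sketch: resolve the database and collection with an explicit
// FailoverStrategy (initialDelay/retries values are arbitrary).
val driver = new MongoDriver
val connection = driver.connection(List("localhost"))

val strategy = FailoverStrategy(initialDelay = 500.milliseconds, retries = 8)

val db = connection.db("plugin", strategy)
val collection = db.collection("acoll", strategy)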

Thanks,
Duncan

cchantep

Sep 16, 2015, 4:51:26 AM
to ReactiveMongo - http://reactivemongo.org
What is the storage engine used? Is the MongoDB server remote?

Duncan Watson

Sep 16, 2015, 7:39:18 AM
to ReactiveMongo - http://reactivemongo.org
Hi,

I don't believe so.  I ran through the steps outlined in the "Step by Step" guide on the main ReactiveMongo page.  So I downloaded the Ubuntu 14.04 64-bit package from https://www.mongodb.org/downloads, then ran:

$ mkdir data
$ /usr/bin/mongod --dbpath data

Then I did a sudo service mongo start.

Thanks,
Duncan

Duncan Watson

Sep 16, 2015, 7:55:19 AM
to ReactiveMongo - http://reactivemongo.org
Sorry - I should have said that I followed the MongoDB instructions on how to install with apt, found at this page:


Specifically, the instructions showed how to install and then pin the version at 3.0.6.

Many thanks,
Duncan

Cédric Chantepie

Sep 16, 2015, 10:48:39 AM
to ReactiveMongo - http://reactivemongo.org

Duncan Watson

Sep 16, 2015, 2:04:03 PM
to ReactiveMongo - http://reactivemongo.org
Hi again, and thanks once more for the assistance.

I took a look at the test code and how it sets up its connection, and was then able to take this connection setup and apply it to my own code.  My new connection code looks like this:

val collection = connect

def connect = {
  val timeout = 10 seconds

  val DefaultOptions = {
    val opts = MongoConnectionOptions()

    if (Option(System getProperty "test.enableSSL").exists(_ == "true")) {
      opts.copy(sslEnabled = true, sslAllowsInvalidCert = true)
    } else opts
  }

  lazy val driver = new MongoDriver
  lazy val connection = driver.connection(List("localhost:27017"), DefaultOptions)

  val db = {
    val _db = connection("newcoll")
    Await.ready(_db.drop, timeout)
    _db
  }

  db.collection("acollection")
}

The good news is that it no longer errors when bulk inserting!  The bad news is that a bulk insert using the above code seems to overwrite anything currently in the collection, so only the last bulk insert's worth of data is ever available there.

Furthermore, with the above setup I am no longer able to read from the collection.  I can use the Mongo shell to confirm that there is data in there (only the last bulkInsert's set of data, though), but the query that was previously working now returns nothing.  If I switch back to my original connection settings, reads start working fine again.

Any ideas most welcome :)

Many thanks,
Duncan

Duncan Watson

Sep 16, 2015, 2:06:06 PM
to ReactiveMongo - http://reactivemongo.org
I should also point out that I rewrote the bulkInsert code as per the test scripts:

object DataMessageWriter extends BSONDocumentWriter[(Int, DataMessage)] {
  def write(p: (Int, DataMessage)): BSONDocument =
    BSONDocument(
      "accountNumber" -> p._1,
      "recordedTime" -> p._2.recordedTime,
      "headerDataPairs" -> p._2.headerDataPairs.map(hdp =>
        BSONDocument("header" -> hdp.header, "data" -> hdp.values)))
}

override def storeData(accountNumber: Int, container: DataMessageContainer) = {
  implicit val dataMessageWriter = DataMessageWriter

  val documents = container.data
    .map((accountNumber, _))
    .map(implicitly[collection.ImplicitlyDocumentProducer](_))

  val future: Future[MultiBulkWriteResult] = collection.bulkInsert(true)(documents: _*)

  val handlingError: Future[Either[String, DataMessageContainer]] = future.map {
    case writeResult if writeResult.ok => Right(container)
    case writeResult => Left(writeResult.errmsg.get)
  }

  handlingError
}

Thanks,
Duncan

Cédric Chantepie

Sep 23, 2015, 6:42:07 AM
to ReactiveMongo - http://reactivemongo.org
Do you still have errors with this code?

Christian Linne

Sep 24, 2015, 12:53:40 PM
to ReactiveMongo - http://reactivemongo.org
I've faced the same issue on Windows (10) as well.

Seems to be related to the different behavior of insert and bulkInsert.

Insert wraps the metadata check and the write request itself within

Failover2(db.connection, failoverStrategy) { () => /* ... */ }.future

whereas bulkInsert does not. When using this wrapper manually, like in 

val bulkRes = Failover2(db.connection, FailoverStrategy()) { () =>
  collection.bulkInsert(ordered = false)(documents: _*)
}.future

everything works fine. However, I don't think this is intended.
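
For completeness, here is a slightly fuller sketch of that workaround with the imports it needs; the helper name and the choice of the default FailoverStrategy are mine, not anything official:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.{ Failover2, FailoverStrategy }
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.MultiBulkWriteResult
import reactivemongo.bson.BSONDocument

// Wraps the whole bulkInsert in the driver's failover logic, mirroring
// what insert() already does internally around the metadata check.
def bulkInsertWithFailover(
  collection: BSONCollection,
  documents: Stream[BSONDocument]): Future[MultiBulkWriteResult] =
  Failover2(collection.db.connection, FailoverStrategy()) { () =>
    collection.bulkInsert(documents, ordered = false)
  }.future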

Duncan Watson

Sep 25, 2015, 4:00:05 AM
to ReactiveMongo - http://reactivemongo.org
Hi Christian,

Thanks for your reply - yes, I'd noticed that they were written differently, but I had assumed they boiled down to the same functionality in the end.  I guess not, however!

Have you experienced any issues with bulk inserting in the way that you described, by wrapping it manually?  If not, I may give it a try.  I'm using Ubuntu 14.04 LTS by the way.

Thanks again,
Duncan