Mongodb Hadoop Connector and Spark

Marcus Rehm

Jul 7, 2015, 6:24:23 PM
to mongod...@googlegroups.com
Hi all,

I'm trying to insert some documents into MongoDB using the Hadoop connector with Spark (using Python).
So far, I am able to connect to MongoDB and read the collection. My problem is that I can't find a way to insert a document and send it back to the database. The collection is written back, but it is empty, without the document I tried to insert.

Could anyone help me with this? 

Thanks,
Marcus

Below is the code I'm using:

__author__ = 'marcusrehm'

from pyspark import SparkContext

import xmltodict, json

nfce = open('D:/35150400776574000741550030073022671331273267.xml','r')
o = xmltodict.parse(nfce)

# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://ds.mongolab.com:39250/td.nf",
"mongo.input.split.create_input_splits": "false",
"mongo.output.uri": "mongodb://ds.mongolab.com:39250/td.nf"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

# read the nfc-e from MongoDB into Spark RDD format
sc = SparkContext()
notasRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)
notasRDD.map(None, o)
notasRDD.saveAsNewAPIHadoopFile("file:///placeholder", outputFormatClassName, None, None, None, None, config)

Luke Lovett

Jul 13, 2015, 2:00:47 PM
to mongod...@googlegroups.com
Hi Marcus,

I see that your input and output MongoDB URIs are identical. Is that intentional? Are you saying that this collection was used as input and all of its documents were erased after running the Spark job, or are you actually using two separate input and output collections?

Can you include the log output from running the spark job?

You might also try specifying the key and value classes when calling 'saveAsNewAPIHadoopFile'. Since you're not doing any data transformation, you can just use the same key/value classes that you used for the input.
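
As a rough sketch of that suggestion, the save call could pass the classes explicitly by keyword. The class names, the placeholder path, and the "mongo.output.uri" key are the ones from your script; the `save_to_mongo` helper is a hypothetical wrapper for illustration, not part of the connector, and I haven't run this against your data.

```python
# Class names and the option key are copied from the original script.
OUTPUT_FORMAT = "com.mongodb.hadoop.MongoOutputFormat"
KEY_CLASS = "org.apache.hadoop.io.Text"
VALUE_CLASS = "org.apache.hadoop.io.MapWritable"


def save_to_mongo(rdd, output_uri):
    """Write an RDD of (key, document) pairs through MongoOutputFormat.

    Hypothetical helper: it only forwards to PySpark's
    RDD.saveAsNewAPIHadoopFile with the key and value classes set.
    """
    rdd.saveAsNewAPIHadoopFile(
        path="file:///placeholder",  # same placeholder path as the original script
        outputFormatClass=OUTPUT_FORMAT,
        keyClass=KEY_CLASS,      # same key class as used for the input
        valueClass=VALUE_CLASS,  # same value class as used for the input
        conf={"mongo.output.uri": output_uri},
    )
```

With the RDD from your script, this would be called as `save_to_mongo(rdd, config["mongo.output.uri"])`. Note also that `map` returns a new RDD rather than changing the existing one, so the result of any transformation has to be assigned to a variable and that variable is the one to save.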

Also, beware that PySpark isn't fully supported by the Hadoop connector (it has trouble with some BSON-specific types like ObjectId). Problems of this kind usually manifest as the job stopping with an exception like "org.apache.spark.SparkException: Data of type java.util.GregorianCalendar cannot be used", so it doesn't sound as if you're running into this. It's just something to be aware of.