Using Spark to read from a database in parallel

Oliver Senn

Mar 28, 2013, 1:03:03 AM
to spark...@googlegroups.com
Hi all,

I like Spark a lot! But I am new to it, and I recently ran into a question:
I'd like to use Spark to read from a database in parallel, more specifically to
read all records of the database and group them by some field (I would rather not
do the group-by in the database since it's really huge).

Would that be something Spark can be used for, and if so, how?
The problem is that I can't read the whole input first and then process it
in parallel; I would somehow have to read the input itself in parallel, in
small batches, and process it.

How would I best do this?

Thanks so much.

Cheers,

Oliver

MLnick

Mar 28, 2013, 10:04:53 AM
to spark...@googlegroups.com
Hi

If you can partition the primary key of the table such that each Spark partition reads a subset of the PK range then this would work. You can look at using the mapWith method (in master branch, not released yet) to accomplish this. But that means you'll need to write a function to partition the PK range yourself (can be hash-partitioned or timestamp ranges etc) and generate SQL to read the appropriate range in each partition.
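The range partitioning described above could be sketched roughly as follows. This is plain Java with no Spark dependency: it only shows how a numeric PK range might be split into contiguous, non-overlapping sub-ranges, with one query generated per sub-range. The class name `PkRangeSplitter`, the table name `records`, and the column name `id` are hypothetical, and the sketch assumes a numeric primary key whose values are spread fairly evenly over the range.

```java
import java.util.ArrayList;
import java.util.List;

public class PkRangeSplitter {

    /** A half-open primary-key range [lo, hi). */
    public static final class Range {
        public final long lo;
        public final long hi;

        public Range(long lo, long hi) {
            this.lo = lo;
            this.hi = hi;
        }
    }

    /**
     * Splits the inclusive key range [minId, maxId] into at most
     * numPartitions contiguous, non-overlapping half-open ranges.
     * Assumes the key values are far from Long.MAX_VALUE (no overflow checks).
     */
    public static List<Range> split(long minId, long maxId, int numPartitions) {
        if (numPartitions <= 0 || maxId < minId) {
            throw new IllegalArgumentException(
                "need numPartitions > 0 and maxId >= minId");
        }
        long total = maxId - minId + 1;                           // keys to cover
        long step = (total + numPartitions - 1) / numPartitions;  // ceiling division
        List<Range> ranges = new ArrayList<Range>();
        for (long lo = minId; lo <= maxId; lo += step) {
            long hi = Math.min(lo + step, maxId + 1);             // clamp the last range
            ranges.add(new Range(lo, hi));
        }
        return ranges;
    }

    /** Builds the query for one range; table and column names are hypothetical. */
    public static String toSql(Range r) {
        return "SELECT * FROM records WHERE id >= " + r.lo + " AND id < " + r.hi;
    }

    public static void main(String[] args) {
        // Print one query per sub-range of the keys 1..1000.
        for (Range r : split(1, 1000, 4)) {
            System.out.println(toSql(r));
        }
    }
}
```

In a Spark job, the list of ranges would be distributed (for example with sc.parallelize) so that each task runs the query for its own range. For a non-SQL store the same ranges could drive a range filter on an indexed field instead of a SQL string.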

If you are already using HDFS and Hadoop, I'd recommend using Sqoop (http://sqoop.apache.org/) to first pull the table from the DB into HDFS, and then processing it in Spark via sc.textFile or sc.hadoopFile.

(Actually this makes me think that a Spark-based sqoop would be pretty cool...)

Oliver Senn

Mar 31, 2013, 8:56:08 PM
to spark...@googlegroups.com
Hi,

Thanks, sounds good. I actually tried exactly your proposal (not Sqoop, since I am using MongoDB as the store), but I am having trouble with the intermediate results not being persisted to memory
or disk. I will probably post the source code later.

But thanks for the suggestions, and +1 for a spark-sqoop :).

Best,
O

Oliver Senn

Apr 2, 2013, 2:11:21 AM
to spark...@googlegroups.com
Hi all,

I tried implementing a small example that computes the distinct values of a field in a MongoDB database. The database is large (> 250M records),
and the problem with my code is that the results of the map phase are not persisted to disk. When running it, I never get any log statements from the
BlockManagerMaster about persisting anything. I guess I am doing something conceptually wrong here?

A simplified version of the code is attached.

Help is appreciated :).

O
MongoDistinctField.java

MLnick

Apr 2, 2013, 5:41:42 AM
to spark...@googlegroups.com
Ah

The code looks fine (I haven't run it though). Do you get the correct results for distinct values out when logging the output of the reduce?

The persist() method only indicates to Spark what storage level should be used for caching intermediate RDDs during a job. In this case you have a map operation followed by a reduce, so I suspect that it never triggers the caching since the job will feed the map results straight to the reduce operation (on the master, since it's not reduceByKey).

If you used rdd.persist() and then had a job with multiple calls against that RDD, you would see the caching come into play.

If you want to store the results in HDFS, then you need to use rdd.saveAsTextFile (or rdd.saveAsHadoopFile). Any RDD cached during the job using persist() is cleaned up after job completion.

Hope this helps
Nick

Oliver Senn

Apr 2, 2013, 8:36:05 PM
to spark...@googlegroups.com
Hmm, OK. I tried writing the map output to a file using rdd.saveAsTextFile, but the problem is that
a map task only starts writing its output once it has finished. No intermediate map output is written,
which is a problem since the data doesn't fit in memory.

Is there another map variant which would write out results after each batch was processed?

Best,
Oliver

MLnick

Apr 3, 2013, 4:47:06 AM
to spark...@googlegroups.com
I'm not sure I fully understand the outcome you are trying to achieve. Why would you like to write out the intermediate results after each batch?

As far as I understand your use case, you care about getting the relevant records out of Mongo in parallel (which is achieved via your map task that extracts the relevant records for a key range), and then you have an RDD[Record] that you can do further operations on (e.g. groupBy, reduce and get out a final result that you can write out to HDFS if you wish). Is this a fair interpretation?

The devs can step in here as they know more - but my understanding is that intermediate results will be spilled to disk where necessary (e.g. the shuffle phase before a reduce operation when doing reduceByKey) during a given workflow, in a similar manner to Hadoop. Again like Hadoop, these intermediate results (e.g. the equivalent of the map part-0000 files) will be cleaned up once the job is completed, and only the RDD that you call .saveAsTextFile on will be written to HDFS.

If you did want to write intermediate results to HDFS you can call .saveAsTextFile on any RDD you wish during the workflow.

A cached RDD itself will also be spilled to disk when the persist() storage level includes DISK and the RDD doesn't fit in memory. But caching is not required, it is there to speed up operations that repeatedly access the same RDD. Without caching, Spark will behave much like Hadoop (so intermediate results will indeed be spilled to temp disk as required).

Oliver Senn

Apr 3, 2013, 8:19:51 PM
to spark...@googlegroups.com
Hi,

Thanks for the comments, mine are inline:


On Wednesday, April 3, 2013 4:47:06 PM UTC+8, MLnick wrote:
> I'm not sure I fully understand the outcome you are trying to achieve. Why would you like to write out the intermediate results after each batch?
>
> As far as I understand your use case, you care about getting the relevant records out of Mongo in parallel (which is achieved via your map task that extracts the relevant records for a key range), and then you have an RDD[Record] that you can do further operations on (e.g. groupBy, reduce and get out a final result that you can write out to HDFS if you wish). Is this a fair interpretation?

That's correct.

> The devs can step in here as they know more - but my understanding is that intermediate results will be spilled to disk where necessary (e.g. the shuffle phase before a reduce operation when doing reduceByKey) during a given workflow, in a similar manner to Hadoop. Again like Hadoop, these intermediate results (e.g. the equivalent of the map part-0000 files) will be cleaned up once the job is completed, and only the RDD that you call .saveAsTextFile on will be written to HDFS.

That is exactly what I'd like to have; however, my code does not provoke any spills to disk when it runs. Instead it runs out of memory, since the output of each map iteration seems to stay in memory. That is why
I tried to use rdd.persist after the map phase to make it spill to disk, but with no success.

So this is the core of my problem: at the moment my code doesn't spill any map-phase results to disk and therefore runs out of memory. The question is how to make sure that map results are spilled
to disk when necessary.
 
Best,
Oliver

MLnick

Apr 4, 2013, 12:31:37 PM
to spark...@googlegroups.com
Do you mind posting the stack trace?

It could be that the OOM is happening in the reduce phase, since that takes place on the master. If you have a very large number of distinct values, you could be running into memory issues when merging all the map results.
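The concern above can be illustrated without Spark. If each map task produces the set of distinct values it saw, and those sets are merged on a single node, the merged set has to hold every distinct value at once. The following is a minimal sketch in plain Java; the class name `DistinctMerge` and the sample data are made up, and the nested lists merely stand in for partitions. It is not the attached code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctMerge {

    /** "Map" step: the distinct values within one partition. */
    public static Set<String> distinctOf(List<String> partition) {
        return new HashSet<String>(partition);
    }

    /** "Reduce" step: merge two partial results into a new set. */
    public static Set<String> merge(Set<String> a, Set<String> b) {
        Set<String> out = new HashSet<String>(a);
        out.addAll(b);
        return out;
    }

    /** Merges all partial results on one node, as a non-keyed reduce would. */
    public static Set<String> distinctOverall(List<List<String>> partitions) {
        Set<String> acc = new HashSet<String>();
        for (List<String> p : partitions) {
            // acc keeps growing: it always holds every distinct value seen so far
            acc = merge(acc, distinctOf(p));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "b", "a"),
            Arrays.asList("b", "c"),
            Arrays.asList("d", "a"));
        // Prints the number of distinct values across all partitions.
        System.out.println(distinctOverall(partitions).size());
    }
}
```

The memory needed on the merging node grows with the total number of distinct values, regardless of how many partitions the input was split into. A stack trace would show whether this is actually where the OOM occurs.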
