Impact of RDD data size


Sijo

Apr 10, 2013, 7:42:14 PM
to spark...@googlegroups.com
Hi all,

I am puzzled by the impact of the size of the data in an RDD on job timing in my application.

Basic job steps (standalone mode, 2 worker machines and 1 master machine with plenty of memory, 130K elements in the RDD for this test run).
I simplified my code for the test experiment to the following barebone steps:
- Populate the RDD:
    JavaRDD<V> rdd = sc.parallelize(readInput, slices);  // slices = 4 x number of workers
    rdd.cache();
- For each realtime query/search, map over the same rdd object:
    for (each query...) {
        List<String> result = rdd.map(new mapFn()).collect();
    }
  The map function does nothing and returns the same string for every element (a fuller driver sketch is below).
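
(For reference, putting the steps above together, here is a minimal self-contained sketch of the test driver. It is written against the current Spark Java API, so the package names and context construction differ a bit from 0.7, and loadInput() is just a placeholder for however the 130K test elements are built.)

    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class RddSizeTest {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "rdd-size-test");

            List<String> readInput = loadInput();   // placeholder: ~130K strings, 4000 chars each
            int slices = 4 * 2;                     // 4 x number of workers

            JavaRDD<String> rdd = sc.parallelize(readInput, slices);
            rdd.cache();
            rdd.count();                            // warmup action so the cache gets populated

            for (int i = 0; i < 10; i++) {          // one "query" per iteration
                long start = System.currentTimeMillis();
                List<String> result = rdd.map(new Function<String, String>() {
                    public String call(String v) { return "x"; }   // map does no real work
                }).collect();
                System.out.println("iteration " + i + ": " + result.size()
                        + " results in " + (System.currentTimeMillis() - start) + " ms");
            }
            sc.stop();
        }

        private static List<String> loadInput() {
            return null;   // placeholder for building the test data
        }
    }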

Observations:
1. If the RDD contains String elements (V is String) of 4000 chars each, job time is 0.9 sec.
2. If the RDD contains Class1 elements (V is Class1) with V.str 4000 chars each, job time is 8.5 sec.
   (Class1 is serializable and contains just one String field, "str".)
3. If the RDD contains Class1 elements with V.str 2000 chars each, job time is 3.9 sec.
4. If the RDD contains Class1 elements with V.str 400 chars each, job time is 1.6 sec.
5. If the RDD contains char[] elements (V is char[]) of 4000 chars each, job time is worse: 22.5 sec.

Using the Kryo serializer on the same code gives minimal improvement in timing.
Does it sound like object serialization is causing this overhead? Is this expected, or am I using it wrong?

Many thanks,
Sijo

Sijo Cherian

Apr 18, 2013, 9:59:09 AM
to spark...@googlegroups.com
Hi all,

I looked further into this issue. With my data being 2-4K chars each, using the data in an RDD has a large overhead for my near-real-time system. Observation 1 in my last mail was erroneous: even for an RDD of String elements of 4000 chars each, the job takes ~8 sec (vs. 0.9 sec when the String is 12 chars long).

So I am planning to distribute only the data key (12 chars) in the RDD and, inside mapPartitions, reach out to a cache system to retrieve the data, roughly as sketched below.
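
(A rough sketch of that plan, just to make the idea concrete. CacheClient is a hypothetical external key/value cache client, not a real API, and the signatures follow the newer Spark Java API rather than 0.7.)

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class KeyOnlyRdd {
        // Distribute only the 12-char keys; fetch the 4K-char payloads from an
        // external cache inside mapPartitions, one cache connection per partition.
        static JavaRDD<String> mapViaCache(JavaSparkContext sc, List<String> keys, int slices) {
            JavaRDD<String> keyRdd = sc.parallelize(keys, slices).cache();

            return keyRdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                public Iterator<String> call(Iterator<String> partition) {
                    CacheClient client = CacheClient.connect("cache-host:11211");  // hypothetical client
                    List<String> out = new ArrayList<String>();
                    while (partition.hasNext()) {
                        String payload = client.get(partition.next());  // 4K-char value fetched here
                        out.add(payload);                               // real per-element work would go here
                    }
                    client.close();
                    return out.iterator();
                }
            });
        }
    }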

Is there an alternate approach to get data locality between successive maps?
Even if I populate the RDD from a Hadoop directory, each RDD element will still contain 4K chars, and hence have the same overhead.

I appreciate any pointers on approaching the problem of using this larger data in a distributed manner with sub-second latency.
Thanks
Sijo




Sijo

Apr 18, 2013, 3:46:53 PM
to spark...@googlegroups.com

Another note after some more debugging:
  I debugged by doing a saveAsTextFile of a result containing the key and the worker hostname where that key was mapped (sketched below).
  I see that between map iterations (my successive queries) on the same rdd, say between itr0 and itr15, the worker hostname is different for most of the partitions (in the part-ddddd files).
Can this movement of data be the core reason for the overhead?
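
(For reference, the debug map that produced those part-ddddd files looks roughly like this; the key field on my element class and the output path are illustrative.)

    import java.net.InetAddress;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    public class PlacementDebug {
        // Tag every element with the worker it was mapped on, then dump to text
        // files so per-partition placement can be compared between iterations.
        static void dumpPlacement(JavaRDD<Class1> rdd, int iteration) {
            JavaRDD<String> placement = rdd.map(new Function<Class1, String>() {
                public String call(Class1 v) throws Exception {
                    return v.key + "\t" + InetAddress.getLocalHost().getHostName();
                }
            });
            placement.saveAsTextFile("hdfs:///tmp/placement-itr" + iteration);  // one part-ddddd file per partition
        }
    }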


Mark Hamstra

Apr 18, 2013, 4:17:24 PM
to spark...@googlegroups.com
Any time a Spark job runs slowly, the cost of moving data between nodes is likely to blame.  You can get unnecessary movement of data if HDFS, Akka and/or Spark have different notions of the names and/or IP addresses of the nodes....

Sijo

Apr 22, 2013, 3:30:45 PM
to spark...@googlegroups.com

Thanks for the various suggestions.
Based on feedback that the RDDs' memory usage might be causing this, I tried the following 5 experiments on the same dataset. The block manager UI observations are attached.
I had observed the map occurring on different workers due to an rdd.union I was trying. I have eliminated that; the map now occurs on the same worker between iterations.
There is plenty of free memory on the workers. I tried different machines and up to five workers with the same result.
If my RDD contains data < 100 chars, I get sub-second latency. But my use case involves data of 4K chars.

---Observations---
For the same simplified implementation, with 2 workers and 130K elements in the RDD:

Exp1 Mem cache: For RDD<V>, V contains 4000 chars each, using rdd.cache()
    Avg (of ten iterations) job time 10.4 sec; in the block manager UI, RDD memory size 1098 MB; worker top memory usage ~30%

Exp2 Serialize: V contains 4000 chars each, using rdd.persist(StorageLevel.MEMORY_ONLY_SER()) with a KryoRegistrator (setup sketched after this list)
    Avg (of ten iterations) job time 10.8 sec; RDD memory size 542 MB

Exp3 No rdd cache: V contains 4000 chars each, rdd.cache() not called
    Avg (of ten iterations) job time 10.5 sec; in the block manager UI, RDD memory size 0 MB

Exp4 Replication: V contains 4000 chars each, using rdd.persist(StorageLevel.MEMORY_ONLY_2())
    Avg (of ten iterations) job time 10.2 sec

Exp5: Same as Exp1 except V contains 18 chars each
    Avg (of ten iterations) job time 575 msec
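
(For completeness, the Kryo / serialized-storage setup behind Exp2 looks roughly like the following. Class1 is my element class; the SparkConf calls, property names and KryoRegistrator package follow the newer Spark API and are spelled differently in 0.7, where they are set through system properties.)

    import com.esotericsoftware.kryo.Kryo;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.serializer.KryoRegistrator;
    import org.apache.spark.storage.StorageLevel;

    public class Exp2Setup {
        // Tell Kryo about the RDD element class.
        public static class MyRegistrator implements KryoRegistrator {
            public void registerClasses(Kryo kryo) {
                kryo.register(Class1.class);
            }
        }

        // Build a context that uses Kryo for serialization.
        static JavaSparkContext kryoContext() {
            SparkConf conf = new SparkConf()
                .setMaster("spark://master:7077")
                .setAppName("rdd-size-test")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", MyRegistrator.class.getName());
            return new JavaSparkContext(conf);
        }

        // Exp2: keep the RDD in serialized form instead of rdd.cache().
        static void persistSerialized(JavaRDD<Class1> rdd) {
            rdd.persist(StorageLevel.MEMORY_ONLY_SER());
        }
    }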

I appreciate your feedback,
Sijo
experiment-rdd-charsize.jpg

Sijo

Apr 23, 2013, 1:26:50 PM
to spark...@googlegroups.com
Hi
Looking at the INFO lines in the logs:
For Exp1, the log says "Serialized task 1.0:6 as 71206403 bytes".
   Does this mean the whole dataset is getting serialized? Roughly 100x more bytes are serialized than in Exp5, and that seems to be what is taking the time.
Can anyone take a look at the attached log statements and the code sample and comment on whether I am using the RDD correctly?

 INFO TaskSetManager: Serialized task 1.0:6 as 71206403 bytes in 615 ms
 INFO TaskSetManager: Finished TID 14 in 2291 ms (progress: 7/8)

- Attached are a few lines of debug output from the initial warmup iteration and then the first iteration.

I appreciate any feedback,
Sijo


string4k.out

Sijo

Apr 23, 2013, 2:53:56 PM
to spark...@googlegroups.com

I also wanted to confirm that the latency between my iterations (map then collect, in the same job) is not impacted by how or where the data was originally distributed from (a local parallelize vs. HDFS).
I am assuming that if the same data is read from HDFS using sc.hadoopFile, then after the first warmup action (rdd.collect) all subsequent map operations will work the same way as with the data in the local rdd cache, and take the same amount of time, roughly as sketched below.
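
(In other words, I assume the HDFS-backed variant behaves like this after the first action; the path is illustrative and sc.textFile stands in for whatever input format I end up using.)

    JavaRDD<String> rdd = sc.textFile("hdfs:///data/records");   // each line ~4K chars
    rdd.cache();
    rdd.count();   // warmup action: reads the blocks from HDFS and fills the in-memory cache

    // Subsequent iterations should then hit the local cache rather than HDFS
    // and take about the same time as the parallelize() case:
    List<String> result = rdd.map(new mapFn()).collect();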

Thanks
Sijo

Sijo Cherian

Apr 24, 2013, 12:14:21 PM
to spark...@googlegroups.com
Hi all,

I still have this issue unresolved.
During each map on my rdd instance in the same job (Exp1), I see the following log line:

  INFO TaskSetManager: Serialized task 1.0:6 as 71206403 bytes in 615 ms

What does this indicate? If it means that during every map the RDD elements/data (local to a node) are serialized, and this can be avoided, then I would like a pointer on how to do so.

From the source code of TaskSetManager (version 0.7.0):
          // Serialize and return the task
          val serializedTask = Task.serializeWithDependencies(
            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          ....
          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
            taskSet.id, index, serializedTask.limit, timeTaken))

It seems like it is serializing the task plus its jar/file dependencies.
But for Exp5, with 18-char rdd data, I see:
  INFO TaskSetManager: Serialized task 1.0:1 as 243421 bytes in 8 ms

I appreciate any clues.
Thanks,
Sijo



Sijo

Apr 26, 2013, 10:56:54 AM
to spark...@googlegroups.com
Hi all,
 
Anyone familiar with the TaskSetManager source:
please confirm whether the data in an RDD is supposed to get serialized on every transformation on the rdd, or whether you see any wrong usage in my sample code from the previous post.

I see the serialized byte count growing as I increase the data size:
   INFO TaskSetManager: Serialized task ....

Thanks
Sijo

