Some naive questions

Axel Rivera

Jul 15, 2013, 12:46:12 PM
to spark...@googlegroups.com
Hi guys, I just started with Spark. I'm using Spark with PySpark and the Spark cluster system. Everything is running smoothly, but I have a few questions.

1) How can I get the node number? For example, in OpenMP you can call omp_get_thread_num() and it gives you the thread number, so how can I get the number of the node or RDD that is running?
2) Second, I want to split the job across different nodes, but apparently Spark is not doing it. Here is a small example (taken from the PySpark docs):

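    import sys
    from pyspark import SparkContext

    sc = SparkContext(sys.argv[1], jobName="CrappyTest")
    def f(x):
        # One output value per partition: the sum of its elements.
        yield sum(x)
    out = sc.parallelize([1, 2, 3, 4], 2)
    print(out.mapPartitions(f, preservesPartitioning=True).collect())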

   I expected the result to be [3, 7], but it is giving me [0, 10]. I ran other, longer tests, and all nodes finish quickly except the last one, which spends its time doing all the work.

Thanks in advance.

Axel Rivera

Jul 15, 2013, 5:06:56 PM
to spark...@googlegroups.com
I was doing some tests with glom(), and apparently Spark is not partitioning the data:

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.glom().collect()
13/07/15 15:04:28 INFO spark.SparkContext: Starting job: collect at NativeMethodAccessorImpl.java:-2
13/07/15 15:04:28 INFO scheduler.DAGScheduler: Got job 0 (collect at NativeMethodAccessorImpl.java:-2) with 2 output partitions (allowLocal=false)
13/07/15 15:04:28 INFO scheduler.DAGScheduler: Final stage: Stage 0 (PythonRDD at NativeConstructorAccessorImpl.java:-2)
13/07/15 15:04:28 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/07/15 15:04:28 INFO scheduler.DAGScheduler: Missing parents: List()
13/07/15 15:04:28 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[1] at PythonRDD at NativeConstructorAccessorImpl.java:-2), which has no missing parents
13/07/15 15:04:29 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at PythonRDD at NativeConstructorAccessorImpl.java:-2)
13/07/15 15:04:29 INFO local.LocalScheduler: Running ResultTask(0, 0)
13/07/15 15:04:29 INFO local.LocalScheduler: Size of task 0 is 2525 bytes
13/07/15 15:04:30 INFO local.LocalScheduler: Finished ResultTask(0, 0)
13/07/15 15:04:30 INFO local.LocalScheduler: Running ResultTask(0, 1)
13/07/15 15:04:30 INFO local.LocalScheduler: Size of task 1 is 2611 bytes
13/07/15 15:04:30 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
13/07/15 15:04:30 INFO local.LocalScheduler: Finished ResultTask(0, 1)
13/07/15 15:04:30 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
13/07/15 15:04:30 INFO scheduler.DAGScheduler: Stage 0 (PythonRDD at NativeConstructorAccessorImpl.java:-2) finished in 1.671 s
13/07/15 15:04:30 INFO spark.SparkContext: Job finished: collect at NativeMethodAccessorImpl.java:-2, took 2.264232229 s
[[], [1, 2, 3, 4]]

Josh Rosen

Jul 15, 2013, 10:17:09 PM
to spark...@googlegroups.com
RE the parallelize() behavior that you observed: this is due to how PySpark batches multiple Python objects during serialization. PySpark's parallelize() method groups its input (into groups of 1024 by default) and then partitions the groups, whereas it should partition first and then group the items. With only four items, everything lands in a single group, so one partition gets all the data and the other stays empty. I've opened an issue at https://spark-project.atlassian.net/browse/SPARK-815 to fix this.
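
To make the ordering issue concrete, here is a small pure-Python model of the two behaviors. It does not call PySpark; the helper names (batched, slice_evenly, batch_then_partition, partition_then_batch) and the even-slicing rule are assumptions made for illustration, not PySpark's actual code.

```python
def batched(items, batch_size):
    """Group items into consecutive lists of at most batch_size elements."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous slices of near-equal length."""
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


def batch_then_partition(data, num_slices, batch_size=1024):
    # Batches are the units being sliced, so a single batch is never split.
    slices = slice_evenly(batched(data, batch_size), num_slices)
    return [[x for batch in s for x in batch] for s in slices]


def partition_then_batch(data, num_slices, batch_size=1024):
    # The raw items are sliced first; batching happens inside each slice.
    slices = slice_evenly(data, num_slices)
    return [[x for batch in batched(s, batch_size) for x in batch]
            for s in slices]


print(batch_then_partition([1, 2, 3, 4], 2))   # [[], [1, 2, 3, 4]]
print(partition_then_batch([1, 2, 3, 4], 2))   # [[1, 2], [3, 4]]
```

Summing each partition of the first result gives [0, 10], and of the second gives [3, 7], which matches the observed and the expected output in the original question.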

RE your other question (getting the partition number): check out the mapPartitionsWithSplit() method:

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
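
As a rough picture of what that docstring describes, the sketch below emulates the call in plain Python, with a list of lists standing in for an RDD's partitions. map_partitions_with_split and the two example functions are hypothetical stand-ins written for this illustration; they are not PySpark functions.

```python
def map_partitions_with_split(partitions, f):
    """Call f(split_index, iterator) once per partition; return results per partition."""
    return [list(f(index, iter(part))) for index, part in enumerate(partitions)]


def split_index_only(split_index, iterator):
    # Same shape as the docstring's f: ignore the data, emit the index.
    yield split_index


def tag_with_split(split_index, iterator):
    # Pair every element with the index of the partition that holds it.
    for x in iterator:
        yield (split_index, x)


# Four partitions with one element each, as in the docstring example.
indices = map_partitions_with_split([[1], [2], [3], [4]], split_index_only)
print(sum(i for part in indices for i in part))  # 6

tagged = map_partitions_with_split([[1, 2], [3, 4]], tag_with_split)
print(tagged)  # [[(0, 1), (0, 2)], [(1, 3), (1, 4)]]
```

The split index plays a role similar to omp_get_thread_num(): it identifies which partition the function is currently processing, not which physical node runs it.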

Axel Rivera

Jul 16, 2013, 12:29:36 PM
to spark...@googlegroups.com
Thanks for explaining it. I managed to use partitionBy(X) with glom() and it works fine. On another note, is there a way to mute the output? For example, when I run Spark it gives me a lot of output like job started, job finished, and other extra stuff. It is good for debugging, but now that everything is running fine I want to do some timing and remove the output that I don't need. I know I can use a pipe, but I just want to mute it.

Patrick Wendell

Jul 16, 2013, 1:16:04 PM
to spark...@googlegroups.com
Hey Axel,

Look in conf/log4j.properties.template - you can copy this to
conf/log4j.properties and add your own policies about how much noise
to make.
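
For example, a copied conf/log4j.properties might look roughly like the fragment below. This is a sketch that assumes the template defines a root logger writing to a console appender; the exact contents of the template may differ between Spark versions, so treat the appender name and pattern as placeholders and only adjust the level in your own copy.

```
# conf/log4j.properties (copied from conf/log4j.properties.template)
# Assumption: the template logs at INFO to an appender named "console".
# Raising the root level to WARN hides the INFO job and task messages.
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

Using ERROR instead of WARN would silence even more output, at the cost of hiding warnings that may matter when timing jobs.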

- Patrick