sc <- sparkR.init(master = "spark://<master>:7077",
                  sparkEnvir = list(spark.executor.memory = "1g"))

Ok, I've done some more exploration. Sorry for the dense post, but hopefully the detail will be helpful.
Here's the setting. I'm using Spark 1.1.0. The original dataset is about 12 GB as flat text, less on HDFS in bz2 format. It's split into 22 .bz2 files on HDFS; the files are not wildly different in size, but there is some variation across them. I'm using a 12-worker EC2 cluster, where each worker has 2 cores and 7.6 GB of RAM. I'm launching SparkR as:

MASTER=`cat /root/spark-ec2/cluster-url` SPARK_MEM=6g ./sparkR

I then use textFile to read the data in and run filterRDD and map/reduceByKey operations, roughly as sketched below.
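(The HDFS path and the per-record functions here are placeholders rather than my actual code; this is just a sketch against the SparkR RDD API as I understand it.)

  # `sc` is created by the sparkR shell; path and functions are illustrative only
  lines  <- textFile(sc, "hdfs:///data/pages/*.bz2")          # the 22 .bz2 parts
  cache(lines)                                                # cache the raw text
  kept   <- filterRDD(lines, function(line) nchar(line) > 0)
  pairs  <- lapply(kept, function(line) list(substr(line, 1, 1), 1L))
  counts <- reduceByKey(pairs, function(a, b) a + b, 24L)     # 24 partitions = 12 workers x 2 cores
  head(collect(counts))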
When I cache the original dataset, I see about 1.3 GB of memory per partition, and most of the nodes use up to 2.7 GB since they have 2 partitions assigned (a couple of nodes hold only one of the 22 partitions, so they use less memory).
First, a couple of background questions:
1) I'd expect 6 GB available on each worker, and therefore 6 GB per executor. However, both in SparkR and when I invoke PySpark without setting SPARK_MEM, the UI shows each executor with 3 GB available. Can anyone clarify where the other 3 GB goes?
2) In the UI, I see 12 executors and often 2 active tasks per executor, which makes sense. But Shivaram suggested using fewer executors. When I change SPARK_WORKER_CORES, I still see 12 executors, just with only 1 active task per executor; that makes sense to me given that each node has 2 cores, but it doesn't seem to be what Shivaram was suggesting.
Now some results from trying with and without caching and reducing
SPARK_WORKER_CORES to 1.
Case A: My original attempt, with SPARK_WORKER_CORES=2 and with caching. This causes the executor failure mentioned in the previous thread. I am able to cache the initial dataset, but subsequent operations lead to the failure. I note that a couple of nodes have 2.7 GB or so used by the RDD partitions they hold, so an out-of-memory problem seems plausible given that only 3 GB is available rather than 6 GB. That said, shouldn't Spark/SparkR be robust to this and automatically fall back to disk instead of failing catastrophically?
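If it is purely a storage-memory issue, one thing I could try is asking for a disk-spilling storage level explicitly instead of cache(). A sketch, assuming this SparkR version exposes persist() with a storage-level name as in the Scala API:

  # assumption: persist() accepts a storage level name; spill to disk
  # rather than failing when a partition doesn't fit in memory
  lines <- textFile(sc, "hdfs:///data/pages/*.bz2")
  persist(lines, "MEMORY_AND_DISK")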
As noted, PySpark handles this dataset and analogous operations just fine with caching. When I look at the memory used by the RDD under PySpark, the maximum on any given node is 500 MB, which presumably explains why PySpark is fine. But are the underlying data structures different for PySpark and SparkR? I thought both held the data in memory in Java on the back end.
Case B: Same as Case A (SPARK_WORKER_CORES=2) but without caching. Things seem fine, which seems to implicate memory as the issue.
Case C: Use 24 workers, SPARK_WORKER_CORES=2, and caching. This works fine.
Case D: Use 12 workers, SPARK_WORKER_CORES=2, but cache only after repartitioning so that the RDD is balanced across nodes. This works fine, with 1.1 GB of memory used per executor for the cached RDD (see the sketch below).
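Concretely, Case D was along these lines (a sketch; the path is a placeholder, and I'm assuming repartition(rdd, numPartitions) as in the RDD API):

  # balance partitions across the cluster before caching
  lines    <- textFile(sc, "hdfs:///data/pages/*.bz2")
  balanced <- repartition(lines, 24L)   # even out the skewed .bz2 splits
  cache(balanced)                       # cache only the balanced RDD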
Case E: Change SPARK_WORKER_CORES to 1 and use caching. Note that a given node still uses up to 2.7 GB for the two partitions of the RDD assigned to it, and I still see 12 executors, each with 3 GB available. Now I'm not seeing all the lost executors as in Case A, but I do see two lost tasks on one of the executors (after one task succeeded). The error message is as follows. I let it run for an hour, but in that time it wasn't able to complete.
Error in unserialize(readBin(con, raw(), as.integer(dataLen), endian = "big")) : String '{{Use mdy dates|date=January 2013}}[[File:OC - CU - PY.png|thumb|Occupy Harvard logo]][[File:Occupyharvardtents.jpg|thumb|Tents and banner at Occupy Harvard]][[File:Locked Gates Occupy Harvard.jpg|thumb|Administrators locked the gates to Harvard Yard for several weeks]]'''Occupy Harvard''' was a student demonstration at [[Harvard University]] identifying itself with the global [[Occupy Movement]]. It sought to create a forum for discussing economic inequality at Harvard, in the United States, and throughout the world. It criticized Harvard's influence on global economic policy and its involvement with the American financial sector. It also supported wage campaigns by Harvard workers and a divestment demand initiated by [[Hotel Workers Rising]].Facing resistance from administration and police, the group established an encampment in [[Harvard Yard]] after a march on November 9, 2011. Immediately after this march, the gates to Harvard Yard were locked and only peo Calls: source ... withVisible -> eval -> eval -> <Anonymous> -> unserialize
"Caused by: java.io.EOFException"
java.lang.OutOfMemoryError: Requested array size exceeds VM limit (via org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42))

This exception appears /after/ the original error, some 30 seconds later. I have instantiated the Spark context with spark.executor.memory="16g" as an option in both lists, and the executor overview indicates that this setting is more or less correctly transferred (8G are available per executor, and 4G for the driver, which was set to 8G). The node itself has 32 GB of RAM free, so the VM shouldn't be running into constraints either. I'm quite stumped as to why the VM runs out of memory, and why it does so after throwing all kinds of rather cryptic exceptions at the R level.
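For reference, I create the context roughly as below. This is only a sketch: the master value and driver setting are placeholders, and I'm taking "both lists" to mean the sparkEnvir and sparkExecutorEnv arguments of sparkR.init.

  # illustrative only; master and memory values are placeholders
  sc <- sparkR.init(master = "yarn-client",
                    sparkEnvir       = list(spark.executor.memory = "16g",
                                            spark.driver.memory   = "8g"),
                    sparkExecutorEnv = list(spark.executor.memory = "16g"))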
...
Error in if (numBroadcastVars > 0) { : Argument has length 0
I assume this is an error that falls out further down the exception chain, but it might be a point of interest nonetheless. I also get the occasional \0-in-string error in this case. I will try setting R on the workers to English so I can get proper error messages; I have the feeling that the translated messages I currently get don't include as much detail.

Printing partitionedRDD ('An object of class "PipelinedRDD" ...') indicates that the object exists, and the two RDDs have different ids (1 and 135).
...
(the default for spark.executor.cores is 1, and nothing else is configured), while the driver should be using 4 according to the configuration (but I cannot verify this easily). Should I consider reducing the number of driver cores? What could be breaking repartition? Could there be an underlying configuration issue? YARN doesn't appear to be the best environment for Spark(R) from what I've seen, so I could imagine that there are some interactions happening there. If you could give me some pointers on how to get further debug info out of the system, I'd like to narrow the issue down further, but I'm still quite new to the codebase and a bit lost for the moment.

Rick
...
...
embedded nul in string: '106355\tBrassica\t1397901459\t{{taxobox|image = Brassica rapa plant.jpg|image_caption = ''[[Brassica rapa]]''|regnum = [[Plantae]]|unranked_divisio = [[Angiosperms]]|unranked_classis = [[Eudicots]]|unranked_ordo = [[Rosids]]|ordo = [[Brassicales]]|familia = [[Brassicaceae]]|genus = '''''Brassica'''''|subdivision_ranks = Species|subdivision = See text.|}}'''''Brassica''''' ({{IPAc-en|?|b|r|?|s|?|k|?}}) is a genus of plants in the [[Mustard plant|mustard]] family ([[Brassicaceae]]). The members of the genus are informally known as [[cruciferous vegetables]], [[cabbages]], or [[mustard plant]]. Crops from this genus are sometimes called ''cole crops''{{mdash}}derived from the Latin ''caulis'', meaning ''stem or cabbage''.<ref name="Wordnik - caulis"/>Members of brassica commonly used for food include [[cabbage]]\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\...
// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String) {
  val len = value.length
  out.writeInt(len + 1) // For the \0
  out.writeBytes(value)
  out.writeByte(0)
}
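That NOTE is probably the key: writeBytes discards the high byte of every character, and the declared length is a character count rather than a UTF-8 byte count, so non-ASCII strings can come out mangled or mis-sized. The char/byte mismatch is easy to see from plain R (illustration only, not SparkR code):

  s <- "caf\u00e9"              # contains one non-ASCII character
  nchar(s, type = "chars")      # 4 characters
  nchar(s, type = "bytes")      # 5 bytes once encoded as UTF-8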
...