lost tasks and executors problem

paci...@berkeley.edu

Jan 7, 2015, 8:26:51 PM
to spark...@googlegroups.com
I'm doing some basic manipulation with a 10 Gb dataset split into 22 CSV files and loaded onto HDFS. The dataset has about 123 million observations.

When reading the data in as a text file, caching, doing some basic filtering, etc., I'm often getting Java messages about lost tasks and lost executors. This seems to happen in a non-systematic way - I can rerun the same code after restarting R/SparkR and the error may disappear or reappear. In some cases I seem to get a valid result despite this, and in other cases I don't.

This is using Spark 1.1.0 on a 12-worker cluster on EC2.  I'm starting R/SparkR with:
MASTER=`cat /root/spark-ec2/cluster-url` SPARK_MEM=6g ./sparkR

I can provide more details and make a complete example if that would be helpful.

Here's an example of what is happening (note that the warning about "no such file or directory" is spurious; I'll mention that in a different post). In the example below I got a valid result, but in other cases I don't, and I get error messages like:
Job aborted due to stage failure: Task 16 in stage 3.0 failed 4 times, most recent failure: Lost task 16.3 in stage 3.0 (TID 119, ip-172-31-18-58.us-west-2.compute.internal): ExecutorLostFailure (executor lost)


=============================================================

> hdfs_master <- system("cat /root/ephemeral-hdfs/conf/masters", intern = TRUE)
> lines <- cache(textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline")))
Warning message:
In normalizePath(path) :
  path[1]="hdfs://ec2-54-186-247-68.us-west-2.compute.amazonaws.com:9000/data/airline": No such file or directory
> count(lines)
15/01/08 00:24:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/08 00:24:42 WARN LoadSnappy: Snappy native library not loaded
15/01/08 00:24:43 INFO FileInputFormat: Total input paths to process : 22
15/01/08 00:27:02 WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 2, ip-172-31-18-62.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:27:50 WARN TaskSetManager: Lost task 5.1 in stage 0.0 (TID 22, ip-172-31-18-62.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:27:56 ERROR TaskSchedulerImpl: Lost executor 2 on ip-172-31-18-62.us-west-2.compute.internal: remote Akka client disassociated
15/01/08 00:27:56 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-62.us-west-2.compute.internal,40651) not found
15/01/08 00:27:56 WARN TaskSetManager: Lost task 20.0 in stage 0.0 (TID 14, ip-172-31-18-62.us-west-2.compute.internal): ExecutorLostFailure (executor lost)
15/01/08 00:28:21 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 0, ip-172-31-18-60.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:28:40 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 10, ip-172-31-18-63.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:28:51 WARN TaskSetManager: Lost task 17.0 in stage 0.0 (TID 17, ip-172-31-18-67.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:30:02 WARN TaskSetManager: Lost task 18.0 in stage 0.0 (TID 12, ip-172-31-18-60.us-west-2.compute.internal): java.lang.NullPointerException:
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/01/08 00:31:19 ERROR TaskSchedulerImpl: Lost executor 10 on ip-172-31-18-60.us-west-2.compute.internal: remote Akka client disassociated



Shivaram Venkataraman

Jan 14, 2015, 1:01:11 AM
to paci...@berkeley.edu, spark...@googlegroups.com
Sorry for the delay in replying -- but looking at the log, I think the problem is that the Spark executors are somehow getting killed. Lines that look like "Lost executor 2" etc. indicate that. I wonder if this is happening because it's running out of memory and the OS is somehow killing these processes. Could you try it without the cache and see if that helps?
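
For example, something like your earlier snippet with the cache() call dropped:

lines <- textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline"))
count(lines)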

Thanks
Shivaram 


Chris Paciorek

Jan 14, 2015, 11:25:56 AM
to Shivaram Venkataraman, spark...@googlegroups.com
Sure, happy to try. By "without the cache", I assume you mean not caching any of the RDDs, not that I need to change some Spark setting/configuration.

I'm assuming that each node is making use of 6 Gb of RAM because of my specification of SPARK_MEM when I invoke SparkR, so I'd be surprised if it were running out of memory given the number of worker nodes (12) and the size of the full dataset (10 Gb). But perhaps I'm not understanding something about the inner workings.

chris

Shivaram Venkataraman

Jan 14, 2015, 4:35:14 PM
to Chris Paciorek, spark...@googlegroups.com
Yes, I meant not caching the RDDs. The part about memory usage is just a guess I had given that executors are dying. 

The other thing is you could run fewer executors per machine and see if that helps. Basically each executor forks off an R process, and each R process uses some memory while doing the computation. Thus, using fewer executors might help in some cases.

You can change this by setting `SPARK_WORKER_CORES` in /root/spark/conf/spark-env.sh, but the file will need to be copied to all the machines and Spark restarted (i.e. /root/spark-ec2/copy-dir /root/spark/conf; /root/spark/sbin/stop-all.sh; /root/spark/sbin/start-all.sh).

Thanks
Shivaram

hipho...@gmail.com

Jan 14, 2015, 9:26:09 PM
to spark...@googlegroups.com, shivaram.v...@gmail.com
Hi, Chris,

1. You can go to the Spark web UI and take a look at the worker stderr logs to see if there is any error reported by the R worker.
2. SPARK_MEM is used to set the driver memory, not the executor memory. As documented in the SparkR readme, to set the executor memory:
sc <- sparkR.init(master="spark://<master>:7077",
                  sparkEnvir=list(spark.executor.memory="1g"))


On Thursday, January 15, 2015 at 12:25:56 AM UTC+8, Chris Paciorek wrote:

Chris Paciorek

Jan 16, 2015, 3:02:41 PM
to Shivaram Venkataraman, spark...@googlegroups.com
Ok, I've done some more exploration. Sorry for the dense post, but
hopefully the detail will be helpful.

Here's the setting. I'm using Spark 1.1.0. Original dataset is about
12 Gb as flat text, less when on the HDFS in bz2 format. It's split
into 22 .bz2 files on the HDFS and while not wildly different in size,
there is some variation in size across the files. I'm using a
12-worker EC2 cluster, each worker has 2 cores and 7.6 Gb RAM. I'm
calling SparkR as:
MASTER=`cat /root/spark-ec2/cluster-url` SPARK_MEM=6g ./sparkR

I then do sc.textFile to read in and do filterRDD and map/reduceByKey
operations.

When I do cache the original dataset, I see about 1.3 Gb memory per
partition, and most of the nodes use up to 2.7 Gb as they have 2
partitions assigned (a couple nodes only have one of the 22
partitions, so use less memory).

First a couple background questions:

1) I'd expect 6 Gb available on each worker, so 6 Gb per executor.
However both in SparkR and when I invoke PySpark without setting
SPARK_MEM, I see in the UI that each executor has 3 Gb available. Can
anyone clarify where the other 3 Gb is?

2) In the UI, I see 12 executors and often see 2 active tasks per
executor, which makes sense. But Shivaram suggested using fewer
executors. When I change SPARK_WORKER_CORES, I still see 12 executors,
but only 1 active task per executor, which makes sense to me given
each node has 2 cores, but seems not to be what Shivaram was
suggesting.

Now some results from trying with and without caching and reducing
SPARK_WORKER_CORES to 1.

Case A: My original attempt, with SPARK_WORKER_CORES=2 and with
caching. This causes the executor failure mentioned in the previous
thread. I am able to cache the initial dataset but then subsequent
operations lead to the failure. I note that there are a couple nodes
with 2.7 Gb or so used by the RDD partitions on those nodes. So an out
of memory problem seems plausible given the availability of only 3 Gb
rather than 6 Gb. That said, shouldn't Spark/SparkR be robust to this
and automatically fall back to disk instead of failing
catastrophically?

As noted, PySpark handles this dataset and analogous operations just
fine with caching.
When I look at memory used by the RDD for PySpark, the maximum on any
given node is 500 Mb.

So this presumably explains why PySpark is fine. But are the
underlying data structures different for PySpark and SparkR? I thought
both had the data in memory in Java on the back end?

Case B: Do with SPARK_WORKER_CORES=2 but without caching. Things seem
fine, which seems to implicate memory as the issue.

Case C: Use 24 workers, SPARK_WORKER_CORES=2 and caching. This works fine.

Case D: Use 12 workers, SPARK_WORKER_CORES=2, but only cache after
repartitioning so that the RDD is balanced across nodes. This works
fine, with 1.1 Gb of memory used per executor for the cached RDD.
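
For reference, the Case D pattern looks roughly like this (just a sketch -- the partition count here is illustrative, not necessarily the value I used):

lines <- textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline"))
linesBalanced <- cache(repartition(lines, numPartitions = as.integer(24)))
count(linesBalanced)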

Case E: Change SPARK_WORKER_CORES to 1 and use caching. Note that a
given node still uses up to 2.7 Gb for the two partitions of the RDD
assigned to it, and I still see 12 executors, each having 3 Gb
available. Now I'm not seeing all the lost executors as in Case A, but
I do see two lost tasks on one of the executors (after one task
succeeded). The error message is as follows. I let it go for an hour,
but in that time it wasn't able to complete.


> routeCount <- collect(reduceByKey(map(lines, createKeyValue), "+", 1L))
15/01/15 22:14:57 WARN TaskSetManager: Lost task 20.0 in stage 3.0
(TID 63, ip-172-31-23-101.us-west-2.compute.internal):
java.lang.NullPointerException:
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:30)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

15/01/15 22:30:31 WARN TaskSetManager: Lost task 20.1 in stage 3.0
(TID 66, ip-172-31-23-101.us-west-2.compute.internal):
java.lang.NullPointerException:
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:41)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:30)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


Any thoughts appreciated.

chris

On Wed, Jan 14, 2015 at 1:35 PM, Shivaram Venkataraman

Shivaram Venkataraman

Jan 23, 2015, 7:34:18 PM
to Chris Paciorek, spark...@googlegroups.com
Chris

Apologies for the long delay in getting back on this. I was out at a retreat and sick for a bit and didn't get a chance to go through the detailed email. Replies inline.


On Fri, Jan 16, 2015 at 12:02 PM, Chris Paciorek <paci...@stat.berkeley.edu> wrote:
Ok, I've done some more exploration. Sorry for the dense post, but
hopefully the detail will be helpful.

Here's the setting. I'm using Spark 1.1.0. Original dataset is about
12 Gb as flat text, less when on the HDFS in bz2 format. It's split
into 22 .bz2 files on the HDFS and while not wildly different in size,
there is some variation in size across the files. I'm using a
12-worker EC2 cluster, each worker has 2 cores and 7.6 Gb RAM.  I'm
calling SparkR as:
MASTER=`cat /root/spark-ec2/cluster-url` SPARK_MEM=6g ./sparkR

I then do sc.textFile to read in and do filterRDD and map/reduceByKey
operations.

When I do cache the original dataset, I see about 1.3 Gb memory per
partition, and most of the nodes use up to 2.7 Gb as they have 2
partitions assigned (a couple nodes only have one of the 22
partitions, so use less memory).

First a couple background questions:

1)  I'd expect 6 Gb available on each worker, so 6 Gb per executor.
However both in SparkR and when I invoke PySpark without setting
SPARK_MEM, I see in the UI that each executor has 3 Gb available. Can
anyone clarify where the other 3 Gb is?

6 Gb is the total memory given to the Spark executor. Of that only 60% is used for caching. The rest of the memory is used as scratch space during computation. The UI only shows memory used / available for caching. 
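
A rough worked example, assuming Spark 1.1's defaults (spark.storage.memoryFraction = 0.6 and a safety fraction of 0.9): 6 GB x 0.6 x 0.9 ≈ 3.2 GB usable for cached blocks, which lines up with the ~3 GB per executor shown in the UI.
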
2) In the UI, I see 12 executors and often see 2 active tasks per
executor, which makes sense. But Shivaram suggested using fewer
executors. When I change SPARK_WORKER_CORES, I still see 12 executors,
but only 1 active task per executor, which makes sense to me given
each node has 2 cores, but seems not to be what Shivaram was
suggesting.

So SPARK_WORKER_CORES controls the number of parallel threads used on an executor and thus controls the number of active tasks. If we set it to 1, there will be 1 active task at a time, which is what I was suggesting before.
Now some results from trying with and without caching and reducing
SPARK_WORKER_CORES to 1.

Case A: My original attempt, with SPARK_WORKER_CORES=2 and with
caching. This causes the executor failure mentioned in the previous
thread.  I am able to cache the initial dataset but then subsequent
operations lead to the failure. I note that there are a couple nodes
with 2.7 Gb or so used by the RDD partitions on those nodes. So an out
of memory problem seems plausible given the availability of only 3 Gb
rather than 6 Gb.  That said, shouldn't Spark/SparkR be robust to this
and automatically fall back to disk instead of failing
catastrophically? 
 
SparkR doesn't yet have support for spilling to disk in the middle of a computation.  It assumes that an entire partition of input can be loaded into R and processed.
As noted, PySpark handles this dataset and analogous operations just
fine with caching.
When I look at memory used by the RDD for PySpark, the maximum on any
given node is 500 Mb.

So this presumably explains why PySpark is fine. But are the
underlying data structures different for PySpark and SparkR? I thought
both had the data in memory in Java on the back end?

The data in the backend is stored in a serialized format (Java just sees it as a byte array). The format is different for each language, and in SparkR we use the `serialize` command in R to serialize data. It could very well be that R's format is less efficient than Python's here.

Case B: Do with SPARK_WORKER_CORES=2 but without caching. Things seem
fine, which seems to implicate memory as the issue.

Case C: Use 24 workers, SPARK_WORKER_CORES=2 and caching. This works fine.

Case D: Use 12 workers, SPARK_WORKER_CORES=2, but only cache after
repartitioning so that the RDD is balanced across nodes. This works
fine, with 1.1 Gb of memory used per executor for the cached RDD.

Case E: Change SPARK_WORKER_CORES to 1 and use caching. Note that a
given node still uses up to 2.7 Gb for the two partitions of the RDD
assigned to it, and I still see 12 executors, each having 3 Gb
available. Now I'm not seeing all the lost executors as in Case A, but
I do see two lost tasks on one of the executors (after one task
succeeded). The error message is as follows. I let it go for an hour,
but in that time it wasn't able to complete.

The cause of the lost tasks is a little tricky to debug given this output. The error output from R is currently piped through to stderr on the executor, and that gets lost when printing on the driver. I opened a JIRA to fix this: https://sparkr.atlassian.net/browse/SPARKR-181 (this is a pretty painful bug). Meanwhile, you can see the stderr of a lost task by going to the Spark master web UI (master_hostname:8080) and clicking on the machine name where the task failed.

Thanks
Shivaram   

Chris Paciorek

Jan 29, 2015, 6:21:56 PM
to Shivaram Venkataraman, spark...@googlegroups.com
Hi Shivaram, thanks.

I did a bit of testing on the serialization question. I know very
little about serialization so this might be naive, but...

In R, using serialize() to write a vector of 10 million doubles to a file produces a file of about 190 Mb and takes about 25 seconds on my Linux machine.
In Python, using pickle.dump on a numpy array of 10 million doubles produces a file of about 235 Mb and takes about 10 seconds on my Linux machine.
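
A sketch of the kind of test I mean (not necessarily my exact code; sizes and timings depend on the serialization options and the machine):

x <- rnorm(1e7)                      # 10 million doubles
con <- file("r_ser.bin", open = "wb")
system.time(serialize(x, con))       # serialize to a file connection
close(con)
file.info("r_ser.bin")$size / 2^20   # file size in Mb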

Perhaps the serialization for Python in Spark is done differently and produces a much smaller file? If not, then the much greater memory used by SparkR than by PySpark in my example in this thread still seems strange!


chris

On Fri, Jan 23, 2015 at 4:34 PM, Shivaram Venkataraman

Shivaram Venkataraman

Jan 30, 2015, 2:32:22 AM
to Chris Paciorek, spark...@googlegroups.com
Hi Chris

Those are certainly interesting numbers -- looks like in the local test R's serialization is more compact (but a bit slower).
There are some differences between PySpark and SparkR in terms of serialization and how the data is transferred from the JVM to R/Python. In Python the data is sent in small batches, while in R we serialize the entire partition into a single byte array.

Anyways I might be able to track this down if you have pointers to the code you were using and / or the dataset.

Thanks
Shivaram 

Chris Paciorek

Jan 30, 2015, 1:59:06 PM
to Shivaram Venkataraman, spark...@googlegroups.com
The dataset is 22 .bz2 files contained in this tarball.
http://www.stat.berkeley.edu/share/paciorek/1987-2008.csvs.tgz

Here are some of the basic things I've been doing in my SparkR code.

hdfs_master <- system("cat /root/ephemeral-hdfs/conf/masters", intern = TRUE)
lines <- cache(textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline")))
count(lines)
take(lines, 5)

sfo_lines = filterRDD(lines, function(line) strsplit(line, ",")[[1]][17] == "SFO")
count(sfo_lines)

createKeyValue <- function(line) {
  vals = strsplit(line, ",")[[1]]
  return(list(paste(vals[17], vals[18], sep = '+'), 1))
}

routeCount <- collect(reduceByKey(map(lines, createKeyValue), "+", 1L))
routeCount2 <- countByKey(map(lines, createKeyValue))

Also, once I start up the EC2 cluster and am logged in, here is how I
set up SparkR:

echo '#!/bin/bash' > /usr/bin/realpath
echo 'readlink -e $1' >> /usr/bin/realpath
chmod guo+x /usr/bin/realpath

cd /root
wget http://cran.cnr.berkeley.edu/src/contrib/rJava_0.9-6.tar.gz
/root/spark-ec2/copy-dir rJava_0.9-6.tar.gz
R CMD javareconf
tar xvzf rJava_0.9-6.tar.gz; R CMD INSTALL rJava
/root/spark-ec2/copy-dir /usr/bin/realpath
/root/spark/sbin/slaves.sh R CMD javareconf
/root/spark/sbin/slaves.sh R CMD INSTALL ~/rJava_0.9-6.tar.gz

cd /root
git clone https://github.com/amplab-extras/SparkR-pkg.git
cd SparkR-pkg
SPARK_VERSION=1.1.0 ./install-dev.sh
/root/spark-ec2/copy-dir /root/SparkR-pkg

# read data into HDFS:
mkdir /mnt/airline
cd /mnt/airline
wget http://www.stat.berkeley.edu/share/paciorek/1987-2008.csvs.tgz
tar -xvzf 1987-2008.csvs.tgz
export PATH=$PATH:/root/ephemeral-hdfs/bin/
hadoop fs -mkdir /data/airline
hadoop fs -copyFromLocal /mnt/airline/*bz2 /data/airline
hadoop fs -ls /data/airline


# start SparkR
cd /root/SparkR-pkg
MASTER=`cat /root/spark-ec2/cluster-url` SPARK_MEM=6g ./sparkR


-chris

On Thu, Jan 29, 2015 at 11:32 PM, Shivaram Venkataraman

Shivaram Venkataraman

Feb 1, 2015, 1:43:58 AM
to Chris Paciorek, spark...@googlegroups.com
Chris

Thanks a lot for posting the data and the code -- I used this to do a bunch of debugging and profiling today (but this was on a cluster with more memory). I've opened a PR with some fixes I made at https://github.com/amplab-extras/SparkR-pkg/pull/147 

1. The cache on textFile works a little differently in SparkR vs. PySpark. In SparkR we cache the JavaRDD with records encoded as Java strings, while PySpark caches data serialized by Python. The reason this is important is that strings in Java take 2 bytes per character vs. 1 byte in most other encodings. So one workaround in your case is to cache an R-encoded RDD by explicitly calling `lapply`, i.e. you can do "lines <- cache(lapply(textFile(sc, <filepath>), identity))" -- this halves the cached dataset size from what I observed (i.e. each partition is down from 1G to 500 MB).

2. I also found that in this case calling `strsplit` is slow and you probably don't want to do that every time. So another optimization could be to do `strsplit` before cache.
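
For example, a sketch of what I mean, reusing the field indices from your createKeyValue (with <filepath> as before):

splitLines <- cache(lapply(textFile(sc, "<filepath>"), function(line) strsplit(line, ",")[[1]]))
routeCount <- collect(reduceByKey(
  map(splitLines, function(vals) list(paste(vals[17], vals[18], sep = "+"), 1)),
  "+", 1L))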

3. Finally we've recently merged a PR that removes the dependency on rJava. So you can skip that step during the installation process. I'll update the SparkR wiki soon.

Shivaram 

Chris Paciorek

Apr 9, 2015, 8:10:53 PM
to Shivaram Venkataraman, spark...@googlegroups.com
I'm finally getting back to this and trying out Shivaram's idea of
caching with lapply.

This causes lost executors, so I thought I would report the problem. The setup is the same as discussed earlier in this thread back in January, including the airline dataset.

Here's what I am doing in SparkR:


Welcome to SparkR!
Spark context is available as sc
> hdfs_master <- system("cat /root/ephemeral-hdfs/conf/masters", intern = TRUE)
> lines <- cache(lapply(textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline")), identity))
> system.time(count(lines))


And here's the error. Note that if I do the cache() without the
lapply() this succeeds.


15/04/08 22:32:39 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/04/08 22:32:39 WARN LoadSnappy: Snappy native library not loaded
15/04/08 22:32:39 INFO FileInputFormat: Total input paths to process : 22
15/04/08 22:33:26 WARN TaskSetManager: Lost task 10.0 in stage 0.0
(TID 16, ip-172-31-1-137.us-west-2.compute.internal):
org.apache.spark.SparkException: R computation failed with
Error in unserialize(readBin(con, raw(), as.integer(dataLen), endian
= "big")) :
embedded nul in string:
'1987,10,8,4\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
Calls: source ... withVisible -> eval -> eval -> <Anonymous> -> unserialize
Execution halted
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:39)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/04/08 22:33:31 ERROR ConnectionManager: Corresponding
SendingConnection to
ConnectionManagerId(ip-172-31-1-137.us-west-2.compute.internal,37492)
not found
15/04/08 22:33:31 ERROR TaskSchedulerImpl: Lost executor 6 on
ip-172-31-1-137.us-west-2.compute.internal: remote Akka client
disassociated
15/04/08 22:33:31 WARN TaskSetManager: Lost task 10.1 in stage 0.0
(TID 22, ip-172-31-1-137.us-west-2.compute.internal):
ExecutorLostFailure (executor lost)
15/04/08 22:34:08 WARN TaskSetManager: Lost task 0.1 in stage 0.0 (TID
23, ip-172-31-1-132.us-west-2.compute.internal):
org.apache.spark.SparkException: R computation failed with
Error in unserialize(readBin(con, raw(), as.integer(dataLen), endian
= "big")) :
embedded nul in string:
'1987,10,8,4\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
Calls: source ... withVisible -> eval -> eval -> <Anonymous> -> unserialize
Execution halted
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/04/08 22:34:09 WARN TaskSetManager: Lost task 0.2 in stage 0.0 (TID
25, ip-172-31-1-132.us-west-2.compute.internal):
org.apache.spark.SparkException: R computation failed with

On Sat, Jan 31, 2015 at 10:43 PM, Shivaram Venkataraman

Shivaram Venkataraman

Apr 9, 2015, 8:55:14 PM
to Chris Paciorek, spark...@googlegroups.com
Hmm, that's weird. I can try to debug this when I find some time, as I still have the dataset. Could you try `toString` instead of identity in the lapply and see if that works? Basically, at some point we are introducing some nul bytes into this string and then the parsing seems to fail.

Also, could you let me know which git commit the package was built with?

Thanks
Shivaram

Chris Paciorek

Apr 9, 2015, 10:53:55 PM
to Shivaram Venkataraman, spark...@googlegroups.com
hi Shivaram:

This works:
> hdfs_master <- system("cat /root/ephemeral-hdfs/conf/masters", intern = TRUE)
> lines <- textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline"))
> lines2 <- lapply(lines, toString)
> count(lines2)

However, if I then try to cache lines2, I get the error I noted in the
previous post:

> lines3 <- cache(lines2)
> count(lines3)
15/04/10 02:41:22 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID
35, ip-172-31-4-146.us-west-2.compute.internal):
org.apache.spark.SparkException: R computation failed with
Error in unserialize(readBin(con, raw(), as.integer(dataLen), endian
= "big")) :
embedded nul in string:
'1987,10,8,4\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
Calls: source ... withVisible -> eval -> eval -> <Anonymous> -> unserialize
Execution halted
edu.berkeley.cs.amplab.sparkr.BaseRRDD.compute(RRDD.scala:87)


As far as the git commit goes, here's what git log shows as the most recent one:
[root@ip-172-31-6-23 SparkR-pkg]$ git log
commit b64301200f47083505ef13871e783a31af143917
Merge: 2dbcf38 8391e18
Author: Shivaram Venkataraman <shivaram.v...@gmail.com>
Date: Thu Apr 9 14:44:25 2015 -0700

Merge pull request #245 from hqzizania/upstream

Add Rd files for sampleByKey() of [SPARKR-163] and sumRDD() of [SPARKR-92]

On Thu, Apr 9, 2015 at 5:54 PM, Shivaram Venkataraman

Chris Paciorek

Apr 22, 2015, 6:38:19 PM
to Shivaram Venkataraman, spark...@googlegroups.com
One more follow-up on this.

I tried to use repartition with the same airline dataset used earlier in this thread, and I got the same error as in my previous message.

Here's the code I ran:

hdfs_master <- system("cat /root/ephemeral-hdfs/conf/masters", intern = TRUE)
lines <- textFile(sc, paste0("hdfs://", hdfs_master, ":9000/data/airline"))
linesPartitioned <- repartition(lines, numPartitions = as.integer(96))
count(linesPartitioned)

Note that the same error occurs when I use a much smaller version of the dataset, just the first three of the 22 original files. This smaller dataset is around 1 Gb.

-chris


On Thu, Apr 9, 2015 at 7:53 PM, Chris Paciorek

Rick Moritz

Jun 1, 2015, 12:03:04 PM
to spark...@googlegroups.com, shivaram.v...@gmail.com
While using the Amp-camp SparkR tutorial [http://ampcamp.berkeley.edu/5/exercises/sparkr.html] to validate my SparkR installation (yarn-client mode), I ran into the same issue, where a count on a cached RDD would fail, but the same RDD without caching would count correctly.
I did not run into the \0 (nul) issues, but instead into:

Error in unserialize(readBin(con, raw(), as.integer(dataLen), endian = "big")) : 
  String '{{Use mdy dates|date=January 2013}}[[File:OC - CU - PY.png|thumb|Occupy Harvard logo]][[File:Occupyharvardtents.jpg|thumb|Tents and banner at Occupy Harvard]][[File:Locked Gates Occupy Harvard.jpg|thumb|Administrators locked the gates to Harvard Yard for several weeks]]'''Occupy Harvard''' was a student demonstration at [[Harvard University]] identifying itself with the global [[Occupy Movement]]. It sought to create a forum for discussing economic inequality at Harvard, in the United States, and throughout the world. It criticized Harvard's influence on global economic policy and its involvement with the American financial sector. It also supported wage campaigns by Harvard workers and a divestment demand initiated by [[Hotel Workers Rising]].Facing resistance from administration and police, the group established an encampment in [[Harvard Yard]] after a march on November 9, 2011. Immediately after this march, the gates to Harvard Yard were locked and only peo
Calls: source ... withVisible -> eval -> eval -> <Anonymous> -> unserialize

in conjunction with
"Caused by: java.io.EOFException"

I will attempt to identify the issue, but some pointers would be much appreciated.

It appears as though there is some memory corruption happening due to caching - could this be due to memory settings for the executors? I have gotten an OOM exception (java.lang.OutOfMemoryError: Requested array size exceeds VM limit, via org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)) /after/ the original error, some 30 seconds later. I have instantiated the Spark context with [spark.executor.memory="16g"] as an option in both lists, and the executor overview indicates that this setting is more or less correctly transferred (8G are available per executor, and 4G for the driver, which was set to 8G).
The node itself has 32GB of RAM free, so the VM shouldn't run into constraints either. I'm quite stumped as to why the VM runs out of memory, and why it does so after throwing all kinds of rather cryptic exceptions at the R level.

I'd be very thankful for any help!

Additional info: this is a version I zip-downloaded from GitHub two weeks ago or so, and it is running inside an RStudio Server.

Shivaram Venkataraman

Jun 1, 2015, 8:28:51 PM
to Rick Moritz, spark...@googlegroups.com
Thanks, Rick, for trying out SparkR. I think the problem is coming up when we try to serialize data from the JVM and pass it on to R, which is why you see it when you call cache. You could try creating more partitions of the data and see if that helps. The reason is that right now we serialize whole partitions and send them to R as one byte array, and very large partitions could lead to the array-length errors you are seeing.
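
For example, something along these lines (a sketch using the repartition call from earlier in this thread; the partition count is just an illustration -- pick something large enough that individual partitions stay small):

linesMany <- cache(repartition(lines, numPartitions = as.integer(300)))
count(linesMany)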

I have been caught up with the 1.4 release, but I also wanted to update this thread with some information on the DataFrame API that is being released in Spark 1.4. With the DataFrame API a bunch of the issues around serializing data to R etc. can be avoided, since most of the data processing stays in the JVM. The latest programming guide http://people.apache.org/~pwendell/spark-nightly/spark-1.4-docs/latest/sparkr.html might provide a good starting point for the DataFrame API.

Chris -- I am also planning on adding a section there soon on how to read in CSV files and I think that should help some of the queries you were running with your dataset as well.
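
As a rough preview of what that will look like (just a sketch against the 1.4 API, assuming the third-party spark-csv package has been added, e.g. via --packages com.databricks:spark-csv_2.10:1.0.3):

sqlContext <- sparkRSQL.init(sc)
df <- read.df(sqlContext, "hdfs://<master>:9000/data/airline", source = "com.databricks.spark.csv")
head(df)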

Shivaram

Rick Moritz

Jun 2, 2015, 4:44:18 AM
to spark...@googlegroups.com, rah...@gmail.com
Thanks for the swift reply, Shivaram.

Much as Chris reports, I run into the same error as he does when repartitioning an RDD using repartition(RDD, int), whether I repartition to the number of executors or to larger numbers (300, 3000).

Sadly, 1.4 is not quite relevant for me yet, as I need to have distribution support by the end of summer to get this solution into production. Even on the dev/test system I'll likely be stuck with Spark 1.2 for a while yet. Of course, I'm running on YARN, so by putting artefacts into HDFS, I suppose I could work around some of these issues, at least on the test systems.

I assume that due to the imminent release of Spark 1.4 -- and in particular the slight change in scope of SparkR -- interest in SparkR for Spark 1.2 is somewhat diminished?

With regard to my issue, note that the dataset from the example weighs in at only 140MB on disk - it should fit many times over into the caches I assigned, even if serialized very inefficiently.

One further error is thrown during repartition which does not appear when trying to count on a cached RDD:
Error in if (numBroadcastVars > 0) { : Argument has length 0
I assume this is an error that falls out further down the exception chain, but it might be a point of interest nonetheless. I also get the occasional \0-string throwing errors in this case. I will attempt to set R on the workers to English so I can get proper error messages - I have the feeling that the translated messages I currently get by default don't include as much detail.

Furthermore, cached RDDs do not show up in the storage portion of the Spark GUI in the way I would expect them to.

I called
partitionedRDD <- repartition(parsedRDD, numPartitions = as.integer(3000))
then cached both parsedRDD and partitionedRDD.
parsedRDD shows up as a value in the Spark UI, partitionedRDD does not. This may be due to the fact that repartitioning throws an exception. Nonetheless:
 partitionedRDD
An object of class "PipelinedRDD"
indicates that the object exists. The two RDDs have different ids (1 and 135).
Additionally, in the Spark GUI both RDDs are stored (e.g. after count()-ing them) as a single RDD, which grew from 3.2 GB to 5.5 GB upon calling the second count, in two partitions. On the executors tab, this memory usage is not reflected - possibly due to the death of the executors?
And, to make matters more interesting, starting another count job (as opposed to refreshing the GUI) resets memory usage to just a few MB per RDD - does the cache get wiped out by the executor deaths? Does it get compacted?

I will attempt to play with the number of cores, which apparently helped in Chris' case, and see if that alleviates the symptoms, but I'd prefer getting to the underlying issue eventually. If you have any more pointers, I'd be glad to hear them.

Thanks again,

Rick

Rick Moritz

Jun 2, 2015, 12:14:54 PM
to spark...@googlegroups.com, rah...@gmail.com
A minor update: I managed to pass LANG="en" to my executors (which didn't like it much and switched to "C" locale...close enough) and got some proper error messages this time around. Whoever had the brilliant idea of truncating errors in R in the process of translation should be ashamed of themselves.
The result of this is that indeed I get "embedded nul in string" errors for the tsv_wiki dataset, just as Chris reported, and thus can confirm (t)his issue.
In addition, since I'm using YARN, modifying SPARK_WORKER_CORES is not an option; instead I could reduce the number of executors from 3 (one per machine), but this number is already very small, so as a work-around it's not very useful. Each worker should be using just a single core (the default for spark.executor.cores is 1, and nothing else is configured), while the driver should be using 4 according to the configuration (but I cannot verify this easily).
Should I consider reducing the number of driver cores?
What could be breaking repartition?
Could there be an underlying configuration issue? YARN doesn't appear to be the best environment for Spark(R) from what I've seen, so I could imagine that there are some interactions happening there.

If you could give me some pointers how to get further debug info out of the system, I'd like to further narrow down the issue, but I'm still quite new to the codebase, and a bit lost for the moment.

Rick

On Tuesday, June 2, 2015 at 2:28:51 AM UTC+2, Shivaram Venkataraman wrote:

Shivaram Venkataraman

Jun 2, 2015, 2:13:59 PM
to Rick Moritz, spark...@googlegroups.com
Thanks for following up, Rick. SparkR development has moved to the Apache repository, and the focus has also slightly shifted for the 1.4 release to supporting the high-level DataFrame API instead of the low-level RDD API. That's part of the reason I haven't found time to get to the bottom of the issues that Chris ran into.

BTW, what is the dataset and program you are running? Is it just the AMP Camp 5 tutorial?

Thanks
Shivaram


Rick Moritz

Jun 3, 2015, 2:51:32 AM
to spark...@googlegroups.com, rah...@gmail.com
Further study points to the dataset being used as a potential source of trouble: the strings which fail all contain non-ASCII characters (Japanese, Chinese, Egyptian characters) which are sufficiently strange in the present format that they break the `less' command somewhat, introducing "side effects".

I will attempt to remove the offending elements and see if ultimately this is merely an issue with serializing non-ASCII/multi-byte characters. I suspect the AMP Camp tutorial was only tested in local mode, where there's less serialization/deserialization happening.

Rick Moritz

Jun 3, 2015, 8:02:25 AM
to spark...@googlegroups.com, rah...@gmail.com
And it looks like I just got confirmation that double-wide characters are not the issue. Just as in Chris's case, I now get some shorter strings where the error log shows the appended \0 nul bytes. I presume those are caused by deserializing the strings out of the array, probably using a wrong boundary setting and reading beyond the end of the array, or a similar problem.
Since the entries that are hit are usually at similar positions in the files, it is also possible that repartition(2) attempts to cut at certain positions in the file, then tries to re-extend the string to its original length, but the place where it expects to find the end of the string only gives \0's.
Could this be due to a communication issue between nodes? I will attempt to look closer at the repartition code to figure out where this is going wrong.

The following example should illustrate what I mean:

actual file line content:
106355  Brassica        1397901459      {{taxobox|image = Brassica rapa plant.jpg|image_caption = ''[[Brassica rapa]]''|regnum = [[Plantae]]|unranke
d_divisio = [[Angiosperms]]|unranked_classis = [[Eudicots]]|unranked_ordo = [[Rosids]]|ordo = [[Brassicales]]|familia = [[Brassicaceae]]|genus = '''''Brassica'''''|subdivision_ranks = Species|subdivision = See text.|}}'''''Brassica''''' ({{IPAc-en|Ë|b|r|æ|s|ɨ|k|É}}) is a genus of plants in the [[Mustard plant|mustard]] family ([[Brassicaceae]]). The members of the genus are informally known as [[cruciferous vegetables]], [[cabbages]], or [[mustard plant]]. Crops from this genus are sometimes called ''cole crops''{{mdash}}derived from the Latin ''caulis'', meaning ''stem or cabbage''.<ref name="Wordnik - caulis"/>Members of brassica commonly used for food include [[cabbage]], [[cauliflower]], [[broccoli]], [[Brussels sprouts]], and some seeds. The [[genus]] is known for its important [[agricultural]] and [[horticultural]] crops and includes a number of [[weed]]s, both of wild taxa and escapees from cultivation. It counts over 30 wild species and hybrids plus numerous [[cultivar]]s and hybrids of cultivated origin. Most are seasonal plants ([[Annual plant|annual]]s or [[Biennial plant|biennial]]s), but some are small shrubs. ''Brassica'' plants have been the subject of much scientific interest for their agricultural importance. Six particularly species[ctd...]

partial log output following error in unserialize:
embedded nul in string: '106355\tBrassica\t1397901459\t{{taxobox|image = Brassica rapa plant.jpg|image_caption = ''[[Brassica rapa]]''|regnum = [[Plantae]]|unranked_divisio = [[Angiosperms]]|unranked_classis = [[Eudicots]]|unranked_ordo = [[Rosids]]|ordo = [[Brassicales]]|familia = [[Brassicaceae]]|genus = '''''Brassica'''''|subdivision_ranks = Species|subdivision = See text.|}}'''''Brassica''''' ({{IPAc-en|?|b|r|?|s|?|k|?}}) is a genus of plants in the [[Mustard plant|mustard]] family ([[Brassicaceae]]). The members of the genus are informally known as [[cruciferous vegetables]], [[cabbages]], or [[mustard plant]]. Crops from this genus are sometimes called ''cole crops''{{mdash}}derived from the Latin ''caulis'', meaning ''stem or cabbage''.<ref name="Wordnik - caulis"/>Members of brassica commonly used for food include [[cabbage]]\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\

The issue is due to the initial partitioning [it is not affected by the number of partitions I pass to repartition; the number of instances equals the original number of partitions minus 1], so it may even be present in the RDD directly after reading from HDFS.

Another observation: a take(parsedRDD, 1) on the parsedRDD obtained (per the tutorial) by lapply-ing over the data RDD after the repartitioning attempt leads to the OOM situation I described. I'm not sure whether any operation on a PipelinedRDD based off an RDD that had such a basic operation fail on it can be expected to behave well, but I think it may be noteworthy nonetheless.

Hopefully this weekend I'll be able to take a closer look at the source code of repartition and go down the rabbit hole to where the cut/reassemble operation happens and what could cause it to fail.

Could multi-byte chars confuse the partitioning algorithm (because #chars != # bytes and thus length of a string != size of the corresponding byte array)?
The actual number of missing bytes is probably too high for this to be the underlying issue, but something similar appears to be happening, unless it's a locality issue and partitions don't get properly created/read because adjacent partitions are not on the same machine.
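
A toy illustration in plain R of the chars-vs-bytes mismatch I have in mind (the string here is made up):

s <- "Brassic\u00e5"         # contains one multi-byte UTF-8 character
nchar(s)                     # character count: 8
nchar(s, type = "bytes")     # byte count in UTF-8: 9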

Again, thanks for any possible pointers to interesting bits of source code, or potential test scenarios to narrow the issue down.

Best,

Rick

Rick Moritz

Jun 5, 2015, 12:34:18 PM
to Shivaram Venkataraman, spark...@googlegroups.com
That was my starting point. The dataset is still the same, but obviously I started experimenting with different operations on the data, in order to explore the issue and follow your recommendation. Interestingly, caching the base RDD and then counting on it works fine; it's only when I lapply-derive an RDD from it that caching and a subsequent count cause an issue. The base RDD also gets correctly referenced (at least sometimes) with its name in the storage tab of the Spark GUI.
Repartitioning, on the other hand, is guaranteed to fail, from what I can tell so far.

Rick Moritz

Jun 8, 2015, 8:07:17 AM
to spark...@googlegroups.com
Well, I take it back, I just found the following snippet of code:

// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String) {
  val len = value.length
  out.writeInt(len + 1) // For the \0
  out.writeBytes(value)
  out.writeByte(0)
}

I would expect this (lines 275-281 in SerDe) to possibly be an issue when double-wide non-ASCII chars are being used, as is the case in a surprising number of these Wikipedia articles. Chris: could you chime in on whether your dataset might also be "contaminated" with UTF-8 double-wides? This bit looks very suspicious for now.
I suspect that there are several ways in which this could lead to a failure, either by simply miscounting, or by actually mis-parsing the input string and seeing a nul byte where there isn't one.
I will check the new SparkR code to see if there's been a modification from the GitHub repo.
Shivaram: If you think I am on the wrong track, please give me a heads-up -- thanks.

Best,

Rick

Rick Moritz

Jun 8, 2015, 9:33:42 AM
to spark...@googlegroups.com
Okay, from what I've seen in the current Spark sources, serialization of strings has been moved into R code, and UTF-8 double-wides are specifically taken into account.
If my hunch is correct, that should cure this issue for the Spark 1.4.0 R API.
I'll try to work with my actual dataset from now on, which shouldn't contain any double-wides, so at least with regard to that bug I should be fine.

Shivaram Venkataraman

Jun 8, 2015, 1:23:52 PM
to Rick Moritz, spark...@googlegroups.com
Rick

Many thanks for the update. I think you are on the right track in that we did have a limitation of only handling ASCII strings. However, that was fixed in https://github.com/amplab-extras/SparkR-pkg/pull/208 and you should be able to use it from the old SparkR-pkg repository. Of course, this was also ported to Spark 1.4, so the fix should be present there as well.

Thanks
Shivaram


Rick Moritz

Jun 9, 2015, 2:04:26 AM
to Shivaram Venkataraman, spark...@googlegroups.com
I just took a look at that PR, and while the read logic [readStringBytes] was modified to be UTF-8 compatible, it looks like the write logic [writeString] was not - hence the // NOTE bit in SerDe on the "current" master. I will have to check the source again to see whether writeString gets called during cache() or repartition() [likely via writeTypes], but to me this still looks like a likely culprit.

Shivaram Venkataraman

Jun 9, 2015, 2:09:42 AM
to Rick Moritz, spark...@googlegroups.com
Ah, I see - I can file a JIRA to also change the writeString version in the Spark repo. If you want to take a shot at making the change, let me know.

Shivaram