C++ spark integration

cearl

Mar 14, 2012, 11:46:26 PM
to Spark Users
Hi,
I am wondering if anyone has done work on using C++ within the
Spark framework. For example, I could imagine doing such a thing by
forking a process, then constructing a closure that would pass data to
the running C/C++ application through a filesystem pipe or some other
mechanism. One can imagine other approaches as well. I am wondering if
there is any data or intuition to argue for one approach over another.

Charles

Matei Zaharia

Mar 15, 2012, 1:10:24 AM
to spark...@googlegroups.com
Hi Charles,

If you want to pass data through an external C++ program, there's already an operation on Spark's distributed datasets (RDDs) called pipe(). You give it a command, and it will launch that process, pass each element of the dataset to its stdin, and return an RDD of strings representing the lines printed to the process's stdout.
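
For example, something like this (an untested sketch; "./my_cxx_filter" is just a made-up name for a C++ program that reads lines on stdin and prints one output line per record, and sc is your SparkContext):

// Stream an RDD of strings through a hypothetical external C++ filter
val input = sc.parallelize(Seq("1 2 3", "4 5 6"))
val results = input.pipe("./my_cxx_filter")   // RDD[String]: the lines the program prints
results.collect().foreach(println)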

I guess the other thing you can do, of course, is to write a native library and access it through JNI. You can do that the same way you'd call it through Java.
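
For example (a rough, untested sketch; "physics" stands in for whatever shared library you build, and the class and method names here are made up):

// Declare a method implemented in C++ and load the hypothetical library that provides it
class NativePhysics {
  @native def simulate(input: Array[Byte]): Array[Byte]   // bound to the C++ implementation at call time
}

object NativePhysics {
  System.loadLibrary("physics")   // expects libphysics.so (or physics.dll) on java.library.path
  def apply(): NativePhysics = new NativePhysics   // construct via apply() so loadLibrary has run first
}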

Matei

cearl

Mar 23, 2012, 12:27:31 PM
to Spark Users
Matei,
Thanks! Hope this thread is not lost...
I'm guessing it is the case that if I send a byte array down the pipe,
I could access it as a char* / void* on the native (C++) side? I should
be seeing soon enough...
Thanks
Charles

Matei Zaharia

Mar 23, 2012, 2:21:52 PM
to spark...@googlegroups.com
That might work, but you could get weird stuff due to character encoding, because Java probably won't like writing out arbitrary bytes. To be safe I would encode them somehow (e.g. base64 encoding), although that's inefficient.

You might also be able to write a version of pipe() / PipedRDD that takes byte arrays instead of strings. Just write them directly to the subprocess's stdin (the stream returned by Process.getOutputStream()). If you'd like to write that and send a pull request on GitHub, I can incorporate it into the code.

Finally, you could also call your C++ code through JNI, but beware that there's some stuff you need to learn if you haven't used JNI before (http://java.sun.com/docs/books/jni/html/jniTOC.html).

Matei

cearl

Mar 23, 2012, 3:06:06 PM
to Spark Users
Matei,
Thanks. I will take your second suggestion.
I had an as-yet-unresolved segfault when trying to use JNI to
integrate a large physics package into Hadoop. My takeaway was to
stay away from JNI for complex code bases.

On Mar 23, 2:21 pm, Matei Zaharia <ma...@eecs.berkeley.edu> wrote:
> That might work, but you could get weird stuff due to character encoding, because Java probably won't like writing out arbitrary bytes. To be safe I would encode them somehow (e.g. base64 encoding), although that's inefficient.
>
> You might also be able to write a version of pipe() / PipedRDD that takes byte arrays instead of strings. Just write them directly to the subprocess's stdin (the stream returned by Process.getOutputStream()). If you'd like to write that and send a pull request on GitHub, I can incorporate it into the code.

Ah, ok. This will be the evening project.
Thanks

cearl

Mar 25, 2012, 12:45:34 PM
to Spark Users
Matei,
Looking at PipedRDD as a starting point, I believe that I can try
either
// Direct write alternative: raw bytes straight to the subprocess's stdin
val out = proc.getOutputStream
for (elem <- parent.iterator(split))
  out.write(elem)
out.close()
or
// Char encoding alternative: base64-encode each record and write it as a text line
// (Base64 here is org.apache.commons.codec.binary.Base64)
val out = new PrintWriter(proc.getOutputStream)
for (elem <- parent.iterator(split))
  out.println(Base64.encodeBase64String(elem))
out.close()

For reading from the subprocess, I suppose that I could either stick
with
Source.fromInputStream(proc.getInputStream).getLines
as long as the process writes strings to stdout, or
again read into an Array[Byte]?
Is that right?
In doing reads from the BytesWritable, I have had to account for the
endianness of the original data, but that now seems to work.
Charles

Matei Zaharia

Mar 25, 2012, 8:52:01 PM
to spark...@googlegroups.com
This sounds good, but if you're going to go with raw bytes, then calling getLines on the stream won't work. You would probably need to define a storage format for the records, such as writing out a 4-byte length header for each record followed by that many bytes of data. You would then need to split the stream along those boundaries in both Java and C++. It's easy in Java/Scala with DataInputStream. I believe that also deals with endianness (DataInputStream always reads big-endian).
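
Something along these lines (untested sketch; proc and parent.iterator(split) are as in your PipedRDD snippet, and the elements are assumed to be Array[Byte]):

// Write length-prefixed records to the subprocess's stdin
val out = new java.io.DataOutputStream(proc.getOutputStream)
for (elem <- parent.iterator(split)) {
  out.writeInt(elem.length)   // 4-byte big-endian length header
  out.write(elem)
}
out.close()

// Read length-prefixed records back from the subprocess's stdout
val in = new java.io.DataInputStream(new java.io.BufferedInputStream(proc.getInputStream))
def readRecord(): Array[Byte] = {
  val len = in.readInt()      // same 4-byte big-endian length
  val buf = new Array[Byte](len)
  in.readFully(buf)
  buf
}

The C++ side just needs to read a 4-byte big-endian length followed by that many bytes, and write its output the same way.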

If you just wanted to go with base64 encoding, you can probably do it on top of the existing API without making a new RDD subclass. Just call a map() to turn your byte arrays into base64-encoded strings, then call pipe(), then map() the results of that.
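
E.g. something like this (untested; assumes commons-codec is on the classpath, byteArrays is your RDD of Array[Byte], and the executable name is made up):

import org.apache.commons.codec.binary.Base64
// Encode records as base64 text lines, pipe them through the external program,
// and decode its (base64) output lines back into byte arrays
val encoded = byteArrays.map(b => Base64.encodeBase64String(b))
val piped = encoded.pipe("./my_cxx_filter")
val decoded = piped.map(s => Base64.decodeBase64(s))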

Matei

cearl

Mar 27, 2012, 12:08:24 PM
to Spark Users
Matei,
Thanks. I ended up going with the second approach just to test things
end to end. This works -- and looks to be something like 2x better
than pipes.
I will return to the first approach shortly...
Charles