Hi Tathagata,
Thanks for all your help.
Following up on our conversation about input-stream conversion functions used in Spark socket streams...
My simple InputStream -> String conversion function was a one-to-one translation of the SocketReceiver.bytesToLines()
Scala method you pointed out to me. Now I have a slightly more complicated conversion function which
I need some help with.
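For context, my line-based converter was essentially the following (a sketch from memory, so class and variable names here are my own, not from SocketReceiver): wrap the InputStream in a BufferedReader and lazily yield one line per next() call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch of an InputStream -> String converter in the style of
// SocketReceiver.bytesToLines(): read lines lazily until end of stream.
class LinesIterable implements Iterable<String> {
    private final BufferedReader reader;

    LinesIterable(InputStream is) {
        this.reader = new BufferedReader(new InputStreamReader(is));
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            // Read one line ahead so hasNext() can detect end of stream.
            private String nextLine = fetch();

            private String fetch() {
                try {
                    return reader.readLine(); // null at end of stream
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return nextLine != null;
            }

            @Override
            public String next() {
                if (nextLine == null) {
                    throw new NoSuchElementException();
                }
                String line = nextLine;
                nextLine = fetch();
                return line;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
```

That one works fine; each hasNext()/next() pair consumes exactly one line.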
My Spark driver (below) reads Tuple2<byte[], byte[]> objects from the socket.
Not a List<Tuple2>, just individual Tuple2 objects, serialized like so in my test-data generator:
...
ByteArrayOutputStream b = new ByteArrayOutputStream(1024 * 1024 * 512);
ObjectOutputStream o = new ObjectOutputStream(b);
for (Tuple2<byte[], byte[]> tuple : list) {
    o.writeObject(tuple);
}
o.flush();
final byte[] bytes = b.toByteArray();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
socketChannel.write(buffer);
...
I verified that a client socket channel can be used to read the expected number of Tuple2s via
the ObjectInputStream.readObject() method. So there is no problem with the TCP server.
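Here is that round-trip check, reduced to in-memory streams so it is self-contained (same write-N-objects / read-until-EOF pattern as the generator and the client socket; Pair is just a stand-in Serializable class since the check does not depend on scala.Tuple2):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class RoundTripCheck {
    // Stand-in for Tuple2<byte[], byte[]> -- any Serializable object behaves the same.
    static class Pair implements Serializable {
        final byte[] key, value;
        Pair(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Write N objects back-to-back, exactly as the test-data generator does.
    static byte[] serialize(List<Pair> pairs) throws IOException {
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        ObjectOutputStream o = new ObjectOutputStream(b);
        for (Pair p : pairs) {
            o.writeObject(p);
        }
        o.flush();
        return b.toByteArray();
    }

    // Read until EOF, exactly as the client-socket verification does.
    static List<Pair> deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        List<Pair> result = new ArrayList<Pair>();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            while (true) {
                result.add((Pair) in.readObject());
            }
        } catch (EOFException eof) {
            // normal end of stream
        }
        return result;
    }
}
```

Send 1000 in, get 1000 out -- so the serialization format itself is not the problem.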
However, when I use the Spark driver below, the InputStream -> Tuple2
conversion function does iterate through all the tuples in the InputStream,
which I verified by stepping through the debugger (running with spark master = localhost).
But the generated RDD always contains only half of the Tuple objects I send.
If I send Spark 1000 tuples, the RDD contains 500.
If I send Spark 10 tuples, the RDD contains 5.
If I send 5 tuples, the RDD contains 2.
Very confusing. The iterable's getNext() method is called for each of the N tuples
I send to the Spark driver, and N values are returned from getNext(), but the
RDD always contains only N/2 tuples.
I am ready to just read the objects off the socket using the plain Java API -- to make sure I get all of them --
and then use ctx.parallelize(...) to create RDDs instead of using the Spark Streaming API.
But I am sure I am using the API incorrectly; it would be better to use the
streaming API if I can make it work for me.
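The fallback I have in mind is roughly this (a sketch, not something I have wired into the driver yet; the drain loop is plain java.io, and the Spark calls are only shown in a comment since they need the streaming context removed):

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;

class SocketDrain {
    // Drain every serialized object off the stream until EOF. Run against
    // socket.getInputStream(), this guarantees nothing is dropped before
    // the data ever reaches Spark.
    static List<Object> readAllObjects(InputStream is)
            throws IOException, ClassNotFoundException {
        List<Object> objects = new ArrayList<Object>();
        ObjectInputStream in = new ObjectInputStream(is);
        try {
            while (true) {
                objects.add(in.readObject());
            }
        } catch (EOFException eof) {
            // normal end of stream
        }
        return objects;
    }

    // In the driver this would then become something like:
    //   List<Tuple2<byte[], byte[]>> tuples =
    //       (List) readAllObjects(socket.getInputStream());
    //   JavaRDD<Tuple2<byte[], byte[]>> rdd = ctx.parallelize(tuples);
}
```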
Can you spot the problem in the code below?
Thanks,
Stan
import scala.Tuple2;
import spark.api.java.JavaRDD;
import spark.api.java.function.Function;
import spark.storage.StorageLevel;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.io.*;
import java.util.*;
public class SparkDriver implements Serializable
{
    public static final String SPARK_MASTER = "mesos://XX.XXX.XX.XXX:5050";
    public static final String SPARK_HOME = "/usr/local/spark-0.7.0";
    public static final String JAR_FILE = "file:///<path-to-jar>/spark-demos.jar";
    public static final String TCP_SERVER = "XX.XXX.XX.234";
    private static final int TCP_PORT = 51000;

    public static void main(String[] args) throws Exception
    {
        SparkDriver driver = new SparkDriver();
        driver.stream();
    }

    public void stream()
    {
        JavaStreamingContext ssc = new JavaStreamingContext(SPARK_MASTER, "SparkDriver",
                new Duration(5000), SPARK_HOME, JAR_FILE);
        JavaDStream<Tuple2<byte[], byte[]>> data =
                ssc.socketStream(TCP_SERVER, TCP_PORT, streamToTupleConverter, StorageLevel.MEMORY_AND_DISK());
        data.persist();
        data.print();
        data.foreach(collectTuplesFunc);
        ssc.start();
    }
    Function<JavaRDD<Tuple2<byte[], byte[]>>, Void> collectTuplesFunc =
            new Function<JavaRDD<Tuple2<byte[], byte[]>>, Void>()
    {
        @Override
        public Void call(JavaRDD<Tuple2<byte[], byte[]>> rdd) throws Exception
        {
            System.out.printf("rdd.count=%d\n", rdd.count());
            List<Tuple2<byte[], byte[]>> tuples = rdd.collect();
            // for (Tuple2<byte[], byte[]> t : tuples) {
            //     System.out.printf("Collected Tuple -> %s \n", t);
            // }
            System.out.printf("Collected %d tuples.\n", tuples.size());
            System.out.flush();
            return null;
        }
    };
    Function<InputStream, Iterable<Tuple2<byte[], byte[]>>> streamToTupleConverter =
            new Function<InputStream, Iterable<Tuple2<byte[], byte[]>>>()
    {
        public Iterable<Tuple2<byte[], byte[]>> call(InputStream is)
        {
            class IterableClass implements Iterator<Tuple2<byte[], byte[]>>,
                    Iterable<Tuple2<byte[], byte[]>>, Serializable
            {
                private InputStream is;
                private ObjectInputStream os = null;
                private boolean hasNext = false;
                private boolean done = false;
                private Tuple2<byte[], byte[]> nextValue = null;

                IterableClass(InputStream is)
                {
                    try {
                        this.os = new ObjectInputStream(is);
                    } catch (EOFException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                private void getNext()
                {
                    try {
                        try {
                            nextValue = (Tuple2<byte[], byte[]>) os.readObject();
                        } catch (EOFException e) {
                            done = true;
                            e.printStackTrace();
                        }
                        if (nextValue == null) {
                            done = true;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                    hasNext = true;
                }

                @Override
                public boolean hasNext()
                {
                    if (!done) {
                        if (!hasNext) {
                            next();
                            if (done) {
                                System.out.println(">>> streamToTupleConverter hasNext DONE");
                                if (os != null) {
                                    try {
                                        System.out.println(">>> streamToTupleConverter hasNext DONE CLOSING OS");
                                        os.close();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                        throw new RuntimeException(e);
                                    }
                                }
                            }
                        }
                    }
                    return !done;
                }

                @Override
                public Tuple2<byte[], byte[]> next()
                {
                    if (done) {
                        throw new NoSuchElementException("End of InputStream");
                    }
                    if (!hasNext) {
                        getNext();
                    }
                    hasNext = false;
                    return nextValue;
                }

                @Override
                public void remove()
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public Iterator<Tuple2<byte[], byte[]>> iterator()
                {
                    return this;
                }
            }

            IterableClass myIterable = new IterableClass(is);
            System.out.println(">>> streamToTupleConverter returning myIterable");
            return myIterable;
        }
    };
}