Setting up Java socketTextStream vs socketStream


sbur...@gmail.com

May 21, 2013, 6:28:47 PM
to spark...@googlegroups.com
Can anyone tell me how to correctly set up a Java socketStream or rawSocketStream?

Everything works fine using a socketTextStream, but if I switch to a socketStream or rawSocketStream, I get this exception:

java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
at spark.streaming.DStreamGraph.validate(DStreamGraph.scala:126)
at spark.streaming.StreamingContext.validate(StreamingContext.scala:437)
at spark.streaming.StreamingContext.start(StreamingContext.scala:454)
at spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:568)

Creating a JavaDStream via socketTextStream works fine, but I need to receive raw bytes.
Why can my socketTextStream read data from the TCP server, but not a socketStream?




Tathagata Das

May 21, 2013, 7:20:44 PM
to spark...@googlegroups.com

Can you give the complete Spark Streaming program with socketStream that you are trying out?


sbur...@gmail.com

May 21, 2013, 9:17:09 PM
to spark...@googlegroups.com
Gladly, thanks for asking.
The class below reproduces the problem.  The line comments show what works and what does not.

#1 -- Works fine
#2 -- Fails with "No output streams registered"
#3 -- Can you show me how to create a simple conversion Function?




import spark.api.java.function.Function;
import spark.storage.StorageLevel;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class ResultsStreamProcessor {

    static final String SPARK_HOME = "/usr/local/spark-0.7.0";
    static final String SPARK_MASTER = "mesos://10.XXX.XX.XXX:5050";
    static final String JAR_FILE = "file:///<path-to-jar>/spark-demos.jar";
    static final String TCP_SERVER = "10.XXX.XX.XXX";
    static final int TCP_PORT = 65000;
    static final long DURATION = 5000;

    public static void main(String[] args) {

        JavaStreamingContext ssc = new JavaStreamingContext(
                SPARK_MASTER, "ResultsStreamProcessor",
                new Duration(DURATION), SPARK_HOME, JAR_FILE);

// #1
        // THIS WORKS (socketTextStream)
        // JavaDStream<String> data = ssc.socketTextStream(TCP_SERVER, TCP_PORT);
        // data.print();


// #2
        // THIS DOES NOT WORK
        // Below is the code which prompted my post.  (rawSocketStream)
        // It finds "No output streams registered" in DStreamGraph.scala (line 126)
        // Fyi... I was trying to make it connect to a netcat server.
        JavaDStream<Object> data = ssc.rawSocketStream(TCP_SERVER, TCP_PORT, StorageLevel.MEMORY_AND_DISK());


// #3
        // How do I define the conversion function needed to create the socketStream?
        // First, I just want to read text I type into a netcat server terminal.
        // Later, I want to convert incoming HBase rows from a coprocessor.
        // Function converter = null;    
        // JavaDStream<Object> data = ssc.socketStream(TCP_SERVER, TCP_PORT, converter, StorageLevel.MEMORY_AND_DISK());
        // data.print();

        System.out.println("JavaDStream.count = " + data.count());
        ssc.start();

Tathagata Das

May 21, 2013, 9:36:56 PM
to spark...@googlegroups.com
Comments inline. Hope this helps! 


Just adding data.print() would make it work. Essentially, the problem is that unless you tell Spark Streaming what to do with the data received (in this case, "print it"), it does not know what to do. That's why it's complaining. It needs at least one output operation (e.g., print, foreach, saveAsHadoopFiles) before starting the streaming computation.
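
For example, a minimal sketch of the fix for your #2 case (same constants and stream as in your program):

        // #2, fixed: register at least one output operation before start(),
        // so the DStreamGraph has something to execute.
        JavaDStream<Object> data = ssc.rawSocketStream(TCP_SERVER, TCP_PORT, StorageLevel.MEMORY_AND_DISK());
        data.print();  // the output operation that the validation was missing
        ssc.start();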
 


// #3
        // How do I define the conversion function needed to create the socketStream?
        // First, I just want to read text I type into a netcat server terminal.
        // Later, I want to convert incoming HBase rows from a coprocessor.
        // Function converter = null;    
        // JavaDStream<Object> data = ssc.socketStream(TCP_SERVER, TCP_PORT, converter, StorageLevel.MEMORY_AND_DISK());
        // data.print();

If you want to receive the records in "\n" delimited text format, then textSocketStream should be fine, and you can use DStream.map to transform text into the necessary record structure.
If you want to receive the records as some custom byte stream, then you need to specify the converter function, which takes an InputStream as input and returns a java.lang.Iterable<T> (T is the object type of the DStream). next() on this Iterable's iterator should continuously return objects received on the input stream and parsed into Java objects of type T. To get an idea of how to implement this, you could take a look at the byte-stream-to-"\n"-delimited-text converter function that socketTextStream uses, SocketReceiver.bytesToLines(). It's Scala code, but you can probably get a general idea of how to do it from there.
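
To make the shape concrete, here is a deliberately simplified, eager sketch; it buffers everything until the sender closes the socket, so it only illustrates the types involved, not a production receiver:

        // Eager sketch of the converter shape: InputStream -> Iterable<String>.
        // It blocks until EOF, so it is a type illustration only; a real
        // converter should yield records lazily as they arrive.
        Function<InputStream, Iterable<String>> converter =
                new Function<InputStream, Iterable<String>>() {
            public Iterable<String> call(InputStream in) throws Exception {
                BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
                List<String> lines = new ArrayList<String>();
                String line;
                while ((line = br.readLine()) != null) {
                    lines.add(line);
                }
                return lines;
            }
        };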

sbur...@gmail.com

May 21, 2013, 9:57:42 PM
to spark...@googlegroups.com
Yes, this helps a lot.  

#2 works, and the Scala code shows me how to implement converter functions (#3).

I will be reading very large byte[] streams of structured records that are not '\n'-delimited, so I won't use socketTextStream.

Thanks again!
-s

Tathagata Das

May 22, 2013, 5:21:14 AM
to spark...@googlegroups.com

Great. Let me know if you need any more help in figuring out the converter.

sbur...@gmail.com

May 22, 2013, 5:40:38 PM
to spark...@googlegroups.com
Hi again TD,

Your pointer to the Scala Iterable function helped me out a lot.  Could you look at the spark-driver
code below (the conversion Function) and tell me what might be incorrect, could be improved, or made
more robust?

It seems to work -- I see the function's trace statements in the worker logs, and I can leave the
tcp-server running while I repeatedly restart the spark-driver, reconnect, and process incoming data.
No errors in any spark-logs or from the driver... It's got to be too good to be true!
What are some of the pitfalls I need to watch out for?

And what is the expected behaviour when a function throws a RuntimeException?  Does the Spark worker crash?

Thanks,
-s



import spark.api.java.function.Function;
import spark.storage.StorageLevel;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.NoSuchElementException;

@SuppressWarnings("serial")
public class ResultsStreamProcessor {

    static final String SPARK_HOME = "/usr/local/spark-0.7.0";
    static final String SPARK_MASTER = "mesos://XXX.XXX.XXX.234:5050";
    static final String JAR_FILE = "file:///<path-to-your-jar>/spark-demos.jar";
    static final String TCP_SERVER = "XXX.XXX.XXX.234";
    static final int TCP_PORT = 65002;
    static final long DURATION = 5000;

    public static void main(String[] args) {

        JavaStreamingContext ssc = new JavaStreamingContext(
                SPARK_MASTER, "ResultsStreamProcessor",
                new Duration(DURATION), SPARK_HOME, JAR_FILE);

        Function<InputStream, Iterable<String>> converterExample3 = new Function<InputStream, Iterable<String>>() {
            public Iterable<String> call(InputStream is) {

                System.out.println(">>> Entered Function converterExample3");

                class IterableClass implements Iterator<String>, Iterable<String> {
                    private InputStream is;
                    private BufferedReader br = null;
                    private boolean hasNext = false;
                    private boolean done = false;
                    private String nextValue = null;

                    IterableClass(InputStream is) {
                        this.is = is;
                        initStreamReader();
                    }

                    private void initStreamReader() {
                        try {
                            this.br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                            System.out.println(">>> Function converterExample3 initStreamReader: " + br);
                            System.out.println(">>> Function converterExample3 initStreamReader.ready?: " + br.ready());
                        } catch (IOException e) {
                            e.printStackTrace();
                            throw new RuntimeException(e);
                        }
                    }

                    private void getNext() {
                        try {
                            System.out.println(">>> Function converterExample3 getNext");
                            nextValue = br.readLine();
                            if (nextValue == null) {
                                done = true;
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw new RuntimeException(e);
                        }
                        hasNext = true;
                    }

                    @Override
                    public boolean hasNext() {
                        System.out.println(">>> Function converterExample3 hasNext");
                        if (!done) {
                            if (!hasNext) {
                                next();
                                if (done) {
                                    if (br != null) {
                                        try {
                                            br.close();
                                        } catch (IOException e) {
                                            e.printStackTrace();
                                            throw new RuntimeException(e);
                                        }
                                    }
                                }
                            }
                        }
                        return !done;
                    }

                    @Override
                    public String next() {
                        System.out.println(">>> Function converterExample3 next");
                        if (done) {
                            throw new NoSuchElementException("End of InputStream");
                        }
                        if (!hasNext) {
                            getNext();
                        }
                        hasNext = false;
                        return nextValue;
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public Iterator<String> iterator() {
                        return this;
                    }
                }

                IterableClass myIterable = new IterableClass(is);
                System.out.println(">>> Function converterExample3 returning myIterable");
                return myIterable;
            }
        };

        JavaDStream<String> data = ssc.socketStream(TCP_SERVER, TCP_PORT, converterExample3, StorageLevel.MEMORY_AND_DISK());

        data.cache();
        data.print();
        System.out.println("JavaDStream = " + data);
        System.out.println("JavaDStream.count = " + data.count());
        ssc.start();
    }
}



Tathagata Das

May 22, 2013, 7:50:22 PM
to spark...@googlegroups.com
Responses inline. 


On Wed, May 22, 2013 at 2:40 PM, <sbur...@gmail.com> wrote:
Hi again TD,

Your pointer to the Scala Iterable function helped me out a lot.  Could you look at the spark-driver
code below (the conversion Function) and tell me what might be incorrect, could be improved, or made
more robust?

This code seems to be a one-to-one translation of the Scala code in SocketReceiver.bytesToLines(), and it looks correct to me; since you are getting the data, it's working as it should. That's good! However, your use case was different from '\n'-delimited lines, wasn't it? So I am guessing you will write the necessary type of conversion next, now that this simple conversion is working. Is that your plan?
 
It seems to work -- I see the function's trace statements in the worker logs, and I can leave the
tcp-server running while I repeatedly restart the spark-driver, reconnect, and process incoming data.
No errors in any spark-logs or from the driver... It's got to be too good to be true!
What are some of the pitfalls I need to watch out for?
Some pitfalls you need to watch out for -
(i) When you are doing your complex conversion, make sure you don't copy data internally multiple times.
(ii) (Answering your RuntimeException question.) Figure out which exceptions you want to handle gracefully, as sketched below. If you throw an exception that is not caught, then the receiver's Spark task should crash and be restarted in a different location.
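
As a sketch of (ii) applied to the getNext() in your text converter (treating java.net.SocketException as a graceful end-of-stream here is just an assumption; tune it to your protocol):

        // Sketch: classify exceptions in getNext(). SocketException as the
        // "graceful" case is an assumption, not a rule.
        private void getNext() {
            try {
                nextValue = br.readLine();      // may block until data arrives
                if (nextValue == null) {
                    done = true;                // clean EOF: stop iterating quietly
                }
            } catch (SocketException e) {
                done = true;                    // peer reset: also treat as end of stream
            } catch (IOException e) {
                throw new RuntimeException(e);  // unexpected I/O error: let the receiver task fail and restart
            }
            hasNext = true;
        }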

sbur...@gmail.com

May 28, 2013, 2:04:22 PM
to spark...@googlegroups.com
Hi Tathagata,

Thanks for all your help.
Following up on our conversation about input-stream conversion functions used in Spark socket streams...

My simple InputStream -> String conversion function was a one-to-one translation of the SocketReceiver.bytesToLines()
Scala method you pointed out to me.  Now I have a slightly more complicated conversion function which
I need some help with.

My spark driver (below) reads Tuple2<byte[], byte[]> objects from the socket.
Not a List<Tuple2>, just Tuple2 objects serialized like so, in my test data generator:

   ...
   ByteArrayOutputStream b = new ByteArrayOutputStream(1024 * 1024 * 512);
   ObjectOutputStream o = new ObjectOutputStream(b);
   for (Tuple2<byte[], byte[]> tuple : list) {
       o.writeObject(tuple);
   }
   o.flush();
   final byte[] bytes = b.toByteArray();
   ByteBuffer buffer = ByteBuffer.wrap(bytes);
   socketChannel.write(buffer);
   ...
 

I verified that a client-socket-channel can be used to read the expected number of Tuple2s, via
the ObjectInputStream readObject() method.  So, no problem with the TCP server.

However, when I use the spark-driver below, the InputStream -> Tuple2
conversion function does iterate through all the Tuples in the InputStream,
which I verified by stepping through the debugger (running spark-master = localhost).
But the generated RDD always contains only half of the Tuple objects I send.
If I send 1000 Tuples, the RDD contains 500.
If I send 10 Tuples, the RDD contains 5.
If I send 5 Tuples, the RDD contains 2.

Very confusing.  The iterable function's getNext() method is called on each of the N Tuples 
I send to the spark-driver, and N values are returned from getNext(), but the
spark RDD always contains only N/2 Tuples.

I am ready to just read objects off the socket using the Java API -- to make sure I get all of them --
and then use ctx.parallelize(...) to create RDDs instead of using the spark-streaming API, roughly as
sketched below.  But I am sure I am using the API incorrectly; it would be better to use the
streaming API if I can make it work for me.
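
For reference, the fallback I have in mind looks roughly like this (readBatchFromSocket() is a hypothetical helper around a plain ObjectInputStream, and ctx is an ordinary JavaSparkContext):

   // Fallback sketch: read a batch of tuples off the socket with plain
   // java.io, then hand it to Spark; readBatchFromSocket() is hypothetical.
   List<Tuple2<byte[], byte[]>> batch = readBatchFromSocket();
   JavaRDD<Tuple2<byte[], byte[]>> rdd = ctx.parallelize(batch);
   System.out.println("batch rdd.count = " + rdd.count());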

Can you spot the problem in the code below?
Thanks,
Stan
   



import scala.Tuple2;
import spark.api.java.JavaRDD;
import spark.api.java.function.Function;
import spark.storage.StorageLevel;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

import java.io.*;
import java.util.*;

public class SparkDriver implements Serializable
{
  public static final String SPARK_MASTER = "mesos://XX.XXX.XX.XXX:5050";
  public static final String SPARK_HOME = "/usr/local/spark-0.7.0";
  public static final String JAR_FILE = "file:///<path-to-jar>/spark-demos.jar";
  public static final String TCP_SERVER = "XX.XXX.XX.234";
  private static final int TCP_PORT = 51000;

  public static void main(String[] args) throws Exception
  {
    SparkDriver driver = new SparkDriver();
    driver.stream();
  }

  public void stream()
  {
    JavaStreamingContext ssc = new JavaStreamingContext(SPARK_MASTER, "SparkDriver",
        new Duration(5000), SPARK_HOME, JAR_FILE);

    JavaDStream<Tuple2<byte[], byte[]>> data = ssc.socketStream(TCP_SERVER, TCP_PORT, streamToTupleConverter, StorageLevel.MEMORY_AND_DISK());

    data.persist();
    data.print();
    data.foreach(collectTuplesFunc);

    ssc.start();   
  }

  Function<JavaRDD<Tuple2<byte[], byte[]>>, Void> collectTuplesFunc = new Function<JavaRDD<Tuple2<byte[], byte[]>>, Void>()
  {
    @Override
    public Void call(JavaRDD<Tuple2<byte[], byte[]>> rdd) throws Exception
    {
      System.out.printf("rdd.count=%d\n", rdd.count());

      List<Tuple2<byte[], byte[]>> tuples = rdd.collect();
//      for (Tuple2<byte[], byte[]> t : tuples) {
//        System.out.printf("Collected Tuple -> %s  \n", t);
//      }
      System.out.printf("Collected %d tuples.\n", tuples.size());
      System.out.flush();
      return null;
    }
  };

  Function<InputStream, Iterable<Tuple2<byte[], byte[]>>> streamToTupleConverter = new Function<InputStream, Iterable<Tuple2<byte[], byte[]>>>()
  {

    public Iterable<Tuple2<byte[], byte[]>> call(InputStream is)
    {

      class IterableClass implements Iterator<Tuple2<byte[], byte[]>>, Iterable<Tuple2<byte[], byte[]>>, Serializable
      {
        private InputStream is;

        private ObjectInputStream os = null;

        private boolean hasNext = false;
        private boolean done = false;
        private Tuple2<byte[], byte[]> nextValue = null;

        IterableClass(InputStream is)
        {
          this.is = is;
          try {
            this.os = new ObjectInputStream(is);
          } catch (EOFException e) {
            e.printStackTrace();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }

        private void getNext()
        {
          try {
            try {
              nextValue = (Tuple2<byte[], byte[]>) os.readObject();
            } catch (EOFException e) {
              done = true;
              e.printStackTrace();
            }
            if (nextValue == null) {
              done = true;
            }
          } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
          }
          hasNext = true;
        }

        @Override
        public boolean hasNext()
        {
          if (!done) {
            if (!hasNext) {
              next();
              if (done) {
                System.out.println(">>> streamToTupleConverter hasNext DONE");
                if (os != null) {
                  try {
                    System.out.println(">>> streamToTupleConverter hasNext DONE CLOSING OS");
                    os.close();
                  } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                  }
                }
              }
            }
          }
          return !done;
        }

        @Override
        public Tuple2<byte[], byte[]> next()
        {
          if (done) {
            throw new NoSuchElementException("End of InputStream");
          }
          if (!hasNext) {
            getNext();
          }
          hasNext = false;
          return nextValue;
        }

        @Override
        public void remove()
        {
          throw new UnsupportedOperationException();
        }

        @Override
        public Iterator<Tuple2<byte[], byte[]>> iterator()
        {
          return this;
        }
      }

      IterableClass myIterable = new IterableClass(is);
      System.out.println(">>> streamToTupleConverter returning myIterable");

      return myIterable;
    }
  };

}



Tathagata Das

May 29, 2013, 6:46:29 PM
to spark...@googlegroups.com
From a casual glance it's hard to see why this would not work. Also, it's weird that you get exactly half. Have you tried sending simple objects, say Integers, instead of Tuple2<byte[], byte[]>, through the same server-receiver setup? If you still don't seem to get all the values, it may be easier to analyze by actually seeing which numbers you are getting and which you aren't.

If that doesn't give any hints either, can you give me the Java files of the SparkDriver and the standalone TCP server sending Integers, so that I can try running it myself and see what's going wrong?
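
For instance, a variant of your generator loop with Integers swapped in (assuming the same ObjectOutputStream o as in your snippet):

   // Hypothetical debug variant: write sequential Integers so any
   // missing values are easy to spot on the receiving side.
   for (int i = 0; i < 1000; i++) {
       o.writeObject(Integer.valueOf(i));
   }
   o.flush();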

TD

sbur...@gmail.com

May 29, 2013, 7:50:43 PM
to spark...@googlegroups.com
I will try this and get back to you in 1-2 days.
Thanks,
S

sbur...@gmail.com

Jun 3, 2013, 4:39:40 PM
to spark...@googlegroups.com
Thanks again for all your help, TD.

Here was the bug in my Scala -> Java stream converter function:

@Override
public boolean hasNext() {
    System.out.println(">>> Function converterExample3 hasNext");
    if (!done) {
        if (!hasNext) {
            getNext(); // My bug --> I was calling 'next()' instead of 'getNext()'
            if (done) {
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }
    return !done;
}

It works now.  In hindsight, the exact N/2 behavior makes sense: the stray next() inside hasNext() read a record and discarded its return value, and the caller's own next() then read a fresh one, so every other Tuple was silently dropped.
-Stan

Tathagata Das

Jun 4, 2013, 3:19:04 AM
to spark...@googlegroups.com
Hah! I remember looking at that line, but I missed the bug as well. I am glad you got it working!

TD