Streaming Objects with rawSocketStream fails using a ObjectOutputStream?

44 views
Skip to first unread message

sk8...@gmx.at

unread,
Jul 5, 2013, 11:34:46 AM7/5/13
to spark...@googlegroups.com
I am trying stream some EventObjects into my JavaStreamingContext.

Therefore I initialize a thread with a list of these objects and do something like this in the run:

@Override
    public void run() {
        ServerSocket serverSocket = null;
        Socket writeSocket = null;
        try {
            serverSocket = new ServerSocket(this.port);
            writeSocket = serverSocket.accept();
            ObjectOutputStream oos = new ObjectOutputStream(writeSocket.getOutputStream());
           
            for(ArchiveHeaderEvent event : this.ahEvents) {
                Thread.sleep(1000);
                oos.writeObject(event);;
                oos.flush();
                System.out.println("event written: " + event);
            }
        } catch (UnknownHostException e) {
            LOGGER.error("exception occured", e);
        } catch (IOException e) {
            LOGGER.error("exception occured", e);
        } catch (InterruptedException e) {
            LOGGER.error("exception occured", e);
        } finally {
            try {
                writeSocket.close();
                serverSocket.close();
            } catch (IOException e) {
                LOGGER.error("resource close failed", e);
            }
        }
    }

In my streaming class I am just trying to
JavaDStream<Object> stream = streamingContext.rawSocketStream(hostName, port);
stream.print();


All I am getting is this exception, and empty streams afterwards:
17:29:19.427 [pool-2-thread-3] ERROR s.s.dstream.RawNetworkReceiver - Error receiving data
java.lang.IllegalArgumentException: null
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:311) ~[na:1.6.0_30]
    at spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:71) ~[spark-streaming_2.9.3-0.7.2.jar:na]
    at spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:109) ~[spark-streaming_2.9.3-0.7.2.jar:na]
    at spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:140) [spark-streaming_2.9.3-0.7.2.jar:na]
    at spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:136) [spark-streaming_2.9.3-0.7.2.jar:na]
    at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613) [spark-core_2.9.3-0.7.2.jar:na]
    at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613) [spark-core_2.9.3-0.7.2.jar:na]
    at spark.scheduler.ResultTask.run(ResultTask.scala:77) [spark-core_2.9.3-0.7.2.jar:na]
    at spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:76) [spark-core_2.9.3-0.7.2.jar:na]
    at spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:49) [spark-core_2.9.3-0.7.2.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) [na:1.6.0_30]
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_30]
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_30]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_30]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_30]
    at java.lang.Thread.run(Thread.java:662) [na:1.6.0_30]
Exception in thread "Thread-28" java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
    at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:315)
    at spark.streaming.dstream.RawNetworkReceiver$$anon$1.run(RawInputDStream.scala:56)

What am I doing wrong?
The IllegalArgumentException is thrown btw, because some capacity in the byteBuffer is < 0..
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}


Reply all
Reply to author
Forward
Message has been deleted
0 new messages