sk8...@gmx.at
unread,Jul 5, 2013, 11:34:46 AM7/5/13Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Sign in to report message
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to spark...@googlegroups.com
I am trying stream some EventObjects into my JavaStreamingContext.
Therefore I initialize a thread with a list of these objects and do something like this in the run:
@Override
public void run() {
ServerSocket serverSocket = null;
Socket writeSocket = null;
try {
serverSocket = new ServerSocket(this.port);
writeSocket = serverSocket.accept();
ObjectOutputStream oos = new ObjectOutputStream(writeSocket.getOutputStream());
for(ArchiveHeaderEvent event : this.ahEvents) {
Thread.sleep(1000);
oos.writeObject(event);;
oos.flush();
System.out.println("event written: " + event);
}
} catch (UnknownHostException e) {
LOGGER.error("exception occured", e);
} catch (IOException e) {
LOGGER.error("exception occured", e);
} catch (InterruptedException e) {
LOGGER.error("exception occured", e);
} finally {
try {
writeSocket.close();
serverSocket.close();
} catch (IOException e) {
LOGGER.error("resource close failed", e);
}
}
}
In my streaming class I am just trying to
JavaDStream<Object> stream = streamingContext.rawSocketStream(hostName, port);
stream.print();
All I am getting is this exception, and empty streams afterwards:
17:29:19.427 [pool-2-thread-3] ERROR s.s.dstream.RawNetworkReceiver - Error receiving data
java.lang.IllegalArgumentException: null
at java.nio.ByteBuffer.allocate(ByteBuffer.java:311) ~[na:1.6.0_30]
at spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:71) ~[spark-streaming_2.9.3-0.7.2.jar:na]
at spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:109) ~[spark-streaming_2.9.3-0.7.2.jar:na]
at spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:140) [spark-streaming_2.9.3-0.7.2.jar:na]
at spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:136) [spark-streaming_2.9.3-0.7.2.jar:na]
at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613) [spark-core_2.9.3-0.7.2.jar:na]
at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613) [spark-core_2.9.3-0.7.2.jar:na]
at spark.scheduler.ResultTask.run(ResultTask.scala:77) [spark-core_2.9.3-0.7.2.jar:na]
at spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:76) [spark-core_2.9.3-0.7.2.jar:na]
at spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:49) [spark-core_2.9.3-0.7.2.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) [na:1.6.0_30]
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_30]
at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_30]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_30]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_30]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_30]
Exception in thread "Thread-28" java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:315)
at spark.streaming.dstream.RawNetworkReceiver$$anon$1.run(RawInputDStream.scala:56)
What am I doing wrong?
The IllegalArgumentException is thrown btw, because some capacity in the byteBuffer is < 0..
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}