Storm v0.8.2, 0MQ v2.1.7: seeing "java.lang.RuntimeException: Should always receive two-part ZMQ messages" under high load


Kiran

Mar 2, 2013, 3:18:23 PM
to storm...@googlegroups.com

3 machine cluster
-----------------
Nimbus and UI on one machine
2 supervisors on the other 2 machines, with 1 worker per machine

Configuration of one of the identical machines
----------------------------------------------
MemTotal:       198452480 kB (189 GB)
Intel(R) Xeon(R) CPU E5-2620 0 @ 2.00GHz
cache size : 15360 KB
cpu cores : 6

Software/Platform details:
----------------------------------
Java.version=1.6.0_24 (Sun Microsystems Inc.)
Red Hat Enterprise Linux Server release 6.1 (Santiago)
0MQ : v2.1.7
jzmq : From nathan's github branch
Storm: v0.8.2

We installed 0MQ and jzmq from source and used the Storm binary. For testing we used a slightly modified word count topology (attached), with a parallelism of 25 for both the split sentence bolt and the word count bolt. We kept tweaking the spout parameters. The details and our observations are below.

1) 2 spout threads emitting indefinitely (once per nextTuple call) with no sleep

This fails with the following exception after running for about 2 minutes.

java.lang.RuntimeException: Should always receive two-part ZMQ messages
    at backtype.storm.messaging.zmq.ZMQConnection.recv_with_flags(zmq.clj:36)
    at backtype.storm.messaging.loader$launch_receive_thread_BANG_$fn__1629$fn__1630.invoke(loader.clj:38)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)


This was followed by another exception:

java.io.FileNotFoundException: File '/home/arinto/local-var/storm/supervisor/stormdist/fed_word_count_two-4-1362153302/stormconf.ser' does not exist
    at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:137)
    at org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1135)
    at backtype.storm.config$read_supervisor_storm_conf.invoke(config.clj:138)
    at backtype.storm.daemon.supervisor$fn__4793.invoke(supervisor.clj:414)
    at clojure.lang.MultiFn.invoke(MultiFn.java:177)
    at backtype.storm.daemon.supervisor$sync_processes$iter__4684__4688$fn__4689.invoke(supervisor.clj:249)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$dorun.invoke(core.clj:2725)
    at clojure.core$doall.invoke(core.clj:2741)
    at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:603)
    at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:722)
2013-03-01 16:10:09 util [INFO] Halting process: ("Error when processing an event")

Other observations: latency was very high right from the start, and the workers died once the exceptions were thrown.


2) 2 spout threads emitting indefinitely (once per nextTuple call) with a 2-millisecond sleep (once per nextTuple call)
This ran without issues for about 10 minutes, after which we killed the topology.
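For anyone comparing the two runs, here is a minimal sketch of what a per-call sleep does to the emit rate. It is plain Java and does not use Storm's API. `ThrottledSpoutSketch` and its methods are hypothetical names chosen for illustration, and the attached RSSpout.java may well look different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spout; it does not use Storm's API.
final class ThrottledSpoutSketch {
    private final long sleepMillis;
    private final List<String> emitted = new ArrayList<String>();

    ThrottledSpoutSketch(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    /** Mimics one nextTuple call: emit one sentence, then pause. */
    void nextTuple() {
        emitted.add("the quick brown fox");
        if (sleepMillis > 0) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }
    }

    int emittedCount() {
        return emitted.size();
    }

    /** Upper bound on tuples per second across all spout threads, one tuple per call. */
    static long maxTuplesPerSecond(int spoutThreads, long sleepMillis) {
        if (sleepMillis <= 0) {
            throw new IllegalArgumentException("without a sleep there is no fixed ceiling");
        }
        return spoutThreads * (1000L / sleepMillis);
    }

    public static void main(String[] args) {
        ThrottledSpoutSketch spout = new ThrottledSpoutSketch(2);
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();
        }
        System.out.println("emitted " + spout.emittedCount() + " tuples");
        System.out.println("ceiling for 2 threads: " + maxTuplesPerSecond(2, 2) + " tuples/s");
    }
}
```

With 2 threads and a 2 ms sleep the ceiling works out to 2 x 500 = 1000 tuples per second, whereas run 1 has no such ceiling. That is consistent with the failure being load-related, though it does not by itself show which component is at fault.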

We're blocked on this issue. Any pointers on which component causes it and how to resolve it would be really appreciated.

Thanks,
Kiran
RSSpout.java
WCTopo.java

Nathan Marz

Mar 2, 2013, 4:33:03 PM
to storm...@googlegroups.com
I haven't seen this. Two things:

1. Can you try with 0mq 2.1.4?
2. Are you 100% certain that nothing else is talking to the worker ports (like a network ping or anything like that)?
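To illustrate why stray traffic on a worker port could matter, here is a small self-contained model of a two-part receive check. It assumes, going only by the exception text, that each message arrives as two frames: a task id with a "more parts follow" flag, then the payload. `TwoPartFraming`, `Frame` and `TaskMessage` are hypothetical names. This is not Storm's or jzmq's actual code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of incoming frames; not Storm's or jzmq's real API.
final class TwoPartFraming {
    /** One frame plus the "more parts follow" flag. */
    static final class Frame {
        final byte[] data;
        final boolean more;

        Frame(byte[] data, boolean more) {
            this.data = data;
            this.more = more;
        }
    }

    /** A decoded message: destination task id plus payload. */
    static final class TaskMessage {
        final short task;
        final byte[] payload;

        TaskMessage(short task, byte[] payload) {
            this.task = task;
            this.payload = payload;
        }
    }

    /** Sender side (assumed layout): frame 1 is the task id, frame 2 the payload. */
    static void send(Deque<Frame> wire, short task, byte[] payload) {
        wire.add(new Frame(ByteBuffer.allocate(2).putShort(task).array(), true));
        wire.add(new Frame(payload, false));
    }

    /** Receiver side: returns null when nothing is queued, throws on a lone frame. */
    static TaskMessage recv(Deque<Frame> wire) {
        Frame first = wire.poll();
        if (first == null) {
            return null;
        }
        if (!first.more) {
            throw new RuntimeException("Should always receive two-part ZMQ messages");
        }
        Frame second = wire.remove();
        return new TaskMessage(ByteBuffer.wrap(first.data).getShort(), second.data);
    }

    public static void main(String[] args) {
        Deque<Frame> wire = new ArrayDeque<Frame>();
        send(wire, (short) 7, new byte[] {1, 2, 3});
        TaskMessage ok = recv(wire);
        System.out.println("task " + ok.task + ", payload bytes " + ok.payload.length);

        // A single-frame message, e.g. from some other client on the port.
        wire.add(new Frame(new byte[] {42}, false));
        try {
            recv(wire);
        } catch (RuntimeException e) {
            System.out.println("receiver failed: " + e.getMessage());
        }
    }
}
```

In this model any sender that does not follow the two-frame convention trips the check, which is why it is worth ruling out other clients on the worker ports.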

--
You received this message because you are subscribed to the Google Groups "storm-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
Twitter: @nathanmarz
http://nathanmarz.com

Kiran

Mar 5, 2013, 1:54:31 PM
to storm...@googlegroups.com, nat...@nathanmarz.com
Thanks a lot for the response, Nathan. We will keep you posted about our findings.

Thanks,
Kiran

Aleksandar Vitorovic

Apr 16, 2013, 8:29:23 AM
to storm...@googlegroups.com, nat...@nathanmarz.com
Hi Kiran,

I am facing exactly the same issue:
2013-04-15 22:13:05 util [ERROR] Async loop died!

java.lang.RuntimeException: Should always receive two-part ZMQ messages
    at backtype.storm.messaging.zmq.ZMQConnection.recv_with_flags(zmq.clj:36)
    at backtype.storm.messaging.protocol$recv.invoke(protocol.clj:18)
    at backtype.storm.messaging.loader$launch_receive_thread_BANG_$fn__1629$fn__1630.invoke(loader.clj:29)

    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:722)
2013-04-15 22:13:05 util [INFO] Halting process:
...
2013-04-15 22:13:57 util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
    at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)

    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
    at plan_runner.thetajoin.dynamic.storm_component.ThetaJoinerDynamicAdvisedEpochs.execute(Unknown Source)
    at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
    at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
    ... 6 more
2013-04-15 22:13:57 executor [ERROR]


I have a Solaris cluster, and here is what I've installed:
1) Storm 0.8.2, which I compiled from source
2) ZeroMQ 2.1.7
3) jzmq from Nathan's repository

Did you try ZeroMQ 2.1.4? What was the outcome?

Thanks!

Kiran Nagasubramanian

Apr 16, 2013, 11:55:47 AM
to storm...@googlegroups.com, nat...@nathanmarz.com
A couple of issues including this one have been fixed in v0.8.3-wip3. Nathan would be able to confirm.

Thanks,
Kiran


Aleksandar Vitorovic

Apr 17, 2013, 2:13:45 AM
to storm...@googlegroups.com, nat...@nathanmarz.com
Thank you very much for the answer. I tried this, but now I have run into another problem :( Namely, my topology freezes. The only exceptions are in the ZooKeeper logs:

2013-04-16 20:38:51,074 - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@634] - EndOfStreamException: Unable to read additional data from client sessionid 0x13e1423c9d20069, likely client has closed socket
...
2013-04-16 20:38:09,551 - INFO  [WorkerReceiver Thread:FastLeaderElection@496] - Notification: 1 (n.leader), 0 (n.zxid), 1 (n.round), LOOKING (n.state), 1 (n.sid), LOOKING (my state)

2013-04-16 20:38:09,563 - WARN  [WorkerSender Thread:QuorumCnxManager@384] - Cannot open channel to 2 at election address icdatasrv7-priv/172.16.100.70:3888
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701)
    at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:115)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:371)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(QuorumCnxManager.java:340)
    at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(FastLeaderElection.java:360)
    at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.run(FastLeaderElection.java:333)
    at java.lang.Thread.run(Thread.java:722)
2013-04-16 20:38:09,772 - WARN  [QuorumPeer:/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@384] - Cannot open channel to 2 at election address icdatasrv7-priv/172.16.100.70:3888
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701)
    at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:115)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:371)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:404)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:688)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:622)
2013-04-16 20:38:09,773 - INFO  [QuorumPeer:/0:0:0:0:0:0:0:0:2181:FastLeaderElection@697] - Notification time out: 400
2013-04-16 20:38:09,836 - INFO  [WorkerReceiver Thread:FastLeaderElection@496] - Notification: 2 (n.leader), 0 (n.zxid), 1 (n.round), LOOKING (n.state), 2 (n.sid), LOOKING (my state)

Since the issue seems to appear only when zmq.hwm is used, I will revert to 0.8.2 and not use this option.

@Nathan: Another post says that jzmq.jar in the $STORM/lib directory should be updated. Is this mandatory if we are using the recommended versions of ZeroMQ and jzmq? I guess not, even if we recompiled the libraries from source.

Aleksandar Vitorovic

Apr 18, 2013, 6:11:21 AM
to storm...@googlegroups.com, nat...@nathanmarz.com
Hi Kiran,

In order to make this work, did you have to:
1) Downgrade to 0mq 2.1.4?
2) Upload your jzmq.jar into the $STORM/lib directory?

Thanks!

Kiran Nagasubramanian

Apr 18, 2013, 1:23:23 PM
to storm...@googlegroups.com
We didn't make any of those ZMQ changes. We used v2.1.7 as is. By the way, I can see that you have a ZooKeeper quorum set up. Do you have three or more nodes in the ZK cluster, and did you make sure that a majority of them are up (as per ZK's spec)?

Thanks,
Kiran


Aleksandar Vitorovic

Apr 20, 2013, 5:06:37 PM
to storm...@googlegroups.com
Hi Kiran,

Thanks for the answer. I am using 2 ZooKeeper servers, as I don't want slowdowns due to replication. The cluster is dedicated, so failures should be infrequent, and if they happen I can restart the topology. Performance is the priority. By the way, as far as I understand the ZooKeeper specification, it's not mandatory to use 3 servers; it only says there is no fault tolerance if fewer than 3 servers are used.
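As a quick check on the majority question, here is a small sketch of the quorum arithmetic for a majority-based ensemble. `QuorumMath` and its method names are hypothetical and are not part of ZooKeeper.

```java
// Hypothetical helper for majority-quorum arithmetic; not part of ZooKeeper.
final class QuorumMath {
    /** Smallest number of servers that forms a strict majority of the ensemble. */
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** How many servers can be down while a majority is still up. */
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: majority " + majority(n)
                    + ", tolerated failures " + toleratedFailures(n));
        }
    }
}
```

For a 2-server ensemble the majority is 2, so both servers have to be up and able to reach each other. A 3-server ensemble also needs 2 servers up, but can lose one.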