Topology allocates only one executor in distributed mode


Oleg Migrin

Jan 23, 2013, 10:09:30 AM
to storm...@googlegroups.com
Hi all, 
We deployed a topology with the following parameters on a 10-node cluster:

Storm 0.8.2:

spout.parallelism = 10 
topology.workers = 20

We expected one bolt executor for each spout executor (1:1).
In practice, the Storm UI shows 10 spouts and only one bolt (!) for this topology, i.e. a single executor chain.

We're stuck on performance issues because of this single executor. Can anyone help?
Also, for some reason the Storm UI reports Num workers, Num executors, Num tasks as (15, 15, 15) for the topology. How can that be?

Thanks for help!

--------- TOPOLOGY CONFIG: ----


[INFO] Worker has topology config {"storm.id" "topology secret name", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "cleanup" {"period" 10}, "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/storm", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "local.nimbus.com", "storm.zookeeper.port" 2282, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["zklocal"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "topology.kryo.decorators" (), "topology.name" "name-prod", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "env" {"name" "production"}, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" nil, "spout" {"parallelism" 10, "bandwidth" 100}, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" {"storm.trident.topology.TransactionAttempt" nil}, "topology.message.timeout.secs" 20, "task.refresh.poll.secs" 10, "topology.workers" 20, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil

Nathan Marz

Jan 23, 2013, 1:46:02 PM
to storm...@googlegroups.com
Can you share a screenshot of the Storm UI that indicates your problem?
--
Twitter: @nathanmarz
http://nathanmarz.com

Oleg Migrin

Jan 23, 2013, 3:08:45 PM
to storm...@googlegroups.com
Nathan, 

Topology screenshot: http://cl.ly/image/1J0a1u2E3l2j

This screenshot was taken after some experiments.
I still can't set the number of executors for the b-0 and b-1 bolts to 10.

Current topology code:
--------------------------------------------------
JmsTupleProducer jmsTuple = new JsonTupleProducer();
JmsSpout jmsSpout = new JmsSpout(config, jmsTuple);

TridentTopology topology = new TridentTopology();
TridentState state = topology.newStream(SPOUT_NAME, jmsSpout)
        .parallelismHint(10)
        .shuffle()
        .each(jmsTuple.getOutputFields(), new MapperFunction(), new Fields(FUNC_SIGNAL_MAPPER, RAW_STORED_ID))
        .each(new Fields(FUNC_SIGNAL_MAPPER, RAW_STORED_ID), new CassandraFunction(), new Fields(FUNC_CASSANDRA_SAVED))
        .parallelismHint(10)
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); // most likely we don't need aggregation
return topology;
-------------------------------------------

So I suspect something is wrong with our Trident topology. Could you help us with the correct setup?
How to set executors for plain Storm bolts is described in the documentation, but there are no docs or examples for the Trident API.

Thanks for help!
-------
Oleg M.

Nathan Marz

Jan 23, 2013, 3:11:09 PM
to storm...@googlegroups.com
The persistentAggregate is a global aggregation, so the "global" part can only run as one executor (though it's doing combining in b-0 to reduce the throughput sent to that single task).
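The combine-then-merge behavior described above can be illustrated with a small plain-Java simulation. This is only a sketch of the data flow, not Trident code: the class and method names (CombineThenMerge, makeBatch, combinePerPartition, mergeGlobally) are made up for illustration, and the batch size of 5000 is an arbitrary assumption. It shows that with 10 partitions the single global stage receives 10 partial counts per batch, regardless of how many tuples the batch holds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "combine per partition, then merge globally".
// All names here are hypothetical; nothing below uses the Storm API.
public class CombineThenMerge {

    // Builds a batch of batchSize tuples spread round-robin over numPartitions.
    public static List<List<String>> makeBatch(int numPartitions, int batchSize) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<String>());
        }
        for (int i = 0; i < batchSize; i++) {
            partitions.get(i % numPartitions).add("tuple-" + i);
        }
        return partitions;
    }

    // Stage 1 (runs in parallel in a real topology, sequentially here):
    // each partition reduces its share of the batch to one partial count.
    public static List<Long> combinePerPartition(List<List<String>> partitions) {
        List<Long> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add((long) partition.size());
        }
        return partials;
    }

    // Stage 2 (the single "global" executor): merge the partial counts.
    public static long mergeGlobally(List<Long> partials) {
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> batch = makeBatch(10, 5000);
        List<Long> partials = combinePerPartition(batch);
        System.out.println("messages sent to the global stage: " + partials.size());
        System.out.println("global count: " + mergeGlobally(partials));
    }
}
```

The global stage only ever sees one value per partition, which is why a single executor there does not have to handle the full tuple rate.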

Oleg Migrin

Jan 23, 2013, 3:40:18 PM
to storm...@googlegroups.com, nat...@nathanmarz.com
OK, but how can we avoid the global aggregation here? I see that the Stream builder
has partitionPersist(..) and persistentAggregate(..) methods, but how can I exclude the aggregation
with this builder?

Or is Trident not appropriate for this purpose?

-----
Oleg M.

Nathan Marz

Jan 23, 2013, 3:45:25 PM
to storm...@googlegroups.com
What do you mean? You're explicitly saying you want to do a global aggregation there. partitionPersist isn't global, but is a completely different operation. 

I also doubt that's your bottleneck since, like I said, it combines before doing the global portion, so the throughput to that bolt will be on the order of 10 messages per batch of processing.

Oleg Migrin

Jan 23, 2013, 3:57:07 PM
to storm...@googlegroups.com, nat...@nathanmarz.com
I don't need any sort of aggregation. I use persistentAggregate here only because it returns a TridentState instance from the builder.
We just need to parallelize our tasks: CassandraFunction is the final bolt, where I persist the result of the calculation.
------
Oleg M. 

Nathan Marz

Jan 23, 2013, 3:57:43 PM
to storm...@googlegroups.com
Sounds like you want to use partitionPersist then
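
For contrast with the global aggregation, here is a rough plain-Java model of partition-local persistence, where every partition writes its own tuples and no single stage collects the results. It is a sketch under assumptions, not Trident code: the class PartitionLocalPersist, the method persistInParallel, and the in-memory map standing in for Cassandra are all hypothetical. In a real topology the per-partition write would presumably live in a StateUpdater passed to partitionPersist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of partition-local persistence.
// All names here are hypothetical; nothing below uses the Storm API.
public class PartitionLocalPersist {

    // Writes each partition's tuples to the store on its own thread and
    // returns the total number of tuples written. There is no global step.
    public static int persistInParallel(List<List<String>> partitions,
                                        final Map<String, String> store) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int p = 0; p < partitions.size(); p++) {
                final int partitionIndex = p;
                final List<String> tuples = partitions.get(p);
                results.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        for (String tuple : tuples) {
                            // Stand-in for a write to the external store.
                            store.put(tuple, "partition-" + partitionIndex);
                        }
                        return tuples.size();
                    }
                }));
            }
            int written = 0;
            for (Future<Integer> result : results) {
                written += result.get();
            }
            return written;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 10 partitions with 3 tuples each, mirroring a parallelism of 10.
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < 10; p++) {
            List<String> tuples = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                tuples.add("tuple-" + p + "-" + i);
            }
            partitions.add(tuples);
        }
        Map<String, String> store = new ConcurrentHashMap<>();
        int written = persistInParallel(partitions, store);
        System.out.println("tuples written: " + written);
        System.out.println("rows in store: " + store.size());
    }
}
```

Because each partition only touches its own tuples, the write stage can run at the same parallelism as the upstream functions.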

Oleg Migrin

Jan 23, 2013, 3:59:24 PM
to storm...@googlegroups.com, nat...@nathanmarz.com
Okay, thanks for help!
-----
Oleg M.