storm-kafka spout consumes messages at a slower rate than they are produced

Piyush Rai

Oct 31, 2013, 8:04:40 AM
to storm...@googlegroups.com
Hi everyone,

I was trying out the storm-kafka spout described here https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka and the configuration I used is shown below.

    
    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/kafkastorm", "discovery");
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 6379);

    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(RedisState.transactional(inetSocketAddress), new Count(), new Fields("counts"))
            .parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug(true);
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());

However, the spout fetches messages from the Kafka topic at only about 7,000 messages per second, whereas I am expecting a load of around 100,000 messages per second. I have tried various options for increasing the fetch buffer size in spoutConfig, with no visible results.

Has anyone faced a similar issue, where Storm cannot consume from a Kafka topic as fast as the producer writes messages to it?

Regards,
Piyush

Homer Strong

Oct 31, 2013, 6:26:57 PM
to storm...@googlegroups.com
One option is to increase your spout's parallelism. If you have multiple Kafka partitions available then the spouts will consume from respective partitions in parallel.

Piyush Rai

Nov 4, 2013, 1:04:38 AM
to storm...@googlegroups.com
Here is the log output I am getting:

48205 [Thread-136] INFO  storm.kafka.PartitionManager  - Fetched 100000 messages from Kafka: localhost:0
50531 [Thread-136] INFO  storm.kafka.PartitionManager  - Added 100000 messages from Kafka: localhost:0 to internal buffers
50533 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
50533 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
51534 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
51534 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 303749043
51574 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=303749043, partition=0, broker={host=localhost, port=9092}, topic=test}
51627 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 303749043
51627 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
54535 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
54535 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 303900996
54536 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=303900996, partition=0, broker={host=localhost, port=9092}, topic=test}
54598 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 303900996
54598 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
55582 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
55582 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304116352
55582 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304116352, partition=0, broker={host=localhost, port=9092}, topic=test}
55590 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304116352
55590 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
58860 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
58860 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304205006
58861 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304205006, partition=0, broker={host=localhost, port=9092}, topic=test}
58867 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304205006
58867 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
59868 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
59868 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304420489
59868 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304420489, partition=0, broker={host=localhost, port=9092}, topic=test}
59876 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304420489
59876 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
62511 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
62511 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304521711
62511 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304521711, partition=0, broker={host=localhost, port=9092}, topic=test}
62518 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304521711
62518 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
63555 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
63555 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304635721
63555 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304635721, partition=0, broker={host=localhost, port=9092}, topic=test}
63562 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304635721
63562 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
66696 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
66696 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304850922
66696 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304850922, partition=0, broker={host=localhost, port=9092}, topic=test}
66704 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304850922
66704 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
67734 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
67734 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304888923
67734 [Thread-136] INFO  storm.kafka.ZkState  - Writing /kafkastorm/discovery/localhost:9092:0 the data {topology={id=e8170d4d-1777-4fe3-9fb2-fdec02eed118, name=symbolCounter}, offset=304888923, partition=0, broker={host=localhost, port=9092}, topic=test}
67750 [Thread-136] INFO  storm.kafka.PartitionManager  - Wrote committed offset to ZK: 304888923
67750 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0
68750 [Thread-136] INFO  storm.kafka.PartitionManager  - Committing offset for localhost:9092:0
68750 [Thread-136] INFO  storm.kafka.PartitionManager  - Comitted offset for localhost:9092:0


From what I understand, Storm is able to fetch all 100,000 messages from Kafka at once, but it then makes some ZooKeeper updates that slow it down. I have not been able to figure out how to improve that.
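
A rough way to quantify the consumption rate from a log like the one above is to take the first and last "Writing committed offset to ZK" lines and divide the offset difference by the elapsed time. The following is only a sketch: the class and method names (OffsetRate, span, ratePerSecond) are made up for illustration, and whether an offset counts messages or bytes depends on the Kafka version, so the result is in "offset units" per second rather than necessarily messages per second.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of storm-kafka.
class OffsetRate {
    // Matches log lines such as:
    // 51534 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 303749043
    private static final Pattern COMMIT =
            Pattern.compile("^(\\d+) .*Writing committed offset to ZK: (\\d+)$");

    /** Returns {elapsedMillis, offsetDelta} between the first and last commit lines. */
    static long[] span(String[] logLines) {
        long firstTs = -1, firstOff = -1, lastTs = -1, lastOff = -1;
        for (String line : logLines) {
            Matcher m = COMMIT.matcher(line.trim());
            if (!m.matches()) {
                continue; // skip lines that are not offset commits
            }
            long ts = Long.parseLong(m.group(1));
            long off = Long.parseLong(m.group(2));
            if (firstTs < 0) {
                firstTs = ts;
                firstOff = off;
            }
            lastTs = ts;
            lastOff = off;
        }
        if (firstTs < 0) {
            throw new IllegalArgumentException("no commit lines found");
        }
        return new long[] { lastTs - firstTs, lastOff - firstOff };
    }

    /** Offset units advanced per second over the given span. */
    static double ratePerSecond(long elapsedMillis, long offsetDelta) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("need at least two commits at different times");
        }
        return offsetDelta * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // First and last commit lines copied from the log above.
        String[] lines = {
            "51534 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 303749043",
            "67734 [Thread-136] INFO  storm.kafka.PartitionManager  - Writing committed offset to ZK: 304888923"
        };
        long[] s = span(lines);
        System.out.printf("%d offset units in %d ms = %.0f per second%n",
                s[1], s[0], ratePerSecond(s[0], s[1]));
    }
}
```

Comparing this figure before and after a configuration change gives a more direct measure of progress than reading the commit lines by eye.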

Thanks,
Piyush



Piyush Rai

Nov 5, 2013, 7:54:58 AM
to storm...@googlegroups.com
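I added the following setting to the topology config, which raises the maximum number of tuples the spout emits per Trident batch:
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 64 * 1024);
After that, processing improved significantly.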
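
For reference, the change amounts to one extra entry in the map that is passed to submitTopology. The sketch below uses a plain java.util.Map so that it runs without Storm on the classpath. The key string "topology.spout.max.batch.size" is an assumption about the value of RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF and should be checked against the Storm version in use, and the class and method names (BatchSizeConfig, withMaxBatchSize) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; in a real topology, put the entry on the Storm Config object directly.
class BatchSizeConfig {
    // Assumed value of RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF; verify for your Storm version.
    static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";

    /** Returns a copy of conf with the maximum spout batch size set. */
    static Map<String, Object> withMaxBatchSize(Map<String, Object> conf, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        Map<String, Object> copy = new HashMap<>(conf);
        copy.put(MAX_BATCH_SIZE_CONF, maxBatchSize);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // Same value as in the post: 64 * 1024 tuples per batch.
        Map<String, Object> tuned = withMaxBatchSize(conf, 64 * 1024);
        System.out.println(tuned);
    }
}
```

A larger batch means fewer batches for the same number of messages, and so less per-batch coordination overhead, at the cost of more tuples held in memory per batch.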

Regards,
Piyush
