KafkaSpout, Storm and Kafka versions

6,306 views
Skip to first unread message

Nitin Jain

unread,
Jul 24, 2013, 3:10:56 PM7/24/13
to storm...@googlegroups.com
I have been trying to implement a Kafka+Storm system using KafkaSpout. I seem to have run into compatibility issues among the different versions. I wanted to know which would be the best combination to run all three together.

My current setup:
Storm 0.8.2
Kafka 0.8-SNAPSHOT
storm-kafka 0.9.0-wip16a-scala292

Tried using Kafka 0.7.2 with storm-kafka 0.8.0-wip4, scala-2.8.0 but still have issues.

Also, a working KafkaSpout topology example,link or resource would be great! Thanks.


Gmail

unread,
Jul 26, 2013, 8:23:19 AM7/26/13
to storm...@googlegroups.com
Nitin,

the setup we got to work was -

- storm-kafka 0.9.0-wip16a-scala292
- storm 0.9.0-wip16
- kafka:0.7.2, built against Scala 2.9.2 (https://github.com/cayova/kafka/tree/0.7.2_2.9.2)

Bill
> --
> You received this message because you are subscribed to the Google Groups "storm-user" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com (mailto:storm-user+...@googlegroups.com).
> For more options, visit https://groups.google.com/groups/opt_out.
>
>



Nitin Jain

unread,
Jul 26, 2013, 8:41:35 AM7/26/13
to storm...@googlegroups.com
Thanks for the info Bill. I will experiment with these versions.
> To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com (mailto:storm-user+unsub...@googlegroups.com).

Joey Chang

unread,
Jul 31, 2013, 10:57:54 PM7/31/13
to storm...@googlegroups.com

storm 0.9.0-wip16 is a development version. I think It is not very suitable for production env.

Does anyone use storm 0.8.x which it can work with kafka fine?

if so, can you show every component's version? (storm, kafka and storm-kafka)?

在 2013年7月26日星期五UTC+8下午8时23分19秒,Bill de hÓra写道:
> To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com (mailto:storm-user+unsub...@googlegroups.com).

Johan Lundahl

unread,
Aug 1, 2013, 1:52:08 AM8/1/13
to storm...@googlegroups.com
I´m currently setting it up and building topologies using

storm 0.8.2
storm-kafka 0.8.0-wip4
kafka_2.9.1 0.7.0

and so far this combination seems to be working


To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com.

Joey Chang

unread,
Aug 1, 2013, 4:13:03 AM8/1/13
to storm...@googlegroups.com
hi Johan.

    It will be very appreciative if you can provide your test topology.

在 2013年8月1日星期四UTC+8下午1时52分08秒,Johan Lundahl写道:
> To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com (mailto:storm-user+unsubscribe@googlegroups.com).
> For more options, visit https://groups.google.com/groups/opt_out.
>
>



Joey Chang

unread,
Aug 1, 2013, 4:33:15 AM8/1/13
to storm...@googlegroups.com
this is my test topology.

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
        
    }
    
    public static void main(String [] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();  
        
        List<String> hosts = new ArrayList<String>();  
        hosts.add("localhost");  
        SpoutConfig spoutConf = new SpoutConfig(StaticHosts.fromHostString(hosts, 3), "test", "/root", "id");  
        spoutConf.scheme = new StringScheme();  
        spoutConf.forceStartOffsetTime(-2);  
          
        spoutConf.zkServers = new ArrayList<String>() {{  
                      add("localhost");   
                    }};  
        spoutConf.zkPort = 2181;  
          
        //set the spout for the topology  
        builder.setSpout("spout",  new KafkaSpout(spoutConf), 1);
        builder.setBolt("print", new PrinterBolt(), 2).allGrouping("spout");
        
if (args != null && args.length > 0){
Config conf = new Config();
//conf.setDebug(true);
conf.setNumWorkers(4);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], conf, builder.createTopology());
}
else{
Config conf = new Config();
conf.setNumWorkers(4);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("test-kafka", conf, builder.createTopology());
}
       
    }
}

storm 0.8.2  storm-kafka 0.8.0 wip4  kafka_2.9.1-0.7.0.jar
when i run this test topology, give me the same error: 

Caused by: java.lang.ClassNotFoundException: storm.kafka.KafkaConfig$BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: main.stormkafka.TestTopology.  Program will exit.

In my local system I am running kafka 0.7.2 server, but i don't think this could be affected the running of the test topology.

can you give me any advice? Thanks

在 2013年8月1日星期四UTC+8下午1时52分08秒,Johan Lundahl写道:
> To unsubscribe from this group and stop receiving emails from it, send an email to storm-user+...@googlegroups.com (mailto:storm-user+unsubscribe@googlegroups.com).
> For more options, visit https://groups.google.com/groups/opt_out.
>
>



Johan Lundahl

unread,
Aug 1, 2013, 4:45:24 AM8/1/13
to storm-user
As my starting point, I pretty much used the example from https://github.com/TheHydroImpulse/storm-kafka-starter/blob/master/src/jvm/storm/starter/KafkaTopology.java with some very minor modifications:


List<String> hosts = new ArrayList<String>();
hosts.add("127.0.0.1:9092");
SpoutConfig kafkaConf = new SpoutConfig(StaticHosts.fromHostString(hosts, 1), "testtopic", "/kafkastorm", "testid");
kafkaConf.scheme = new StringScheme();
kafkaConf.zkPort = 2181;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
kafkaConf.forceStartOffsetTime(-2); //To start from beginning of topic

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout, 1);
builder.setBolt("bolt", new PrinterBolt()).allGrouping("spout");

Config config = new Config();
config.setDebug(true);

config.setMaxTaskParallelism(1);
StormTopology topo = builder.createTopology();

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, topo);
Thread.sleep(5000);
cluster.shutdown();

From this I get the spout to read from Kafka and be executed up by the bolt.

In your case it looks more like a classpath issue. I would check again to make sure that the correct storm-kafka-0.8.0-wip4 jar is in there.

Joey Chang

unread,
Aug 1, 2013, 7:25:26 AM8/1/13
to storm...@googlegroups.com
thank you Johan.
I have known where the mistake is. I didn't put  storm-kafka 0.8.0-wip4.jar  in my storm lib.  I just put the kafka and scala in there.
But we used kafka 0.7.2 in production env. when i use kafka_2.9.1 0.7.0, kafka producer show me wrong partition 1.  I certainly set the partition to 1 in storm spout config. I don't know whether it's caused by incompatible version. 
Anyway, I'm very appreciated for your help.


在 2013年8月1日星期四UTC+8下午4时45分24秒,Johan Lundahl写道:

Johan Lundahl

unread,
Aug 1, 2013, 7:31:12 AM8/1/13
to storm...@googlegroups.com
I also use Kafka 0.7.2 on the broker tier while using kafka_2.9.1 0.7.0 on the Storm client tier and it seems to work fine.

stavr...@gmail.com

unread,
Aug 9, 2013, 3:42:28 AM8/9/13
to storm...@googlegroups.com
I am also facing the same issues using the above mentioned libs:
storm 0.8.2  storm-kafka 0.8.0 wip4  kafka_2.9.1-0.7.0.jar
I run storm 0.8.2, kafka 0.7.2 (using the default 2.8.0 scala during build and I am wondering if this is the issue).
When I run the test topology you gave I have the same error:


Caused by: java.lang.ClassNotFoundException: storm.kafka.KafkaConfig$BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)

Thanks in advance for any help.

Joey Chang

unread,
Aug 9, 2013, 6:28:21 AM8/9/13
to storm...@googlegroups.com

put these jars (storm-kafka 0.8.0 wip4, kafka 2.9.1-0.7.0.jar and scala-libaray jar) in your storm lib directory.

在 2013年8月9日星期五UTC+8下午3时42分28秒,stavr...@gmail.com写道:

stavr...@gmail.com

unread,
Aug 10, 2013, 6:39:51 AM8/10/13
to storm...@googlegroups.com
Thank you so much! All ok.

stavr...@gmail.com

unread,
Aug 11, 2013, 9:04:50 AM8/11/13
to storm...@googlegroups.com
I also used this implementation but I cannot make it work.
When I visit the ui webpage I see the topology with the spout and the bolt inside. When I run a producer (manually from terminal) for topic "test" I do not see any emitted tuples on the spout and I also cannot find under zookeeper root folder the kafkastorm folder. Should I create that folder manually first? I use zookeeper 3.3.3 (/usr/lib/zookeeper is the root). Should I give the full path to zookeeper root folder in the code?

PS If I run a consumer manually I can get the stream without issue.
Thanks in advance.

Joey Chang

unread,
Aug 11, 2013, 9:20:01 AM8/11/13
to storm...@googlegroups.com
plz share your kafka spout config.

在 2013年8月11日星期日UTC+8下午9时04分50秒,stavr...@gmail.com写道:

stavr...@gmail.com

unread,
Aug 11, 2013, 9:33:05 AM8/11/13
to storm...@googlegroups.com
public class KafkaTopology {
    
    public static void main(String[] args) throws Exception {
   
    List<String> hosts = new ArrayList<String>();
        hosts.add("zeus:9092");
        SpoutConfig kafkaConf = new SpoutConfig(StaticHosts.fromHostString(hosts, 1), "test", "/kafkastorm", "discovery");
        kafkaConf.scheme = new StringScheme();
        kafkaConf.zkPort = 2181;
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
        kafkaConf.forceStartOffsetTime(-2);
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, 1);
        builder.setBolt("printer", new PrinterBolt())
                .allGrouping("spout");
        
        Config config = new Config();
        config.setDebug(true);
        
        if(args!=null && args.length > 0) {
            config.setNumWorkers(3);
            
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {        
            config.setMaxTaskParallelism(1);
            StormTopology topo = builder.createTopology();
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config, topo);
        
            Thread.sleep(10000);

            cluster.shutdown();
        }
       
    }
    
}

It is the exact same example. So at first I start the server and a producer for topic test. If I run a consumer for the same topic I can see the messages sent.Then I run the KafkaTopology. If I open storm web UI I see the spout and the bolt but I cannot see any tuple emitted. In addition I dont understand the /kafkaspout folder I set in the code. Where is this folder? It says under zookeeper root (in my case /usr/lib/zookeeper) but I cannot find this folder anywhere. 

Thanks for your help.

Joey Chang

unread,
Aug 11, 2013, 10:53:56 PM8/11/13
to storm...@googlegroups.com
"/kafkastorm" change to "/tmp"
give it a try.

在 2013年8月11日星期日UTC+8下午9时33分05秒,stavr...@gmail.com写道:

姚仁捷

unread,
Aug 12, 2013, 3:52:43 AM8/12/13
to storm...@googlegroups.com
this won't make any help i think. this directory only refers to the zookeeper directory, not the linux filesystem

在 2013年8月12日星期一UTC+8上午10时53分56秒,Joey Chang写道:

姚仁捷

unread,
Aug 12, 2013, 4:18:15 AM8/12/13
to storm...@googlegroups.com
We use kafka-0.7.2 and storm-0.9.0, zookeeper-3.3.4 and it worked well.

In my experience, storm-0.9.0 is more stable than storm-0.8.2. In 0.8.2, this issue: 'stormconf.ser does not exists' usually happened, and nathan suggested upgrade to 0.9.2. Furthermore, I almost never meet this problem after upgrade storm from 0.8.2 to 0.9.0.

About Kafka, I thikn 0.8.0 is not best choice now, for that this version used some complex feature.

About zookeeper: at first there are two zookeeper clusters in my environment - one for storm(zookeeper-3.4.5) and one for kafka(zookeeper-3.3.4). Someday I wanna use ONE zookeeper cluster for both storm and kafka. While I'm using zookeeper-3.4.5 in kafka, something weird happened. After some researching, I rollback to 3.3.4 which used by storm and kafka, then everything were okay.

below is my topology for kafka:

        List<String> kafkaList=  new ArrayList<String>();
        kafkaList.add("kafka-server-hostname");   //--------------------------- Caution: this you need specify your hostname
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, "/stormkafka", topologyId);
String[] zkServerString = MarsPvConfig.get("zkServers").split(" ");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = ImmutableList.copyOf(zkServerString);
        spoutConf.zkPort = Integer.parseInt(MarsPvConfig.get("zkPort"));
        spoutConf.forceStartOffsetTime(-1);

在 2013年7月25日星期四UTC+8上午3时10分56秒,Nitin Jain写道:

stavr...@gmail.com

unread,
Aug 12, 2013, 8:01:41 AM8/12/13
to storm...@googlegroups.com
So what kind of directory /kafkastorm is?
That is what I don't understand. The folder specified in the code should exist (created manually) on zookeeper root folder? Does the spout create this folder and the files inside this? What is the expected behavior after I run the code as is(/kafkastorm)?
Generally speaking I don't understand how this folder is created and used. Moreover how we debug a code like this? Where we can find traces like System.out.println of a bolt?

Thanks

姚仁捷

unread,
Aug 12, 2013, 10:25:28 AM8/12/13
to storm...@googlegroups.com
At first, you may need to read a brief intro to zookeeper. Here I try my best to describe it in a simple way: zookeeper is like a tree-like (or filesystem-like) structure and every node has its own attributes(node's create time, modify time, value). For example, /stormkafka/TOPOLOGYID is in zookeeper, 'stormkafka' and 'TOPOLOGYID' is two nodes. Further, 'TOPOLOGYID' is the child of node 'stormkafka'. This is just like the directory and file in linux filesystem.

Storm will store cluster information, supervisor heartbeat and other cluster attributes. In addition, Kafka stores its topic-partition mapping, offset for each topic which consumed by a client and so on. 

You can get into it very easily with zkCli.sh which is in zookeeper's directory. It is in the location like that '/apps/zookeeper-3.3.4/bin/zkCli.sh' if zookeeper is installed in '/apps/zookeeper-3.3.4'. After you walk into zkCli.sh, you can use some pieces of commands for viewing the tree-like structure. For example, 'ls /' will list the node in root. 'get' will get info for your specified node, 'set' can set a value(which is dangerous if you do not know the meaning of a certain node).

For our storm, there is problem in log4j for outputing debug info. Our workaround is 'print' the log into redis which we used in our project. :) 

在 2013年8月12日星期一UTC+8下午8时01分41秒,stavr...@gmail.com写道:

Andrés Tello

unread,
Nov 29, 2013, 10:42:58 AM11/29/13
to storm...@googlegroups.com
Hi guys, 

Could you please post you pom.xml files to see which dependencies are you using? I am running in Local Mode. I've tried the Bill's and Johan's the versions with not succees

1. Attempt
storm-kafka 0.9.0-wip16a-scala292 
storm 0.9.0-wip16 

kafka:0.7.2, built against Scala 2.9.2 

2. Attempt
storm 0.8.2
storm-kafka 0.8.0-wip4
kafka_2.9.1 0.7.0

Every time I tried a different combination I got a different error, but I can't make my spout to read from kafka.

This is the exception I'm getting

3158 [Thread-16] ERROR backtype.storm.util  - Async loop died!
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:486)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:57)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:184)
at kafka.consumer.SimpleConsumer.liftedTree3$1(SimpleConsumer.scala:168)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:158)
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:66)
at storm.kafka.PartitionManager.<init>(PartitionManager.java:64)
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:24)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:72)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)
3159 [Thread-16] ERROR backtype.storm.daemon.executor  - 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:486)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:57)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:184)
at kafka.consumer.SimpleConsumer.liftedTree3$1(SimpleConsumer.scala:168)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:158)
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:66)
at storm.kafka.PartitionManager.<init>(PartitionManager.java:64)
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:24)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:72)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)
3365 [Thread-16] INFO  backtype.storm.util  - Halting process: ("Worker died")

This is my topology code:

            List<String> hosts = new ArrayList<String>();
            hosts.add("localhost");
            SpoutConfig spoutConf = new SpoutConfig(StaticHosts.fromHostString(hosts, 1), "test", "/test2", "id_test2");
            spoutConf.scheme = new StringScheme();
            spoutConf.forceStartOffsetTime(-2);

            spoutConf.zkServers = new ArrayList<String>() {{
                add("localhost");
            }};
            spoutConf.zkPort = 2181;


            //Topology definition
            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("kafka-spout", new KafkaSpout(spoutConf));
            builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("kafka-spout");

            Config conf = new Config();
            conf.setDebug(true);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-test", conf, builder.createTopology());
            Thread.sleep(5000);
            cluster.shutdown();



Please, any help would be really appreciated. Pom file example would be great.

Andres

Edison Xu

unread,
Dec 12, 2013, 2:43:18 AM12/12/13
to storm...@googlegroups.com
Hi,

Did you solve this problem?

I'm facing the same thing with you.

Thanks,

在 2013年11月29日星期五UTC+8下午11时42分58秒,Andrés Tello写道:
Reply all
Reply to author
Forward
0 new messages