public class KafkaFetcher extends Kafka08ConsumerFlowlet<byte[], byte[]> {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);

  @Output("outSpitter")
  private OutputEmitter<MessageTuple> emitter;

  @Override
  protected void configureKafka(KafkaConfigurer kafkaConfig) {
    kafkaConfig.addTopicPartition("reactions", 2);
    kafkaConfig.setZooKeeper("localhost:2182");
    kafkaConfig.setBrokers("localhost:9092");
  }

  @Override
  protected void processMessage(byte[] payload) throws Exception {
    LOG.debug("payload: {}", payload);
    // sample:
    MessageTuple mp = new MessageTuple();
    emitter.emit(mp, "profile", 1);
  }
}
Flow definition:

FlowSpecification.Builder.with()
  .setName("testkafka")
  .setDescription("testkafka flow with CDAP")
  .withFlowlets()
    .add("kafkaSource", new KafkaFetcher(), 2)
    .add("process", new ExampleParse(), 2)
  .connect()
    .from("kafkaSource").to("process")
  .build();
Any idea what I might have done wrong and why the messages are not being consumed in the CDAP app?
2. I also used a high-level consumer to consume the messages, but I am seeing a Scala version issue at runtime. Has this been seen before? Does CDAP use Scala internally, and if so, which version?
Trace:
java.lang.NoSuchMethodError: scala.collection.mutable.HashSet$.apply(Lscala/collection/Seq;)Lscala/collection/Traversable;
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24) ~[na:na]
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:76) ~[na:na]
at com.lotame.cdap.statpump.kafka.KafkaFetcher.createConsumerConfig(KafkaFetcher.java:93) ~[na:na]
at com.lotame.cdap.statpump.kafka.KafkaFetcher.initialize(KafkaFetcher.java:70) ~[na:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver$3.apply(FlowletProcessDriver.java:378) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.tephra.DefaultTransactionExecutor$3.apply(DefaultTransactionExecutor.java:103) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor$3.apply(DefaultTransactionExecutor.java:100) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeOnce(DefaultTransactionExecutor.java:136) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeWithRetry(DefaultTransactionExecutor.java:115) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:72) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:100) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.initFlowlet(FlowletProcessDriver.java:374) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.run(FlowletProcessDriver.java:153) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[co.cask.cdap.cdap-cli-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]
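A NoSuchMethodError like the one above usually means two different Scala versions are on the classpath. A quick diagnostic I could run inside the flowlet (a hypothetical helper, not part of any CDAP API) to see which jar is actually supplying a class:

```java
import java.security.CodeSource;

// Hypothetical diagnostic: report which jar a class was loaded from,
// to spot a second Scala version sneaking onto the classpath.
public class WhichJar {
    static String locate(String className) throws ClassNotFoundException {
        CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
        // Bootstrap classes (e.g. java.lang.*) have no CodeSource.
        return src == null ? "bootstrap classloader" : src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // In the flowlet, try "scala.collection.mutable.HashSet$" and compare
        // the result against the jar you expect to supply Scala.
        System.out.println(locate("java.lang.String"));
    }
}
```

If the reported jar is not the Scala library version the Kafka client was compiled against, that would explain the error.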
Please let me know if I am unclear anywhere or if you need additional info; thanks for your patience.
--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/70d58541-3eba-4a62-b81d-4a6a8298e380%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
1. Does
kafkaConfig.addTopicPartition("reactions", 3);
mean "three partitions", or is the right way to add each partition individually, as below?
kafkaConfig.addTopicPartition("reactions", 0);
kafkaConfig.addTopicPartition("reactions", 1);
kafkaConfig.addTopicPartition("reactions", 2);
In my local setup I have num.partitions=2; I also tried 1 and 3 in the Kafka configs.
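My reading of the CDAP Kafka flowlet API (worth confirming) is that addTopicPartition(topic, partition) takes a partition index, not a count, so each partition must be registered once. A sketch of that interpretation, with the CDAP call stubbed out so the loop stays in sync with the broker's num.partitions:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch assuming addTopicPartition(topic, partition) takes a partition *index*:
// register each of the broker's partitions once, driven by a single constant
// that mirrors num.partitions in the broker config.
public class PartitionSetup {
    static List<String> registrations(String topic, int numPartitions) {
        List<String> calls = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            // Stands in for: kafkaConfig.addTopicPartition(topic, partition);
            calls.add(topic + ":" + partition);
        }
        return calls;
    }
}
```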
2. Is it OK to place the Scala dependency jar in the lib folder of the CDAP home directory instead of bundling it in the fat jar?
3. I am also seeing an error with dataset initialization during deployment; any ideas how to resolve it? It did not occur before.
Trace:
2014-12-06 22:00:04,961 - DEBUG [executor-11:o.a.t.i.ApplicationBundler@158] - finished creating bundle at file:/var/folders/3y/7glq4qvs46n_1wx85lfmqqdh0000gr/T/com.lotame.cdap.statpump.datasets.HLLCounterStore8830676246192708069.jar.bd16c39e-d26b-456e-9756-260d7e5f7d83.jar
2014-12-06 22:00:04,970 - DEBUG [executor-11:o.a.t.i.ApplicationBundler@161] - cleaned up local temporary for bundle file:/var/folders/3y/7glq4qvs46n_1wx85lfmqqdh0000gr/T/com.lotame.cdap.statpump.datasets.HLLCounterStore8830676246192708069.jar.bd16c39e-d26b-456e-9756-260d7e5f7d83.jar2474719771641710182.tmp
2014-12-06 22:00:19,705 - ERROR [New I/O worker #31:c.c.h.RequestRouter@136] - Exception caught in channel processing.
org.jboss.netty.handler.codec.frame.TooLongFrameException: HTTP content length exceeded 157286400 bytes.
at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:169) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.RequestRouter.messageReceived(RequestRouter.java:79) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.http.HttpContentEncoder.messageReceived(HttpContentEncoder.java:69) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.NettyHttpService$2.handleUpstream(NettyHttpService.java:230) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) [na:1.6.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) [na:1.6.0_45]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]
2014-12-06 22:00:21,700 - ERROR [New I/O worker #32:c.c.h.RequestRouter@136] - Exception caught in channel processing.
org.jboss.netty.handler.codec.frame.TooLongFrameException: HTTP content length exceeded 157286400 bytes.
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
I am able to iterate through the messages in a test consumer app, but the same code hangs in iterator.hasNext() when running in a flowlet. I can't see any reason other than some size-limit restriction within CDAP when connecting to external sources. Any thoughts?
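One way I could tell "no data arriving" apart from a genuine hang: the stock Kafka 0.8 high-level consumer supports consumer.timeout.ms, which makes iterator.hasNext() throw ConsumerTimeoutException after the timeout instead of blocking forever. A sketch of the properties (group.id here is a hypothetical name; the Properties would be passed to kafka.consumer.ConsumerConfig in the real consumer):

```java
import java.util.Properties;

// Sketch: configure the Kafka 0.8 high-level consumer to fail fast in
// hasNext() rather than block forever, so a silent hang becomes a
// ConsumerTimeoutException that shows up in the flowlet logs.
public class ConsumerProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2182"); // from the setup above
        props.put("group.id", "test-group");              // hypothetical group id
        props.put("consumer.timeout.ms", "10000");        // hasNext() throws after 10s
        // In the real consumer: new kafka.consumer.ConsumerConfig(props)
        return props;
    }
}
```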