issues pulling from kafka in a flowlet


Jyotirmoy Sundi

Dec 6, 2014, 9:55:05 PM
to cdap...@googlegroups.com
Hi Folks,
I am trying to create a flowlet which pulls from Kafka.
Things I did:
2. Created a class extending Kafka08ConsumerFlowlet.
3. Added the ZooKeeper 3.4.6 library to the CDAP lib folder to avoid some ClassNotFoundException issues:
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
4. Uploaded the built jar to CDAP using the web GUI.
5. Although there are no errors in the CDAP logs, I don't see any data coming through the flowlet, even though the Kafka topic has data in it.
I also verified the same setup with a Storm implementation that uses the high-level consumer API (ConsumerConfig); all messages are pulled successfully in that Storm topology.

source kafka flowlet:

public class KafkaFetcher extends Kafka08ConsumerFlowlet<byte[], byte[]> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);

    @Output("outSpitter")
    private OutputEmitter<MessageTuple> emitter;

    @Override
    protected void configureKafka(KafkaConfigurer kafkaConfig) {
        kafkaConfig.addTopicPartition("reactions", 2);
        kafkaConfig.setZooKeeper("localhost:2182");
        kafkaConfig.setBrokers("localhost:9092");
    }

    @Override
    protected void processMessage(byte[] payload) throws Exception {
        LOG.debug("payload: {}", payload);
        // sample:
        MessageTuple mp = new MessageTuple();
        emitter.emit(mp, "profile", 1);
    }
}


flow definition:

FlowSpecification.Builder.with()
    .setName("testkafka")
    .setDescription("testkafka flow with CDAP")
    .withFlowlets()
        .add("kafkaSource", new KafkaFetcher(), 2)
        .add("process", new ExampleParse(), 2)
    .connect()
        .from("kafkaSource").to("process")
    .build();


Any idea what I might have done wrong and why the messages are not consumed in the CDAP app?


2. I also used a high-level consumer to consume the messages, but I am seeing a Scala version issue at runtime. Has this been seen before? Does CDAP use Scala internally, and if so, which version?

Trace:

java.lang.NoSuchMethodError: scala.collection.mutable.HashSet$.apply(Lscala/collection/Seq;)Lscala/collection/Traversable;
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24) ~[na:na]
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:76) ~[na:na]
at com.lotame.cdap.statpump.kafka.KafkaFetcher.createConsumerConfig(KafkaFetcher.java:93) ~[na:na]
at com.lotame.cdap.statpump.kafka.KafkaFetcher.initialize(KafkaFetcher.java:70) ~[na:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver$3.apply(FlowletProcessDriver.java:378) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.tephra.DefaultTransactionExecutor$3.apply(DefaultTransactionExecutor.java:103) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor$3.apply(DefaultTransactionExecutor.java:100) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeOnce(DefaultTransactionExecutor.java:136) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeWithRetry(DefaultTransactionExecutor.java:115) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:72) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:100) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.initFlowlet(FlowletProcessDriver.java:374) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.run(FlowletProcessDriver.java:153) ~[co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[co.cask.cdap.cdap-cli-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]


Please let me know if I am not clear anywhere or you would need additional info, thanks for your patience.



Terence Yim

Dec 7, 2014, 12:48:26 AM
to Jyotirmoy Sundi, cdap...@googlegroups.com
Hi Jyotirmoy,

1. One thing I notice is that you have

  addTopicPartition("reactions", 2);

This means only partition 2 of the topic "reactions" will be consumed. Have you configured the Kafka topic to have at least 3 partitions (0, 1, 2)? Also make sure some messages have been published to that partition.

2. For the NoSuchMethodError: although CDAP uses Scala internally, the Scala classes are not visible to the Flow ClassLoader (behavior in 2.5.x), so you have to make sure the right version of Scala is in your application jar (usually done through Maven dependency management together with the bundle jar plugin).
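
To illustrate what "not visible" means here, below is a minimal sketch of a class loader that refuses to delegate selected package prefixes to its parent. It is not CDAP's actual Flow ClassLoader; the class name HidingClassLoader and the "scala." prefix are assumptions chosen for illustration. With such a loader in place, the hidden classes have to come from the application jar instead.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only (hypothetical class): hides selected packages of the parent loader. */
class HidingClassLoader extends ClassLoader {
    private final List<String> hiddenPrefixes;

    HidingClassLoader(ClassLoader parent, String... hiddenPrefixes) {
        super(parent);
        this.hiddenPrefixes = Arrays.asList(hiddenPrefixes);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (String prefix : hiddenPrefixes) {
            if (name.startsWith(prefix)) {
                // Refuse to delegate: the parent's copy stays invisible to user code.
                throw new ClassNotFoundException(name + " is hidden from this class loader");
            }
        }
        // Everything else follows the normal parent-first delegation.
        return super.loadClass(name, resolve);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = new HidingClassLoader(ClassLoader.getSystemClassLoader(), "scala.");
        System.out.println("visible: " + loader.loadClass("java.util.ArrayList").getName());
        try {
            loader.loadClass("scala.collection.mutable.HashSet");
        } catch (ClassNotFoundException e) {
            System.out.println("hidden: " + e.getMessage());
        }
    }
}
```

In a real setup a child loader over the application jar would sit below such a filter, which is why the Scala version bundled in the jar is the one that matters.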

Terence


Jyotirmoy Sundi

Dec 7, 2014, 1:10:44 AM
to cdap...@googlegroups.com, sund...@gmail.com
Thanks Yim for the quick reply, appreciate it.

1. Nice observation, but I tried with both 1 and 3:
kafkaConfig.addTopicPartition("reactions", 1);
and
kafkaConfig.addTopicPartition("reactions", 3);

Or is the right way the one below?

kafkaConfig.addTopicPartition("reactions", 0);
kafkaConfig.addTopicPartition("reactions", 1);
kafkaConfig.addTopicPartition("reactions", 2);

In my local setup I have num.partitions=2; I also tried with 1 and 3 in the Kafka config.


2. Is it OK to place the Scala dependency jar in the lib folder of the CDAP home folder, instead of putting it in the fat jar?

3. Another error I am seeing, with dataset initialization during deployment. Any ideas how to resolve it? It did not occur before.

Trace:

2014-12-06 22:00:04,961 - DEBUG [executor-11:o.a.t.i.ApplicationBundler@158] - finished creating bundle at file:/var/folders/3y/7glq4qvs46n_1wx85lfmqqdh0000gr/T/com.lotame.cdap.statpump.datasets.HLLCounterStore8830676246192708069.jar.bd16c39e-d26b-456e-9756-260d7e5f7d83.jar
2014-12-06 22:00:04,970 - DEBUG [executor-11:o.a.t.i.ApplicationBundler@161] - cleaned up local temporary for bundle file:/var/folders/3y/7glq4qvs46n_1wx85lfmqqdh0000gr/T/com.lotame.cdap.statpump.datasets.HLLCounterStore8830676246192708069.jar.bd16c39e-d26b-456e-9756-260d7e5f7d83.jar2474719771641710182.tmp
2014-12-06 22:00:19,705 - ERROR [New I/O worker #31:c.c.h.RequestRouter@136] - Exception caught in channel processing.
org.jboss.netty.handler.codec.frame.TooLongFrameException: HTTP content length exceeded 157286400 bytes.
at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:169) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.RequestRouter.messageReceived(RequestRouter.java:79) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.http.HttpContentEncoder.messageReceived(HttpContentEncoder.java:69) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.NettyHttpService$2.handleUpstream(NettyHttpService.java:230) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) [na:1.6.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) [na:1.6.0_45]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]
2014-12-06 22:00:21,700 - ERROR [New I/O worker #32:c.c.h.RequestRouter@136] - Exception caught in channel processing.
org.jboss.netty.handler.codec.frame.TooLongFrameException: HTTP content length exceeded 157286400 bytes.
at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:169) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.RequestRouter.messageReceived(RequestRouter.java:79) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.codec.http.HttpContentEncoder.messageReceived(HttpContentEncoder.java:69) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.NettyHttpService$2.handleUpstream(NettyHttpService.java:230) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) [na:1.6.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) [na:1.6.0_45]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]

2014-12-06 22:00:21,737 - WARN  [executor-11:c.c.c.g.h.AppFabricHttpHandler@1773] - co.cask.cdap.data2.dataset2.DatasetManagementException: Error during talking to Dataset Service at http://localhost:53310/v2/data/modules/com.lotame.cdap.statpump.datasets.HLLCounterStore while doing PUT with headers X-Class-Name=com.lotame.cdap.statpump.datasets.HLLCounterStore and body co.cask.cdap.common.io.Locations$3@26a0aa1e
java.util.concurrent.ExecutionException: co.cask.cdap.data2.dataset2.DatasetManagementException: Error during talking to Dataset Service at http://localhost:53310/v2/data/modules/com.lotame.cdap.statpump.datasets.HLLCounterStore while doing PUT with headers X-Class-Name=com.lotame.cdap.statpump.datasets.HLLCounterStore and body co.cask.cdap.common.io.Locations$3@26a0aa1e
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[co.cask.cdap.cdap-cli-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[co.cask.cdap.cdap-cli-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[co.cask.cdap.cdap-cli-2.5.1.jar:na]
at co.cask.cdap.gateway.handlers.AppFabricHttpHandler.deploy(AppFabricHttpHandler.java:1768) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.gateway.handlers.AppFabricHttpHandler.access$1100(AppFabricHttpHandler.java:160) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.gateway.handlers.AppFabricHttpHandler$5.finished(AppFabricHttpHandler.java:1726) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.http.HttpMethodInfo.chunk(HttpMethodInfo.java:91) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at co.cask.http.HttpDispatcher.messageReceived(HttpDispatcher.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [co.cask.cdap.cdap-explore-jdbc-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) [na:1.6.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) [na:1.6.0_45]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]

Terence Yim

Dec 7, 2014, 1:34:33 AM
to Jyotirmoy Sundi, cdap...@googlegroups.com
1. One thing just came to mind. Currently there is a bug in the kafka-pack library: if you start consuming from the latest message by specifying "-1L" as the initial offset, you will never get any messages.


2. The Scala jar is in the CDAP master lib directory; however, as I mentioned before, it gets "hidden" by the Flow ClassLoader to minimize potential library version conflicts between CDAP and the user program (e.g. CDAP uses scala-2.10, but the program may be using scala-2.8 instead for whatever reason).

3. It seems to be caused by the jar being pretty big (150 MB), which the server side doesn't handle properly with chunked processing. I opened a ticket to track that (CDAP-874). In the meantime, is it possible to reduce the size of the jar by removing dependencies that are not needed (by specifying "provided" scope or using dependency exclusions in the pom.xml file)?
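
As a rough pre-deploy check, the sketch below compares a jar's size against the limit reported in the TooLongFrameException earlier in the thread (157286400 bytes, i.e. 150 * 1024 * 1024). The class name JarSizeCheck and the default path target/app.jar are hypothetical, and the check assumes the HTTP content length is approximately the jar size.

```java
import java.io.File;

/** Illustrative pre-deploy check (hypothetical helper, not part of CDAP). */
class JarSizeCheck {
    /** Limit taken from the TooLongFrameException message: 150 * 1024 * 1024 bytes. */
    static final long LIMIT_BYTES = 157286400L;

    /** True when a payload of this size would go over the reported limit. */
    static boolean exceedsLimit(long sizeBytes) {
        return sizeBytes > LIMIT_BYTES;
    }

    static double toMiB(long sizeBytes) {
        return sizeBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // The default path is a placeholder; pass the real jar path as the first argument.
        File jar = new File(args.length > 0 ? args[0] : "target/app.jar");
        long size = jar.length(); // File.length() returns 0 when the file does not exist
        System.out.printf("%s: %.1f MiB (limit %.1f MiB)%n", jar, toMiB(size), toMiB(LIMIT_BYTES));
        if (exceedsLimit(size)) {
            System.out.println("Jar is over the limit; consider 'provided' scope or exclusions.");
        }
    }
}
```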

Terence

Jyotirmoy Sundi

Dec 7, 2014, 8:22:32 AM
to cdap...@googlegroups.com, sund...@gmail.com
I can see some activity in the CDAP logs as it tries to pull from Kafka, but I see a different error now. Any pointers?
Trace:
2014-12-07 05:09:48,617 - INFO  [FlowletProcessDriver-kafkaSource-0-EventThread:o.a.t.i.z.DefaultZKClientService$ServiceDelegate@411] - Connected to ZooKeeper: localhost:2182
2014-12-07 05:09:48,617 - INFO  [FlowletProcessDriver-kafkaSource-1-EventThread:o.a.t.i.z.DefaultZKClientService$ServiceDelegate@411] - Connected to ZooKeeper: localhost:2182
2014-12-07 05:09:48,625 - INFO  [FlowletProcessDriver-kafkaSource-0:c.c.c.i.a.r.f.FlowletProcessDriver$3@379] - Flowlet initialized: flowlet=kafkaSource, instance=0, groupsize=2, accountId=developer, applicationId=StatpumpApp, program=StatpumpFlow, runid=2feea95f-6998-4405-85f5-d13bb5280e9e
2014-12-07 05:09:48,625 - INFO  [FlowletProcessDriver-kafkaSource-1:c.c.c.i.a.r.f.FlowletProcessDriver$3@379] - Flowlet initialized: flowlet=kafkaSource, instance=1, groupsize=2, accountId=developer, applicationId=StatpumpApp, program=StatpumpFlow, runid=2feea95f-6998-4405-85f5-d13bb5280e9e
2014-12-07 05:09:48,626 - WARN  [FlowletProcessDriver-kafkaSource-0:c.c.t.TransactionContext@201] - Transaction 1417957748103000000 is not in progress.
co.cask.tephra.TransactionNotInProgressException: canCommit() is called for transaction 1417957748103000000 that is not in progress (it is known to be invalid)
at co.cask.tephra.TransactionManager.commit(TransactionManager.java:732) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.inmemory.InMemoryTxSystemClient.commit(InMemoryTxSystemClient.java:71) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.TransactionContext.commit(TransactionContext.java:198) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.TransactionContext.finish(TransactionContext.java:80) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeOnce(DefaultTransactionExecutor.java:142) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeWithRetry(DefaultTransactionExecutor.java:115) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:72) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:100) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.initFlowlet(FlowletProcessDriver.java:374) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.run(FlowletProcessDriver.java:153) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-cli-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]
2014-12-07 05:09:48,626 - WARN  [FlowletProcessDriver-kafkaSource-1:c.c.t.TransactionContext@201] - Transaction 1417957748113000000 is not in progress.
co.cask.tephra.TransactionNotInProgressException: canCommit() is called for transaction 1417957748113000000 that is not in progress (it is known to be invalid)
at co.cask.tephra.TransactionManager.commit(TransactionManager.java:732) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.inmemory.InMemoryTxSystemClient.commit(InMemoryTxSystemClient.java:71) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.TransactionContext.commit(TransactionContext.java:198) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.TransactionContext.finish(TransactionContext.java:80) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeOnce(DefaultTransactionExecutor.java:142) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.executeWithRetry(DefaultTransactionExecutor.java:115) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:72) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.tephra.DefaultTransactionExecutor.execute(DefaultTransactionExecutor.java:100) [co.cask.tephra.tephra-core-0.3.0.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.initFlowlet(FlowletProcessDriver.java:374) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.run(FlowletProcessDriver.java:153) [co.cask.cdap.cdap-app-fabric-2.5.1.jar:na]
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-cli-2.5.1.jar:na]
at java.lang.Thread.run(Thread.java:680) [na:1.6.0_45]
2014-12-07 05:09:48,626 - INFO  [FlowletProcessDriver-kafkaSource-0:c.c.t.TransactionManager@845] - Tx invalid list: removed aborted tx 1417957748103000000
2014-12-07 05:09:48,627 - INFO  [FlowletProcessDriver-kafkaSource-1:c.c.t.TransactionManager@845] - Tx invalid list: removed aborted tx 1417957748113000000
2014-12-07 05:09:48,627 - ERROR [FlowletProcessDriver-kafkaSource-0:c.c.c.i.a.r.f.FlowletProcessDriver@384] - Flowlet throws exception during flowlet initialize: flowlet=kafkaSource, instance=0, groupsize=2, accountId=developer, applicationId=StatpumpApp, program=StatpumpFlow, runid=2feea95f-6998-4405-85f5-d13bb5280e9e
co.cask.tephra.TransactionNotInProgressException: canCommit() is called for transaction 1417957748103000000 that is not in progress (it is known to be invalid)
at co.cask.tephra.TransactionManager.commit(TransactionManager.java:732) ~[co.cask.tephra.tephra-core-0.3.0.jar:na]

Terence Yim

Dec 7, 2014, 1:16:46 PM
to Jyotirmoy Sundi, cdap...@googlegroups.com
Looks like it is caused by a transaction timeout, meaning it takes longer than 30 seconds to poll and process messages from Kafka. What fetch size did you set? You can try reducing it so that fewer messages are fetched per transaction, and see if you still get the same error. We are also working on improving this by fetching messages outside of the transaction and keeping only the processing inside the transaction.
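
The idea of keeping the work per transaction small can be sketched generically as a batch bounded by both a message count and a time budget. This is plain Java, not the kafka-pack API; the class BoundedBatch and its parameters are hypothetical, and the budget would be chosen well below the 30-second timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Illustrative only (hypothetical helper): caps how much work goes into one batch. */
class BoundedBatch {
    /**
     * Takes up to maxMessages from the source, stopping early once budgetMillis has elapsed.
     * The budget is only checked between messages, so a source whose hasNext() blocks
     * can still overrun it.
     */
    static <T> List<T> take(Iterator<T> source, int maxMessages, long budgetMillis) {
        long deadline = System.nanoTime() + budgetMillis * 1000000L;
        List<T> batch = new ArrayList<T>();
        while (batch.size() < maxMessages
                && System.nanoTime() - deadline < 0
                && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        Iterator<Integer> messages = Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator();
        // Each call stands in for the work done inside one transaction.
        while (messages.hasNext()) {
            List<Integer> batch = take(messages, 3, 10000L);
            System.out.println("processing batch " + batch);
        }
    }
}
```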

Terence 


Jyotirmoy Sundi

Dec 8, 2014, 10:58:48 AM
to Terence Yim, cdap...@googlegroups.com
1. Which setting is it exactly? Is it mentioned in http://docs.cdap.io/cdap/2.5.0/en/admin.html?
2. Is there any size limit when pulling messages in a flowlet from an external source? I am using

ConsumerIterator<byte[],byte[]> iterator = stream.iterator();

and am able to iterate through the messages in a test consumer app, but the same code hangs in iterator.hasNext() while running in a flowlet. I can't see any reason for this other than a size limit within CDAP when connecting to external sources. Any thoughts?
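
For context on the hang: with the Kafka 0.8 high-level consumer, hasNext() blocks indefinitely by default (consumer.timeout.ms is -1), so a blocked iterator may simply mean no message has arrived. The sketch below only builds the java.util.Properties that would be passed to ConsumerConfig; the property names are the Kafka 0.8 ones, while the class name, group id, and timeout value are hypothetical.

```java
import java.util.Properties;

/** Illustrative only: builds properties for a Kafka 0.8 high-level consumer. */
class HighLevelConsumerProps {
    static Properties build(String zkConnect, String groupId, int timeoutMs) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Default is -1 (hasNext() blocks forever). A positive value makes hasNext()
        // throw kafka.consumer.ConsumerTimeoutException after that many ms without data.
        props.setProperty("consumer.timeout.ms", Integer.toString(timeoutMs));
        // "smallest" starts from the earliest offset when the group has no committed offset.
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // "cdap-test-group" and 5000 ms are placeholder values.
        Properties props = build("localhost:2182", "cdap-test-group", 5000);
        // new kafka.consumer.ConsumerConfig(props) would consume these;
        // omitted so the sketch needs no Kafka jar on the classpath.
        System.out.println(props);
    }
}
```

With a timeout set, a ConsumerTimeoutException shows that the consumer is connected but receiving nothing, which is a different situation from a size restriction.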


--
Best Regards,
Jyotirmoy Sundi

Jyotirmoy Sundi

Dec 8, 2014, 11:11:17 AM
to Terence Yim, cdap...@googlegroups.com
Just for info,
I am not using the CDAP Kafka (the embedded version of Kafka mentioned in the CDAP docs, http://docs.cdap.io/cdap/2.5.0/en/admin.html) but a Kafka 0.8 downloaded from the Kafka site. Is there any difference between the two? The tests are run locally with Kafka and CDAP.

Terence Yim

Dec 8, 2014, 1:21:16 PM
to Jyotirmoy Sundi, cdap...@googlegroups.com

If you don't set it, the default is 1 MB.

Terence

Terence Yim

Dec 8, 2014, 1:22:03 PM
to Jyotirmoy Sundi, cdap...@googlegroups.com
That should be OK. The one that comes with CDAP is the same as the one downloaded from the Kafka project site.

Terence

Jyotirmoy Sundi

Dec 8, 2014, 1:56:06 PM
to Terence Yim, cdap...@googlegroups.com
1. Increased the size, but it did not help.
I don't see any activity in the Kafka server/request logs:
2014-12-08 10:51:49,512 - INFO  [FlowletProcessDriver-kafkaSource-0:c.c.c.i.a.r.f.FlowletProcessDriver$3@377] - Initializing flowlet: flowlet=kafkaSource, instance=0, groupsize=1, accountId=developer, applicationId=StatpumpApp, program=StatpumpFlow, runid=24758fad-922c-4caf-b4dc-916f09c28b34
2014-12-08 10:51:49,708 - INFO  [FlowletProcessDriver-kafkaSource-0-EventThread:o.a.t.i.z.DefaultZKClientService$ServiceDelegate@411] - Connected to ZooKeeper: localhost:2182
2014-12-08 10:51:49,712 - INFO  [FlowletProcessDriver-kafkaSource-0:c.c.c.i.a.r.f.FlowletProcessDriver$3@379] - Flowlet initialized: flowlet=kafkaSource, instance=0, groupsize=1, accountId=developer, applicationId=StatpumpApp, program=StatpumpFlow, runid=24758fad-922c-4caf-b4dc-916f09c28b34

2. On the other hand, when I use a high-level consumer, the request goes to the Kafka server, but the iterator hangs on the consumer.
a. Kafka request in the logs from a working standalone client/Storm client:
[2014-12-08 09:01:40,145] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 22; ClientId: reactions-ConsumerFetcherThread-reactions_spare.local-1418058098690-22c8ee57-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [reactions,0] -> PartitionFetchInfo(16882,1048576) from client /127.0.0.1:53729;totalTime:105,queueTime:1,localTime:3,remoteTime:101,sendTime:0 (kafka.request.logger)

b. Kafka request in the logs from a non-working client within a CDAP flowlet, with the same client code as above:
[2014-12-08 08:58:30,730] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 383; ClientId: reactions-ConsumerFetcherThread-reactions_spare.local-1418057567444-14d3b750-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [reactions,0] -> PartitionFetchInfo(7340,1048576) from client /127.0.0.1:52298;totalTime:1,queueTime:0,localTime:0,remoteTime:0,sendTime:1 (kafka.request.logger)
Any ideas?

I had to remove the scala-library jar from the lib folder of the CDAP home to resolve the Scala dependency issues.
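
When chasing this kind of version conflict, it can help to print which jar a class was actually loaded from. The following is a small diagnostic sketch using only JDK calls; the class name ClassOrigin is hypothetical, and which class loader to pass inside a flowlet (for example the flowlet class's own loader) is an assumption.

```java
import java.security.CodeSource;

/** Illustrative diagnostic (hypothetical helper): reports where a class was loaded from. */
class ClassOrigin {
    static String describe(String className, ClassLoader loader) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> c = Class.forName(className, false, loader);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "bootstrap/unknown"; // typical for core JDK classes
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassOrigin.class.getClassLoader();
        // Prints a jar or directory location if Scala is on the classpath, else "not found".
        System.out.println(describe("scala.collection.mutable.HashSet", loader));
        System.out.println(describe(ClassOrigin.class.getName(), loader));
    }
}
```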

Thanks
Sundi

Jyotirmoy Sundi

Dec 8, 2014, 9:42:02 PM
to Terence Yim, cdap...@googlegroups.com
Got Kafka working from a class which extends Kafka08ConsumerFlowlet.
I see this in the logs:
2014-12-08 18:32:45,537 - INFO  [AsyncAppender-Worker-Thread-3:c.c.c.l.w.AvroFileWriter@184] - Rotating file file:/Users/spare/Downloads/passport/images/cdap-sdk-2.5.1/data/logs/avro/developer/StatpumpApp/flow-StatpumpFlow/2014-12-08/1418092363878.avro
2014-12-08 18:32:45,550 - INFO  [AsyncAppender-Worker-Thread-3:c.c.c.l.w.AvroFileWriter@159] - Creating Avro file file:/Users/spare/Downloads/passport/images/cdap-sdk-2.5.1/data/logs/avro/developer/StatpumpApp/flow-StatpumpFlow/2014-12-08/1418092365549.avro

We have around 24 partitions in total across a 3-node Kafka cluster. So it seems we have to get the partitions of the topic using the ZK API and set them programmatically using addTopicPartition. Is that the right approach here, or are there better alternatives?


Terence Yim

Dec 8, 2014, 10:47:32 PM
to Jyotirmoy Sundi, cdap...@googlegroups.com
Hi Jyotirmoy,

We are adding support for subscribing to all partitions by automatically detecting partition changes. In the meantime, you can simply over-subscribe partitions (e.g. use a loop to call addTopicPartition for 0-100) if you expect the number of partitions to grow dynamically. Over-subscribing won't hurt performance, as partitions that do not exist in Kafka are ignored by the Kafka flowlet until they become available (the Kafka flowlet watches for changes through ZK).
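
The over-subscription loop could look roughly like the sketch below. To keep it runnable without the kafka-pack jar, it uses a hypothetical stand-in interface (TopicPartitionConfigurer) that exposes only the addTopicPartition(topic, partition) call seen earlier in this thread; in a real flowlet the loop would go inside configureKafka and call kafkaConfig directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: over-subscribes a range of partitions for one topic. */
class OverSubscribe {
    /** Hypothetical stand-in for the kafka-pack KafkaConfigurer used in this thread. */
    interface TopicPartitionConfigurer {
        void addTopicPartition(String topic, int partition);
    }

    /** Subscribes to partitions 0 (inclusive) through maxPartitions (exclusive). */
    static void subscribeRange(TopicPartitionConfigurer config, String topic, int maxPartitions) {
        for (int p = 0; p < maxPartitions; p++) {
            config.addTopicPartition(topic, p);
        }
    }

    public static void main(String[] args) {
        final List<Integer> subscribed = new ArrayList<Integer>();
        // A recording configurer stands in for the real one.
        TopicPartitionConfigurer recorder = new TopicPartitionConfigurer() {
            @Override
            public void addTopicPartition(String topic, int partition) {
                subscribed.add(partition);
            }
        };
        subscribeRange(recorder, "reactions", 24);
        System.out.println("subscribed to " + subscribed.size() + " partitions");
    }
}
```

Partition ids are zero-based, so a topic with 24 partitions uses ids 0 through 23.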

Terence

Jyotirmoy Sundi

Dec 9, 2014, 11:55:56 AM
to Terence Yim, cdap...@googlegroups.com
Cool Yim, is there a JIRA to track that development?

Thanks
Sundi

Terence Yim

Dec 9, 2014, 1:37:37 PM
to Jyotirmoy Sundi, cdap...@googlegroups.com
Yes. This is the JIRA https://issues.cask.co/browse/CDAP-622

Terence