How can I get the offset value in KStream?


jayant...@gmail.com

Feb 21, 2017, 1:14:36 AM
to Confluent Platform
Hi,

I am creating my Kafka stream with the code below:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> rawStream = builder.stream(Serdes.String(), Serdes.String(), "topic_name");

After that, I process the stream messages:

rawStream.process(() -> new AbstractProcessor<String, String>() {
    @Override
    public void process(String key, String jMsg) { 
    .....
    }
  });

During processing, how can I get the current offset position?

I want this to check how far behind I am while processing the messages.

Regards,
Jayanth

Matthias J. Sax

Feb 21, 2017, 2:56:43 AM
to confluent...@googlegroups.com
Processor#init() provides the ProcessorContext object. You can get the
current record's offset from it.
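
Since the goal is to measure lag, here is a minimal sketch of the arithmetic once the current offset is known. It assumes the partition's end offset is obtained separately (for example via `KafkaConsumer#endOffsets` in newer client versions), which this thread does not cover. `LagSketch` and `recordsBehind` are hypothetical names, and the numbers are made up for illustration.

```java
public class LagSketch {

    /**
     * Number of records still waiting behind the one currently being processed.
     * Assumes endOffset is the offset the next produced record would receive,
     * i.e. the last available offset + 1.
     */
    public static long recordsBehind(long endOffset, long currentOffset) {
        // Clamp at zero in case the end offset was sampled before the current record arrived.
        return Math.max(0L, endOffset - (currentOffset + 1));
    }

    public static void main(String[] args) {
        // Hypothetical values: the partition holds offsets 0..99 and offset 89 is being processed.
        System.out.println(recordsBehind(100L, 89L)); // prints 10
    }
}
```

The end offset keeps moving while records are produced, so a value computed this way is only a snapshot.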


-Matthias


Michael Noll

Feb 21, 2017, 6:08:13 AM
to confluent...@googlegroups.com

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/27a82ec5-96a2-967d-9e71-8430b702b0b2%40confluent.io.

For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent

jayant...@gmail.com

Feb 22, 2017, 2:28:19 AM
to Confluent Platform
Hi All,

I have overridden Processor#init() and am trying to print the offset() value as shown below:

rawStream.process(() -> new AbstractProcessor<String, String>() {
    @Override
    public void process(String key, String jMsg) {
    .....
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        System.out.println(context.offset());
    }
});

After this change, I get the following exception:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:169)
        at com.comcast.stream.processor.KafkaStreamProcessor$1.init(KafkaStreamProcessor.java:102)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:64)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:121)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)

Any help on this?

Regards,
Jayanth

Michael Noll

Feb 22, 2017, 2:59:05 AM
to confluent...@googlegroups.com
See the error message:

Caused by: java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed

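You should call `context.offset()` only from within the `process()` method, not from `init()`. As the stack trace shows, `init()` runs when the stream task is created during a rebalance, before any record is being processed, so there is no current offset at that point.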
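
A minimal, self-contained sketch of that lifecycle may help. It is not Kafka Streams code: `SketchContext` and `OffsetAwareProcessor` are hypothetical stand-ins so the example runs without Kafka on the classpath. It shows the pattern described above: keep the context handed to `init()`, and read the offset only inside `process()`. With the real API, `AbstractProcessor` already stores the context and exposes it through `context()`, so calling `context().offset()` inside `process()` should be enough, provided an overridden `init()` still calls `super.init(context)`.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetLifecycleSketch {

    /** Hypothetical stand-in for ProcessorContext: offset() is valid only while a record is in flight. */
    public static class SketchContext {
        private Long currentOffset = null; // null means "no record is being processed"

        public void startRecord(long offset) { currentOffset = offset; }

        public void finishRecord() { currentOffset = null; }

        public long offset() {
            if (currentOffset == null) {
                throw new IllegalStateException(
                        "offset() should only be called while a record is processed");
            }
            return currentOffset;
        }
    }

    /** Stores the context in init() and reads the offset in process(). */
    public static class OffsetAwareProcessor {
        private SketchContext context;
        public final List<Long> seenOffsets = new ArrayList<>();

        public void init(SketchContext context) {
            this.context = context; // keep the reference only; do not call offset() here
        }

        public void process(String key, String value) {
            seenOffsets.add(context.offset()); // safe: a record is currently being processed
        }
    }

    /** Drives the processor the way a task would: init() once, then one process() call per record. */
    public static List<Long> run(long firstOffset, int recordCount) {
        SketchContext context = new SketchContext();
        OffsetAwareProcessor processor = new OffsetAwareProcessor();
        processor.init(context);
        for (int i = 0; i < recordCount; i++) {
            context.startRecord(firstOffset + i);
            processor.process("key-" + i, "value-" + i);
            context.finishRecord();
        }
        return processor.seenOffsets;
    }

    public static void main(String[] args) {
        System.out.println(run(42L, 3));
    }
}
```

Calling `offset()` on a fresh `SketchContext` before any record has started throws an `IllegalStateException`, which mirrors the exception in the stack trace above.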

-Michael



