Kafka Streams pause processing

1379 views
Skip to first unread message

Emiliano Capoccia

unread,
Aug 4, 2016, 11:58:51 AM8/4/16
to Confluent Platform
Hello,

With my team we've been investigating Kafka Streams to use for a new application development. 

The scenario is that we are enriching a message in a processor using external sources. So our topology looks like source (input topic) -> processor (content enricher) -> sink (output topic).

Under certain circumstances, however, we need to pause processing (and sending messages down the output topic).

Is it doable? It feels like once the stream is started it continue to process messages forever, without the possibility to being paused.

Thanks, 

Emiliano

Matthias J. Sax

unread,
Aug 4, 2016, 12:37:58 PM8/4/16
to confluent...@googlegroups.com
Hi Emiliano,

can you elaborate why you want to pause? Actually, Kafka Streams handles
"back pressure" out-of-the box and there should not be any reason to
pause an application.

Can you give some more details?


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.

signature.asc

Emiliano Capoccia

unread,
Aug 4, 2016, 2:09:20 PM8/4/16
to Confluent Platform
Hello Matthias, thanks for your answer.

The reason why I want to "pause" is that the content enricher depends on an external system which might not be available for a long time (minutes / hours).

As such, we are not in a position to process the incoming message BUT we still need to guarantee processing and delivery of the enriched message when the external system will be back online.

So, the situation is: internally to the KStream the consumer has picked up the input message, has not acknowledged as yet as consumed (AFAIK this happens when the output message is successfully delivered to the output topic), but needs to wait an indefinitely long time before I can process the message and send it to the output topic.
Hence the idea of "pausing" the stream.

However, I should rephrase it, rather than pausing I should be able to continue to try process the same input message without advancing the stream (so without acknowledging the input message).
What I'm not a position to do is to complete the computation and I shouldn't advance the input stream.

I look forward to hearing your thoughts on the subject.

Emiliano

On Thursday, August 4, 2016 at 5:37:58 PM UTC+1, Matthias J. Sax wrote:
Hi Emiliano,

can you elaborate why you want to pause? Actually, Kafka Streams handles
"back pressure" out-of-the box and there should not be any reason to
pause an application.

Can you give some more details?


-Matthias

On 08/04/2016 05:58 PM, Emiliano Capoccia wrote:
> Hello,
>
> With my team we've been investigating Kafka Streams to use for a new
> application development.
>
> The scenario is that we are enriching a message in a processor using
> external sources. So our topology looks like source (input topic) ->
> processor (content enricher) -> sink (output topic).
>
> Under certain circumstances, however, we need to pause processing (and
> sending messages down the output topic).
>
> Is it doable? It feels like once the stream is started it continue to
> process messages forever, without the possibility to being paused.
>
> Thanks,
>
> Emiliano
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send

Matthias J. Sax

unread,
Aug 4, 2016, 2:42:49 PM8/4/16
to confluent...@googlegroups.com
Hi Emiliano,

if I understand you correctly, there is nothing special you need to do
(at least not in your application code). You are taking about "back
pressure" and Kafka Streams can handle it out of the box.

I assume, that you use something like .map() to do the lookup in the
external system. If your external system is not online, map() cannot
complete the call.

a) If your lookup call blocks with no time-out, there is nothing more
you need to do. Just wait until the system is back online, your call
returns, and processing will resume.
b) If your lookup call blocks and times-out at some point, just issue
the loopup within map() again (ie use a loop within map()). Repeat this
until the system is back online and your call succeeds.

As long as map() does not return an output record, you whole application
will just block (ie, pause) (except map() itself). Thus, the currently
processed record in map() will not be committed, and no new records will
be fetched from the input topic (this is called back pressure, and
results in "auto pausing" in your case -- not sure if "auto pausing" is
a good term for it though).



There is one important thing you need to consider though: as long as
your Streams application blocks, the internally used Kafka consumer
client will not send any heartbeats to Kafka and might time out. Thus,
you need to set the time-out configuration parameter higher than your
expected maximum down time of your external system.

The consumer client timeout issue is known for this case, and there is
already a KIP to fix it:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread


Hope this helps.


-Matthias
> > an email to confluent-platf...@googlegroups.com
> <javascript:>
> > <mailto:confluent-platf...@googlegroups.com
> <javascript:>>.
> > To post to this group, send email to confluent...@googlegroups.com
> <javascript:>
> > <mailto:confluent...@googlegroups.com <javascript:>>.
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
> <https://groups.google.com/d/optout>.
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com?utm_medium=email&utm_source=footer>.
signature.asc

Emiliano Capoccia

unread,
Aug 4, 2016, 3:43:20 PM8/4/16
to Confluent Platform
Hi Matthias, thanks for your interesting reply.

you're right, I'm using a flavor of map() to call the external system-- namely mapValues().
And, in my case, I'm wrapping the call to the external system in a hystrix command, so I the case b) applies, I can issue more hystrix commands in a loop until one succeed.

The reason why I didn't do it as yet was exactly what you just described as the caveat. Although I wasn't aware that the underlying reason / mechanism was the heartbeat  I was pretty sure that if you don't acknowledge a message for long enough after consuming it, kafka is going to tear down the consumer (in this case the consumer internal to the stream) and reassign the partition to another consumer in the group. 
That's why I did't try to stay in the call to map() indefinitely.

I think your answer clarify the situation, I believe that in order to effectively use the Kstream we'll need to wait for the background heartbeat to be in mainstream kafka (I guess that's a tricky implementation though, how are you going to tell apart a genuine hang?? we'll see :))

Thanks a lot,
Emiliano
>     > an email to confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent-platform+unsub...@googlegroups.com
>     <javascript:>>.
>     > To post to this group, send email to confluent...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent...@googlegroups.com <javascript:>>.
>     > To view this discussion on the web visit
>     >
>     https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>
>
>     >
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>>.
>
>     > For more options, visit https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>.
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send

Matthias J. Sax

unread,
Aug 4, 2016, 5:27:18 PM8/4/16
to confluent...@googlegroups.com
You are welcome. I was not sure how much background knowledge you have,
so sorry for explaining stuff you already considered.

-Matthias
> > > an email to confluent-platf...@googlegroups.com
> <javascript:>
> > <javascript:>
> > > <mailto:confluent-platf...@googlegroups.com
> <javascript:>
> > <javascript:>>.
> > > To post to this group, send email to
> confluent...@googlegroups.com
> > <javascript:>
> > > <mailto:confluent...@googlegroups.com <javascript:>>.
> > > To view this discussion on the web visit
> > >
> >
> https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>
>
> >
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>>
>
> >
> > >
> >
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>
>
> >
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
> <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>>>.
>
> >
> > > For more options, visit https://groups.google.com/d/optout
> <https://groups.google.com/d/optout>
> > <https://groups.google.com/d/optout
> <https://groups.google.com/d/optout>>.
> >
> > --
> > You received this message because you are subscribed to the Google
> > Groups "Confluent Platform" group.
> > To unsubscribe from this group and stop receiving emails from it,
> send
> > an email to confluent-platf...@googlegroups.com
> <javascript:>
> > <mailto:confluent-platf...@googlegroups.com
> <javascript:>>.
> > To post to this group, send email to confluent...@googlegroups.com
> <javascript:>
> > <mailto:confluent...@googlegroups.com <javascript:>>.
> > To view this discussion on the web visit
> >
> https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com>
>
> >
> <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com?utm_medium=email&utm_source=footer
> <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com?utm_medium=email&utm_source=footer>>.
>
> > For more options, visit https://groups.google.com/d/optout
> <https://groups.google.com/d/optout>.
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/989450ee-3069-43a8-9a9f-51cec46053f0%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/989450ee-3069-43a8-9a9f-51cec46053f0%40googlegroups.com?utm_medium=email&utm_source=footer>.
signature.asc

Michael Noll

unread,
Aug 5, 2016, 11:45:31 AM8/5/16
to confluent...@googlegroups.com
Emiliano ,

> The reason why I want to "pause" is that the content enricher depends on an
> external system which might not be available for a long time (minutes / hours).

To throw in another possible option:  Could you feed the data from this external system to Kafka, e.g. through Kafka Connect?  If so, you'd be able to decouple the processing behavior of your stream processing application from the uptime/availability of this external system.

-Michael



>     >     > an email to confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     >     <javascript:>
>     >     > <mailto:confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     >     <javascript:>>.
>     >     > To post to this group, send email to
>     confluent...@googlegroups.com
>     >     <javascript:>
>     >     > <mailto:confluent...@googlegroups.com <javascript:>>.
>     >     > To view this discussion on the web visit
>     >     >
>     >
>     https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>
>
>     >
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>>
>
>     >
>     >     >
>     >
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>
>
>     >
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>>>.
>
>     >
>     >     > For more options, visit https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>
>     >     <https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>>.
>     >
>     > --
>     > You received this message because you are subscribed to the Google
>     > Groups "Confluent Platform" group.
>     > To unsubscribe from this group and stop receiving emails from it,
>     send
>     > an email to confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent-platform+unsub...@googlegroups.com

>     <javascript:>>.
>     > To post to this group, send email to confluent...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent...@googlegroups.com <javascript:>>.
>     > To view this discussion on the web visit
>     >
>     > For more options, visit https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>.
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5fff1bdf-fb12-1485-673a-c428a81bbf2a%40confluent.io.

Emiliano Capoccia

unread,
Aug 6, 2016, 3:23:14 AM8/6/16
to Confluent Platform
There is nothing to be sorry Matthias, actually your reply helped us as it clarified how the system is supposed to behave.

In fact, following your considerations, we actually tried to just loop in the request to the downstream systems, and it turns out that we're ok with that. There are duplicates on reconnecting (partition rebalance) but the semantic we have specified in our SLA is at-least-once, so all ok.

Thanks a lot

Emiliano
>     >     > an email to confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     >     <javascript:>
>     >     > <mailto:confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     >     <javascript:>>.
>     >     > To post to this group, send email to
>     confluent...@googlegroups.com
>     >     <javascript:>
>     >     > <mailto:confluent...@googlegroups.com <javascript:>>.
>     >     > To view this discussion on the web visit
>     >     >
>     >    
>     https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>
>
>     >    
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com>>
>
>     >
>     >     >
>     >    
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>
>
>     >    
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/d17a604b-9354-4d1f-884a-5fe8eda51e0d%40googlegroups.com?utm_medium=email&utm_source=footer>>>.
>
>     >
>     >     > For more options, visit https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>
>     >     <https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>>.
>     >
>     > --
>     > You received this message because you are subscribed to the Google
>     > Groups "Confluent Platform" group.
>     > To unsubscribe from this group and stop receiving emails from it,
>     send
>     > an email to confluent-platform+unsub...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent-platform+unsub...@googlegroups.com
>     <javascript:>>.
>     > To post to this group, send email to confluent...@googlegroups.com
>     <javascript:>
>     > <mailto:confluent...@googlegroups.com <javascript:>>.
>     > To view this discussion on the web visit
>     >
>     https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com
>     <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com>
>
>     >
>     <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com?utm_medium=email&utm_source=footer
>     <https://groups.google.com/d/msgid/confluent-platform/f2cb51b6-7d29-46fd-aa65-7de4956b6f34%40googlegroups.com?utm_medium=email&utm_source=footer>>.
>
>     > For more options, visit https://groups.google.com/d/optout
>     <https://groups.google.com/d/optout>.
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send

Emiliano Capoccia

unread,
Aug 6, 2016, 3:32:11 AM8/6/16
to Confluent Platform
Thanks Michael for your answer,

I think your option is not viable in this particular case, as the input messages are event messages for a particular entity, and you can't anticipate which one you need, but surely your solution is something to consider in a different scenario.

Emiliano
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.

Alejandro

unread,
Oct 12, 2018, 2:59:59 PM10/12/18
to Confluent Platform
Hello, I'm sorry if it is very late but I'm facing the same problem i read from a source I have to make some map operations and then I have to write in a destination, the problem is that the destination has to run some jobs and they ask us to stop sending information during certain periods of time,what I was wondering to solve the problem was to maintain a reference of the  KafkaStreams streams in a separate thread and it will be calling an api that will determinate if we need to stop the process, in that case I will call the streams.close(); method, and then when I have to start the process again I will use the component that uses the KStreamBuilder to build my stream again and continue with my process, what do you think about it? do you think it can work based on your experience ?
Reply all
Reply to author
Forward
0 new messages