State store crash recovery

Peng Liu

Aug 10, 2016, 11:47:29 AM
to Confluent Platform
Hi guys,

It was mentioned in a previous post that RocksDB caches 1000 items. In the case of large joins, would an in-memory store (using the Processor API) be better suited? Does an in-memory store back itself up to an internal Kafka topic as well? Specifically, if a state is an aggregation of a topic up to some point and the process dies, is Streams able to recover some checkpointed state and continue building the aggregate from where it left off?

Since an aggregate is the sum of the entire history of the input topic, what I'm afraid of is being forced to replay the whole topic to reconstruct the aggregation store, which may be impossible due to compaction. Ideally, the aggregate store would maintain checkpoints of the aggregation up to offset i, so that if the process dies, we can start with the checkpointed aggregate and resume aggregating from offset i in the input topic.

On a somewhat related note, for a multi-table join, are there unforeseen consequences (namely in terms of consistency and recovery from a crash) of doing a single-state-store join (using the Processor API and a state store shared between processors that process different topics) vs. converting each topic into a KTable and pairwise joining them until they are all joined? I'd PREFER the former since it would generate far fewer intermediate topics, but not at the cost of consistency or recoverability.

Thank you so much,

Peng Liu

Matthias J. Sax

Aug 15, 2016, 5:13:11 AM
to confluent...@googlegroups.com
Hi,

if you use a custom store, you need to take care of fault tolerance
with custom code. There is no out-of-the-box backup via a topic. The
provided in-memory stores are not backed by a topic (that's why they are
called "in-memory"). Whether a larger cache size would be better
depends (as always).

For a KTable, backup happens automatically. Furthermore, a KTable writes
the latest aggregation result into its changelog topic -- thus, log
compaction ensures that recovery time does not keep growing while the
application runs, and at the same time the latest aggregation result is
always present, which allows replaying the KStream from a certain point
in time (still with at-least-once processing semantics).
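
For example, with the DSL (a minimal sketch against a newer Streams API
than the 0.10 release discussed here; topic name and serdes are
placeholders):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();

    // "events" is a placeholder input topic. The count is materialized in
    // a local store whose updates Streams also writes to an internal,
    // compacted changelog topic -- on restart the store is rebuilt from
    // that changelog instead of replaying the whole input topic.
    KTable<String, Long> counts = builder
        .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count();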

About multi-way joins: using a customized low-level implementation
should work.
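
For example, a rough sketch of the shared-store variant with the
Processor API (again a newer API than 0.10; all topic, store, and
processor names are placeholders):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    // One processor per input topic; all of them read/write one shared,
    // logged store. The input topics must be co-partitioned so the same
    // keys land on the same task.
    class SharedStoreJoin implements Processor<String, String, String, String> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<String, String> context) {
            store = context.getStateStore("join-store");
        }

        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value()); // merge into join state
        }
    }

    Topology topology = new Topology();
    topology.addSource("src-a", "topic-a"); // default serdes assumed
    topology.addSource("src-b", "topic-b");
    topology.addProcessor("join-a", SharedStoreJoin::new, "src-a");
    topology.addProcessor("join-b", SharedStoreJoin::new, "src-b");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("join-store"),
            Serdes.String(), Serdes.String()), // change-logged by default
        "join-a", "join-b");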


-Matthias

Peng Liu

Aug 15, 2016, 1:21:30 PM
to Confluent Platform
Hi Matthias,

Thank you for the clarification. Could you point me to a reference on how to properly implement fault tolerance and recovery for a state store (the way it is implemented for a KTable)? I'm guessing I would also have to store the various per-topic offsets that a given state snapshot has read up to, correct?

Thank you :)

Matthias J. Sax

Aug 16, 2016, 4:58:26 AM
to confluent...@googlegroups.com
You can have a look into

> org.apache.kafka.streams.state.internals.RocksDBStore

It uses

> private StoreChangeLogger<Bytes, byte[]> changeLogger;

to back up all state updates in a changelog topic.
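
(In later Streams versions, the public Stores API exposes the same
changelog mechanism for the provided stores, including in-memory ones;
a sketch, with a placeholder store name:)

    import java.util.Collections;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // withLoggingEnabled() makes Streams mirror every update into a
    // compacted changelog topic -- the same mechanism RocksDBStore uses
    // via its StoreChangeLogger.
    StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-agg-store"),
                Serdes.String(),
                Serdes.Long())
            .withLoggingEnabled(Collections.emptyMap());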

Of course, using a changelog topic is only one way to implement a
fault-tolerant store. In general, you need to put the backup into a
distributed storage system to make it available to all application
instances (in case of a rebalance), e.g., a distributed
database/key-value store, a distributed file system, or the like.

And yes, you need to somehow sync the input offsets with the state
snapshots. In Kafka Streams, it works as follows:

1. initiate the commit at the source level
2. wait until all downstream operators have committed (including state
snapshots and flushing of buffered records)
3. commit the input offsets


Have a look into

> org.apache.kafka.streams.processor.internals.StreamTask#commit()

for further details.
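
Conceptually, the sequence could be sketched like this (illustrative
only, not the actual StreamTask code; the helper names are made up):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommitSketch {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> consumedOffsets;

        CommitSketch(KafkaConsumer<byte[], byte[]> consumer,
                     Map<TopicPartition, OffsetAndMetadata> consumedOffsets) {
            this.consumer = consumer;
            this.consumedOffsets = consumedOffsets;
        }

        void commit() {
            flushDownstream();                    // 1./2. flush buffered records
            snapshotState();                      //       and state downstream
            consumer.commitSync(consumedOffsets); // 3. commit input offsets last
        }

        // Committing offsets last means a crash beforehand only causes
        // re-processing of inputs (at-least-once); the snapshot always
        // covers at least everything up to the committed offsets.
        private void flushDownstream() { /* e.g. producer.flush() */ }
        private void snapshotState()   { /* e.g. store.flush(); write checkpoint */ }
    }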


-Matthias