Exactly Once Semantics for KStream to KTable joins


Alper Kanat
Sep 17, 2019, 9:55:01 AM
to confluent...@googlegroups.com
Hi,

I have a Kafka Streams application that consumes a stream of exported device IDs and joins them with a KTable of Devices. The streams app is a dockerized Spring Boot application with the following configuration:

application.id=export-processing
num.stream.threads=10
processing.guarantee=exactly_once

My Kafka version is 2.2.0 (scala 2.12) and I currently have 3 brokers which have the following configuration:

log.roll.ms=3600000
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=2000024

The code is like below:

        final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
            builder.stream(TOPIC_DEVICE_IDS_EXPORTED);

        final KTable<String, Device> deviceTable = builder.table(
            TOPIC_DEVICE_STREAM,
            Consumed.with(Serdes.String(), deviceSerde)
        );

        exportedDeviceIdsStream
            .flatMap((requestId, exportMessage) ->
                CollectionUtil.nullSafe(exportMessage.getValues())
                    .stream()
                    .map(deviceId -> {
                        final ExportedDevice exportedDevice = new ExportedDevice();
                        final String key = String.join(KEY_DELIMITER, exportMessage.getAppId(), deviceId);

                        exportedDevice.setAppId(exportMessage.getAppId());
                        exportedDevice.setRequestId(requestId);

                        return KeyValue.pair(key, exportedDevice);
                    })
                    .collect(Collectors.toList())
            )
            .peek((key, exportedDevice) -> log.debug("Flatmap OK"))
            .join(
                deviceTable,
                (exportedDevice, device) -> {
                    exportedDevice.setDevice(device);

                    return exportedDevice;
                },
                Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
            )
            .map((deviceId, exportedDevice) -> KeyValue.pair(exportedDevice.getRequestId(), exportedDevice.getDevice()))
            .peek((requestId, device) -> log.debug("Join OK"))
            .to(TOPIC_DEVICES_EXPORTED, Produced.with(Serdes.String(), deviceSerde));

All device updates are streamed to deviceTable, but some devices may not be updated for hours, days or even months. When I run a single app instance with exactly-once disabled, the device stream is joined with deviceTable even if the latest update was a week ago. (I can see both "Flatmap OK" and "Join OK" logs.)

When I deploy 2 app instances to production with exactly-once enabled, the join doesn't work (the 2nd peek never logs any output). If I execute a service call that reads all devices from the DB and streams that data into deviceTable, the join works as expected until roughly 10 minutes pass. After that interval, the join stops working until I re-stream all data from the DB to the KTable again.

I thought that exactly-once semantics only apply to the KStream side of a KStream-KTable join, and that the KTable always has the latest data ready for a join whenever the left side (the KStream in this case) has data. Am I missing something, or is this the expected behaviour?

Alper Kanat
Sep 17, 2019, 11:02:17 AM
to confluent...@googlegroups.com
Sorry if I caused any misunderstanding; I may have given the wrong impression that the join isn't working at all. Actually it partially works. The deviceIds stream is a batched stream of about 2,500 messages, and flatMap produces 2,500 messages. After the join, only a few of them (nondeterministic; it can be 38 or 83 or 1,000, but never all) are produced. If I read all devices and stream them into the deviceTable topic, then all records are successfully produced by the join.

Bruno Cadonna
Sep 17, 2019, 3:05:55 PM
to confluent...@googlegroups.com
Hi Alper,

I am afraid I cannot completely follow.

What does "If I read all devices and stream them into the deviceTable topic then all records are successfully produced by the join" mean? Is the deviceTable topic empty beforehand, or just partially filled?

What happens if you run one app instance with exactly-once enabled?
What happens if you run 2 app instances with exactly-once disabled?

Note that exactly-once applies to the whole app, not only to parts of it.

Best,
Bruno
> --
> You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CAPMuxnRHz-B_CSYzJ%2Br6gman2wyKbxmb8t8xJN12pMgoF4VfcQ%40mail.gmail.com.

Alper Kanat
Sep 17, 2019, 6:37:42 PM
to confluent...@googlegroups.com
Hi Bruno,

The devices live in Elasticsearch, and I publish all device updates to a Kafka topic called "device_stream". A device consists of a deviceId, deviceModel, etc., and this data exists both in Elasticsearch and in the "device_stream" topic. When I need to export some of the devices, I export their deviceIds to a Kafka topic called "devices_exported". "device_stream" is a compacted topic and I'm using it as a KTable within the app (deviceTable). I use this table to enrich exported devices and then forward the enriched data to another microservice.

The deviceTable topic starts empty, but over the lifecycle of the system updates to devices are published to it, so eventually it holds every device's latest update. Currently it is full of devices (I can read it using kafka-console-consumer.sh with the --from-beginning argument).

Considering the code I shared, imagine a device with ID "A" is received at exportedDeviceIdsStream. I know that "A" exists in deviceTable (I read the topic using kafka-console-consumer.sh --from-beginning and grep for "A"). So when "A" is received on the left side of the join, I expect the join to fire and to see the "Join OK" log. In production, exportedDeviceIdsStream receives a batch of 2,500 device IDs and I see "Flatmap OK" 2,500 times; however, I don't see 2,500 "Join OK" messages, even though I assume all of those devices exist in deviceTable.

(You may ask how I can be sure of this. I read the topic with kafka-console-consumer.sh --from-beginning, write the output to a file and count the lines. Then I pick random deviceIds from the initial list and check them against the file. deviceTable may have more than one record per deviceId, since it stores updates, but the file contains millions of records, so I strongly believe I should see something close to 2,500 joins for that batch.)

When exactly-once is enabled, the join stops producing records downstream after the batch has been produced at least once. "A" is only processed once; the next time "A" is received from the stream, the join doesn't produce any message even though "A" exists in deviceTable. The join only resumes if I publish "A" again to the "device_stream" topic and then send "A" again to the "devices_exported" topic. So for production, I'd have to republish all devices into "device_stream".

If I disable exactly-once but don't republish devices into "device_stream", the join still doesn't produce. However, once I republish all devices into "device_stream" (only once, even though they already exist in the topic), the join works as expected and I don't have to republish again.

So far, I couldn't find any connection to the number of app instances. When I disable exactly-once and refresh "device_stream", the join seems to work fine. When I enable exactly-once, the join works once and then stops until I refresh the "device_stream" topic. (By refresh, I mean republishing all devices into the "device_stream" KTable even though they already exist in the topic.)

According to the docs [1], the left-hand side (KStream) of the inner join is responsible for triggering the join, while the right-hand side (KTable) only updates the join state. I think exactly-once has nothing to do with this scenario, but somehow disabling it and refreshing the KTable seems to solve the issue.
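The semantics described in the docs (the stream side triggers the join, the table side only updates state, and a null value deletes its key) can be illustrated with a small in-memory sketch. This is a simplified model of inner-join behaviour, not the actual Streams internals; `JoinModel` and its method names are made up:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinModel {
    // Stand-in for the KTable's local state store.
    private final Map<String, String> tableState = new HashMap<>();
    private final List<String> joinOutput = new ArrayList<>();

    // KTable side: updates join state but never emits a join result.
    public void onTableRecord(String key, String value) {
        if (value == null) {
            tableState.remove(key); // a null value is a tombstone: it deletes the key
        } else {
            tableState.put(key, value);
        }
    }

    // KStream side: triggers the join; an inner join silently drops the
    // record when the key is missing from the table.
    public void onStreamRecord(String key, String value) {
        String tableValue = tableState.get(key);
        if (tableValue != null) {
            joinOutput.add(key + ":" + value + "+" + tableValue);
        }
    }

    public List<String> output() {
        return joinOutput;
    }
}
```

This also matches the symptom in the thread: if the store backing `tableState` is lost (for example in a non-persistent container) and not fully restored, stream records for the missing keys are dropped without any error.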


--
Alper Kanat


Bruno Cadonna
Sep 17, 2019, 7:43:41 PM
to confluent...@googlegroups.com
Hi Alper,

That is indeed a strange behaviour.

Did you clean up your local state with `KafkaStreams#cleanUp()` between the runs with and without exactly-once enabled?
Are you sure you do not send devices with value null from Elasticsearch to the device_stream topic? Records with a null value remove records with the same key from the KTable.

Best,
Bruno

Alper Kanat
Sep 18, 2019, 3:36:33 AM
to confluent...@googlegroups.com
Hi Bruno,

The app runs in a container, and whenever I restart it (to change env vars etc.) the local state is gone, as I don't mount a persistent volume for it. I'm 100% positive that I'm not sending null-valued devices from Elasticsearch to the device_stream topic.

From everything I've read so far, I'm almost sure that exactly-once has nothing to do with this weird behavior, but it seems somehow related: once I refresh device_stream and disable exactly-once, the problem seems to be gone. However, because exactly-once is disabled, I have to decrease the number of app instances to 1 and set Docker's restart_policy condition to none (so the app doesn't auto-restart), to make sure no duplicate messages are produced in case of a failure.

--
Alper Kanat


Alper Kanat
Sep 18, 2019, 10:59:44 AM
to confluent...@googlegroups.com
Do the exactly-once guarantee and the settings it enables affect joins between a KStream and a KTable in any way?

--
Alper Kanat

Bruno Cadonna
Sep 18, 2019, 11:28:31 AM
to confluent...@googlegroups.com
Exactly-once should not affect the semantics of a DSL operation. If it
does, it is a bug.

Best,
Bruno

Bruno Cadonna
Sep 19, 2019, 5:59:21 PM
to confluent...@googlegroups.com
Hi Alper,

It could be that you get the behaviour you describe because of timing issues. If the stream is processed before the KTable is completely loaded, the join will not produce what you expect.

How do you set the timestamp in your records?
Could you try to set max.task.idle.ms to something larger than 0 -- let's say 1000 ms -- and see if the issue persists?
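In the same key=value style as the app config at the top of the thread, that suggestion would look like this (1000 ms is just the example value given above):

```properties
# Wait up to 1 s for data on all input partitions before processing a task,
# giving the table side a chance to catch up before the stream side joins.
max.task.idle.ms=1000
```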

Best,
Bruno

Alper Kanat
Sep 20, 2019, 4:17:42 AM
to confluent...@googlegroups.com
Hi Bruno,

By "KTable being completely loaded", do you mean the backing changelog topic or the KTable instance inside the app? If you mean the latter: once the app is restarted or redeployed (and given that it runs inside a non-persistent container), its state is gone, and with the default settings it will start from the latest offset, am I right? Persisting the local state and starting from the earliest offset might help, correct? Still, the lack of this configuration doesn't explain why, after a while, the join stopped producing even though we had refreshed all records inside the KTable.

I believe the timestamps for both the KStream and the KTable side are assigned automatically in this case (we produce messages using Spring Kafka's KafkaTemplate). I'll have a look at this and will try the setting you mentioned.

Thanks!

Bruno Cadonna
Sep 20, 2019, 10:22:39 PM
to confluent...@googlegroups.com
Hi Alper,

> By KTable being completely loaded, do you mean the backing changelog topic or the KTable instance inside the app?

I mean the latter.

> If you mean the latter, once the app is restarted or redeployed (and given that it's working inside a non-persistent container) its state is gone and with the default settings, it'll start from the latest offset, am I right?

Yes

> Persisting the local state and starting from the earliest offset might help, am I right?

I think so.

> I believe the timestamps for both the KStream and KTable side are automatically assigned in this case. (producing messages using Spring Kafka's KafkaTemplate) I'll have a look at this and will try the setting you mentioned.

Looking forward to your findings.

Best,
Bruno

Alper Kanat
Oct 17, 2019, 6:28:43 AM
to confluent...@googlegroups.com
Hi,

Sorry for the late reply. It looks like my streaming apps with joins store a copy of the right (KTable) side locally and keep an offset for it. Since I didn't persist that directory within the container, the state was gone, and therefore I was no longer able to join with some records. Persisting the state stores and increasing the number of standby replicas (in case of a failure) solved the problem.
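In config terms, the fix amounts to something like the following (both property names are real Kafka Streams settings; the path is an assumption and would need to be backed by a Docker volume so it survives container restarts):

```properties
# Keep local state (RocksDB stores and checkpointed offsets) on a mounted volume
state.dir=/var/lib/kafka-streams
# Keep a warm copy of each state store on another instance for faster failover
num.standby.replicas=1
```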

--
Alper Kanat

