My Kafka version is 2.2.0 (Scala 2.12), and I currently have 3 brokers with the following configuration:
The code looks like this:
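```java
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Left side of the join: export requests, keyed by request ID
final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
        builder.stream(TOPIC_DEVICE_IDS_EXPORTED);

// Right side of the join: the latest state of every device
final KTable<String, Device> deviceTable = builder.table(
        TOPIC_DEVICE_STREAM,
        Consumed.with(Serdes.String(), deviceSerde)
);

exportedDeviceIdsStream
        // Fan out: one record per device ID, re-keyed to appId + KEY_DELIMITER + deviceId
        .flatMap((requestId, exportMessage) ->
                CollectionUtil.nullSafe(exportMessage.getValues())
                        .stream()
                        .map(deviceId -> {
                            final ExportedDevice exportedDevice = new ExportedDevice();
                            final String key = String.join(KEY_DELIMITER, exportMessage.getAppId(), deviceId);
                            exportedDevice.setAppId(exportMessage.getAppId());
                            exportedDevice.setRequestId(requestId);
                            return KeyValue.pair(key, exportedDevice);
                        })
                        .collect(Collectors.toList())
        )
        .peek((key, exportedDevice) -> log.debug("Flatmap OK"))
        // Inner join: records without a matching key in deviceTable are dropped
        .join(
                deviceTable,
                (exportedDevice, device) -> {
                    exportedDevice.setDevice(device);
                    return exportedDevice;
                },
                Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
        )
        // Re-key by request ID; the incoming key is the composite appId/deviceId key
        .map((key, exportedDevice) -> KeyValue.pair(exportedDevice.getRequestId(), exportedDevice.getDevice()))
        .peek((requestId, device) -> log.debug("Join OK"))
        .to(TOPIC_DEVICES_EXPORTED, Produced.with(Serdes.String(), deviceSerde));
```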
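The flatMap step re-keys every record before the join, so deviceTable is looked up with the composite key appId + KEY_DELIMITER + deviceId rather than with the request ID or the bare device ID. A plain-Java sketch of that fan-out (no Kafka dependency; the ":" delimiter, the class name and the reduced String value are hypothetical simplifications, not the real types):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FanOutSketch {
    // Hypothetical delimiter; the real KEY_DELIMITER value is not shown in the question.
    static final String KEY_DELIMITER = ":";

    /** Mirrors the flatMap step: one output record per device ID in the export message. */
    static List<Map.Entry<String, String>> fanOut(String requestId, String appId, List<String> deviceIds) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        if (deviceIds == null) {
            return out; // same effect as CollectionUtil.nullSafe(...) yielding an empty list
        }
        for (String deviceId : deviceIds) {
            String key = String.join(KEY_DELIMITER, appId, deviceId);
            out.add(new SimpleEntry<>(key, requestId)); // value reduced to the request ID
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [app-42:dev-a=req-1, app-42:dev-b=req-1]
        System.out.println(fanOut("req-1", "app-42", List.of("dev-a", "dev-b")));
    }
}
```

A device can only be joined if deviceTable holds a record under exactly the same composite key. Because the key changes, Kafka Streams also routes these records through an internal repartition topic before the join.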
All device updates are streamed to deviceTable, but some devices may not be updated for hours, days or even months. When I run a single app instance with exactly-once disabled, the device stream is joined with deviceTable even if the device's latest update was a week ago (I can see both the "Flatmap OK" and "Join OK" logs).
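A KStream-KTable join is not windowed: each incoming stream record looks up whatever value the table's local state store currently holds for that key, however long ago it was written, and table updates on their own never produce join output. The plain-Java model below illustrates that lookup; it has no Kafka dependency, its class and method names are hypothetical, and it deliberately ignores partitioning, state-store restoration and timestamps:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class StreamTableJoinSketch {
    // Table side: only the latest value per key is kept, with no expiry or time window.
    private final Map<String, String> table = new HashMap<>();

    /** A table update (upsert); a null value is a tombstone that deletes the key. */
    void onTableRecord(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    /** A stream record triggers a lookup; inner-join semantics drop it if the key is missing. */
    Optional<String> onStreamRecord(String key, String streamValue) {
        String tableValue = table.get(key);
        if (tableValue == null) {
            return Optional.empty(); // would correspond to no "Join OK" log line
        }
        return Optional.of(streamValue + "+" + tableValue);
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onTableRecord("app-42:dev-a", "device-a-v1"); // e.g. last updated weeks ago
        System.out.println(join.onStreamRecord("app-42:dev-a", "req-1")); // Optional[req-1+device-a-v1]
        System.out.println(join.onStreamRecord("app-42:dev-b", "req-1")); // Optional.empty
    }
}
```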
When I deploy 2 app instances to production with exactly-once enabled, the join doesn't work: the second peek never logs any output. If I call a service that reads all devices from the DB and streams that data into deviceTable, the join works as expected until about 10 minutes have passed. After that, the join stops working until I re-stream all data from the DB into the KTable.
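In Kafka Streams, exactly-once is presumably switched on through the processing.guarantee property. The sketch below shows how the two setups would differ on that property; it uses plain java.util.Properties with the literal key names (they match StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE and StreamsConfig.AT_LEAST_ONCE in 2.2) so it compiles without the Kafka jar, and the application ID and broker addresses are placeholders, not values from the question:

```java
import java.util.Properties;

final class StreamsConfigSketch {
    /** Builds Kafka Streams properties for either processing guarantee. */
    static Properties streamsProperties(boolean exactlyOnce) {
        Properties props = new Properties();
        // Placeholder values (hypothetical); the real ones are not shown in the question.
        props.put("application.id", "device-export-app"); // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG: "exactly_once" or the default "at_least_once"
        props.put("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties(true).getProperty("processing.guarantee")); // exactly_once
    }
}
```

Enabling exactly_once also lowers the default commit.interval.ms from 30000 ms to 100 ms.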
I thought that, in a KStream-KTable join, exactly-once semantics only apply to the KStream side, and that the KTable always holds the latest data, ready to be joined whenever the left side of the join (the KStream in this case) receives a record. Am I missing something, or is this the expected behaviour?