Kafka: emitter suddenly stops

Werner Gresshoff

Nov 16, 2021, 4:56:51 AM
to Quarkus Development mailing list
As a complete Kafka newbie, I don't understand the behaviour of the request emitter. My use case is this: I have a file (consisting of ~208000 XML records, but there are bigger ones, too). I split it into single records and try to emit every record to Kafka:
try (Stream<String> lineStream = Files.lines(file, StandardCharsets.UTF_8)) {
    AtomicReference<String> record = new AtomicReference<>("");
    AtomicInteger numbers = new AtomicInteger(1);
    lineStream.forEach(line -> {
        if (!line.startsWith("<?xml") && !line.startsWith("</collection")) {
            if (line.contains("<record")) {
                line = line.replace(
                        "<record ", "<record " + namespace + " ");
            }
            String soFar = record.get();
            record.set(soFar + line + "\n");
            if (line.contains("</record")) {
                LOG.info("Extracted record: " + numbers.get());
                [...]
                        .withAck(() -> {
                            // Called when the message is acked
                            LOG.info("Sent record: " + numbers.getAndIncrement());
                            return CompletableFuture.completedFuture(null);
                        })
                        .withNack(throwable -> {
                            // Called when the message is nacked
                            return CompletableFuture.completedFuture(null);
                        })
                [...]
            }
        }
    });
}

LOG.info("File was processed.");
The code never reaches the last LOG.info (if I remove the send call, it does).
On the consumer side I just ack every message without doing anything.
I suspect I'm running into backpressure control, but I don't know how to handle it...
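
If the stall really is caused by back-pressure, one common way to handle it is to cap how many sends are pending at once and block the producing loop until earlier sends have been acknowledged. The following is only a sketch under assumptions: `BoundedSender` and its methods are hypothetical names, and the `send` function is a stand-in for something like `Emitter<String>.send(payload)`, which returns a `CompletionStage<Void>` that completes once the message is acknowledged. The demo simulates the asynchronous ack with a thread pool instead of talking to Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: caps the number of sends that are started but not yet acknowledged. */
public class BoundedSender {
    private final int maxInFlight;
    private final Semaphore permits;
    // Stand-in for the real send call (assumption: it returns a stage completed on ack).
    private final Function<String, CompletionStage<Void>> send;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    public BoundedSender(int maxInFlight, Function<String, CompletionStage<Void>> send) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
        this.send = send;
    }

    /** Blocks while maxInFlight sends are pending, then starts one more send. */
    public void sendBlocking(String payload) throws InterruptedException {
        permits.acquire();
        maxObserved.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletionStage<Void> stage;
        try {
            stage = send.apply(payload);
        } catch (RuntimeException e) {
            // The send failed synchronously: give the permit back before rethrowing.
            inFlight.decrementAndGet();
            permits.release();
            throw e;
        }
        // Release the permit on ack or nack so the producing loop can continue.
        stage.whenComplete((ok, err) -> {
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    /** Waits until every pending send has completed. */
    public void awaitIdle() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
    }

    public int maxObservedInFlight() {
        return maxObserved.get();
    }

    /** Simulated run: returns {number of acked records, highest in-flight count seen}. */
    public static int[] demo(int records, int maxInFlight) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger acked = new AtomicInteger();
        BoundedSender sender = new BoundedSender(maxInFlight,
                payload -> CompletableFuture.runAsync(acked::incrementAndGet, pool));
        for (int i = 0; i < records; i++) {
            sender.sendBlocking("record " + i);
        }
        sender.awaitIdle();
        pool.shutdown();
        return new int[] {acked.get(), sender.maxObservedInFlight()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = demo(1000, 16);
        System.out.println("acked=" + result[0] + ", max in flight=" + result[1]);
    }
}
```

In an application with an injected `Emitter<String>`, the stand-in could presumably be replaced by `emitter::send`, with the limit chosen to stay below the emitter's buffer size. Whether that resolves the stall described above is not confirmed by this thread.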

Best regards

Werner Gresshoff

Nov 17, 2021, 3:38:22 AM
to Quarkus Development mailing list
I checked with shorter messages (I thought the records might exceed a size limit), but the behaviour did not change.

Werner Gresshoff

Nov 17, 2021, 11:36:56 AM
to Quarkus Development mailing list
After annotating the emitter with `@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)` the behaviour finally changed! Approximately 107000 records are consumed. Then everything ends with this message:

SRMSG18231: The record 341819 from topic-partition 'record-requests-0' has waited for 63 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 95995 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 341818. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
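
The message above says the last committed offset was 341818 while record 341819 was still waiting for its acknowledgement, so the commit position could not move forward even though later records may already have been acked. A simplified model of that rule can be sketched as follows. This is an illustration only, not SmallRye's actual implementation, and `ContiguousCommitTracker` is a hypothetical name.

```java
import java.util.TreeSet;

/** Simplified model: the commit position only advances across contiguously acked offsets. */
public class ContiguousCommitTracker {
    private long committed;                               // last committed offset
    private final TreeSet<Long> acked = new TreeSet<>();  // acked offsets above the commit position

    public ContiguousCommitTracker(long lastCommitted) {
        this.committed = lastCommitted;
    }

    /** Records an ack and returns the commit position after it. */
    public long ack(long offset) {
        if (offset > committed) {
            acked.add(offset);
        }
        // Advance only while the next expected offset has been acknowledged.
        while (!acked.isEmpty() && acked.first() == committed + 1) {
            committed = acked.pollFirst();
        }
        return committed;
    }

    /** Number of acked records that cannot be committed yet because of a gap. */
    public int blockedAcks() {
        return acked.size();
    }

    public static void main(String[] args) {
        ContiguousCommitTracker tracker = new ContiguousCommitTracker(341818L);
        System.out.println(tracker.ack(341820L)); // still 341818: offset 341819 is missing
        System.out.println(tracker.ack(341821L)); // still 341818
        System.out.println(tracker.ack(341819L)); // gap closed, jumps to 341821
    }
}
```

In this model a single record that is never acknowledged holds back the commit of every record after it, which matches the situation the error message describes.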


Nov 22, 2021, 3:11:10 AM
to werner.g...@gmail.com, Quarkus Development mailing list
Hello Werner,

A code snippet is not easy to read inside a mail, and you will have a better chance of getting an answer to this kind of question if you post it on Stack Overflow.
Can you please post this question on Stack Overflow and include the configuration of the channel?



