Kafka: emitter suddenly stops

Skip to first unread message

Werner Gresshoff

Nov 16, 2021, 4:56:51 AM11/16/21
to Quarkus Development mailing list
as complete Kafka newbie I don't undertstand the behaviour of the request emitter. My use case is this: I have a file (consisting of ~208000 xml records, but having bigger ones, too). I'm splitting it into single records and try to emit every record to kafka:
try (Stream<String> lineStream = Files.lines(file, StandardCharsets.UTF_8)) {
AtomicReference<String> record = new AtomicReference<>("");
AtomicInteger numbers = new AtomicInteger(1);
lineStream.forEach(line -> {
if (!line.startsWith("<?xml") && !line.startsWith("</collection")) {
if (line.contains("<record")) {
line = line.replace(
"<record ", "<record " + namespace + " ");
String soFar = record.get();
record.set(soFar + line + "\n");
if (line.contains("</record")) {
LOG.info("Extracted record: " + numbers.get());
.withAck(() -> {
// Called when the message is acked
LOG.info("Sent record: " + numbers.getAndIncrement());
return CompletableFuture.completedFuture(null);
.withNack(throwable -> {
// Called when the message is nacked
return CompletableFuture.completedFuture(null);

LOG.info("File was processed.");
The code never arrives at the last LOG.info (if I remove the send command it does).
On consumer side I just ack every message without doing anything.
I have the imagination I'm running into backpressure control but don't know how to handle it...

Best regards

Werner Gresshoff

Nov 17, 2021, 3:38:22 AM11/17/21
to Quarkus Development mailing list
I checked with shorter messages (thought it might exceed a size limit) but there is no change in the behaviour.

Werner Gresshoff

Nov 17, 2021, 11:36:56 AM11/17/21
to Quarkus Development mailing list
After annotating the emitter with `@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)` the behaviour finally changed! Approx. 107000 records are consumed. All ended with the message

SRMSG18231: The record 341819 from topic-partition 'record-requests-0' has waited for 63 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 95995 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 341818. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.


Nov 22, 2021, 3:11:10 AM11/22/21
to werner.g...@gmail.com, Quarkus Development mailing list
Hello Werner,

Code snippet is not easily understandable inside a mail and you will have more chance that someone answer such questions if you post it to stackoverflow.
Can you please post this question to stackoverflow and include the configuration of the channel ?



You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/quarkus-dev/f8ef409a-1a23-4d62-9b6e-08d7d2b8aae8n%40googlegroups.com.
Reply all
Reply to author
0 new messages