Kafka: emitter suddenly stops


Werner Gresshoff

Nov 16, 2021, 4:56:51 AM11/16/21
to Quarkus Development mailing list
Hello,
as a complete Kafka newbie I don't understand the behaviour of the request emitter. My use case is this: I have a file consisting of ~208,000 XML records (with bigger files to come). I split it into single records and try to emit every record to Kafka:
```java
try (Stream<String> lineStream = Files.lines(file, StandardCharsets.UTF_8)) {
    AtomicReference<String> record = new AtomicReference<>("");
    AtomicInteger numbers = new AtomicInteger(1);
    lineStream.forEach(line -> {
        if (!line.startsWith("<?xml") && !line.startsWith("</collection")) {
            if (line.contains("<record")) {
                line = line.replace("<record ", "<record " + namespace + " ");
            }
            String soFar = record.get();
            record.set(soFar + line + "\n");
            if (line.contains("</record")) {
                LOG.info("Extracted record: " + numbers.get());
                recordRequestEmitter.send(Message.of(record.get())
                        .withAck(() -> {
                            // Called when the message is acked
                            LOG.info("Sent record: " + numbers.getAndIncrement());
                            return CompletableFuture.completedFuture(null);
                        })
                        .withNack(throwable -> {
                            // Called when the message is nacked
                            LOG.info(throwable.getMessage());
                            LOG.info(throwable.getCause());
                            return CompletableFuture.completedFuture(null);
                        }));
                record.set("");
            }
        }
    });
}
LOG.info("File was processed.");
```
The code never reaches the last LOG.info (if I remove the send call, it does).
On the consumer side I just ack every message without doing anything else.
I suspect I'm running into backpressure control but don't know how to handle it...
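For context, an Emitter's default overflow strategy is a bounded buffer (256 elements by default in SmallRye Reactive Messaging), so a tight loop that sends hundreds of thousands of records faster than the broker acks them can overrun it. Below is a minimal, self-contained sketch of one way to bound the number of in-flight sends with a semaphore; `fakeSend` is a hypothetical stand-in for an emitter `send(payload)` call that returns a CompletionStage completed on ack, so the sketch runs without a broker:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSender {
    // Allow at most 256 unacknowledged sends, mirroring the default buffer size.
    private static final Semaphore inFlight = new Semaphore(256);
    private static final ScheduledExecutorService broker =
            Executors.newSingleThreadScheduledExecutor();
    public static final AtomicInteger acked = new AtomicInteger();

    // Hypothetical stand-in for recordRequestEmitter.send(payload):
    // completes the returned future ~1 ms later, simulating a broker ack.
    static CompletableFuture<Void> fakeSend(String payload) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        broker.schedule(() -> {
            acked.incrementAndGet();
            ack.complete(null);
        }, 1, TimeUnit.MILLISECONDS);
        return ack;
    }

    static void send(String payload) throws InterruptedException {
        inFlight.acquire(); // block the producer instead of overflowing the buffer
        fakeSend(payload).whenComplete((v, t) -> inFlight.release());
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 1000; i++) {
            send("record " + i);
        }
        broker.shutdown();
        broker.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("acked=" + acked.get());
    }
}
```

Blocking the producing thread like this is only one option; the point is that something must slow the producer down to the rate at which acks arrive.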

Best regards
Werner

Werner Gresshoff

Nov 17, 2021, 3:38:22 AM11/17/21
to Quarkus Development mailing list
I checked with shorter messages (I thought I might be exceeding a size limit), but the behaviour is unchanged.

Werner Gresshoff

Nov 17, 2021, 11:36:56 AM11/17/21
to Quarkus Development mailing list
After annotating the emitter with `@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)` the behaviour finally changed! Approx. 107,000 records are consumed. It all ended with this message:

SRMSG18231: The record 341819 from topic-partition 'record-requests-0' has waited for 63 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 95995 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 341818. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
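For anyone reading along, the emitter declaration described above presumably looks something like the following; the channel name is assumed from the `record-requests-0` topic-partition in the log, and whether the `javax` or `jakarta` namespace applies depends on the Quarkus version:

```java
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

public class RecordSender {

    @Inject
    @Channel("record-requests")
    @OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)
    Emitter<String> recordRequestEmitter;
}
```

The SRMSG18231 warning itself comes from the Kafka connector's throttled commit strategy on the consuming side; its threshold can be raised via `mp.messaging.incoming.<channel>.throttled.unprocessed-record-max-age.ms` if processing genuinely needs longer, though the warning usually indicates that acknowledgements are not keeping up with the incoming records.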

Loïc MATHIEU

Nov 22, 2021, 3:11:10 AM11/22/21
to werner.g...@gmail.com, Quarkus Development mailing list
Hello Werner,

A code snippet is not easily readable inside a mail, and you will have a better chance of someone answering such questions if you post them to Stack Overflow.
Can you please post this question to Stack Overflow and include the configuration of the channel?

Regards,

Loïc
