Hello,
as a complete Kafka newbie I don't understand the behaviour of the request emitter. My use case is this: I have a file consisting of ~208,000 XML records (and I have bigger ones, too). I'm splitting it into single records and trying to emit every record to Kafka:
```java
try (Stream<String> lineStream = Files.lines(file, StandardCharsets.UTF_8)) {
    AtomicReference<String> record = new AtomicReference<>("");
    AtomicInteger numbers = new AtomicInteger(1);
    lineStream.forEach(line -> {
        if (!line.startsWith("<?xml") && !line.startsWith("</collection")) {
            if (line.contains("<record")) {
                line = line.replace("<record ", "<record " + namespace + " ");
            }
            String soFar = record.get();
            record.set(soFar + line + "\n");
            if (line.contains("</record")) {
                LOG.info("Extracted record: " + numbers.get());
                recordRequestEmitter.send(Message.of(record.get())
                        .withAck(() -> {
                            // Called when the message is acked
                            LOG.info("Sent record: " + numbers.getAndIncrement());
                            return CompletableFuture.completedFuture(null);
                        })
                        .withNack(throwable -> {
                            // Called when the message is nacked
                            LOG.info(throwable.getMessage());
                            LOG.info(String.valueOf(throwable.getCause()));
                            return CompletableFuture.completedFuture(null);
                        }));
                record.set("");
            }
        }
    });
}
LOG.info("File was processed.");
```
The code never arrives at the last LOG.info (if I remove the send call, it does).
On consumer side I just ack every message without doing anything.
I suspect I'm running into backpressure control but don't know how to handle it...
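From what I gather in the SmallRye Reactive Messaging docs, an injected Emitter only buffers a limited number of messages by default before overflowing, and `@OnOverflow` can change that. Would something like this be the right direction? (Just a sketch of what I mean; the channel name `record-requests` is a placeholder for my actual channel:)

```java
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import javax.inject.Inject;

public class RecordSender {

    @Inject
    @Channel("record-requests")                      // placeholder channel name
    @OnOverflow(value = OnOverflow.Strategy.BUFFER,  // buffer instead of failing
                bufferSize = 300_000)                // large enough for one whole file?
    Emitter<String> recordRequestEmitter;
}
```

Or is enlarging the buffer the wrong approach, and should I rather throttle the producing loop somehow?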
Best regards
Werner