Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafkaserver:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-itai");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton("analysis"))
.addAssignListener(partitions -> System.out.println(partitions))
.addRevokeListener(partitions -> System.out.println("Revoking " + partitions));
Flux<ReceiverRecord<Integer, String>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive();
inboundFlux.log().flatMap(r -> {
r.receiverOffset().acknowledge();
WebClient.ResponseSpec responseSpec = client.get().retrieve();
Mono<String> restResp = responseSpec.bodyToMono(String.class);
return restResp.map(m -> m + " " + r.key());
}).subscribe(System.out::println);