Spring reactive webclient + kafka chain

39 views
Skip to first unread message

Yaron Wahl

unread,
Nov 12, 2017, 7:46:59 AM11/12/17
to reactor-framework
Hi

I would like to do something like that

Reactive Kafka consumer=> enriched received records by calling Reactive WebClient=> sink enriched events back to reactive kafka producer

e.g Kafka in => Flux of (userID, userrData)  => reactive Webclient getEnrichedUserData(userID) => (userID, userData, enrichedUserData) => Kafka out


Thanks,

Aron

Yaron Wahl

unread,
Nov 13, 2017, 2:46:39 AM11/13/17
to reactor-framework
FYI:
        WebClient client = WebClient.create("http://mywebserver");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafkaserver:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-itai");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<Integer, String> receiverOptions =
                ReceiverOptions.<Integer, String>create(consumerProps)
                        .subscription(Collections.singleton("analysis"))
                        .addAssignListener(partitions -> System.out.println(partitions))
                        .addRevokeListener(partitions -> System.out.println("Revoking " + partitions));


        Flux<ReceiverRecord<Integer, String>> inboundFlux =
                KafkaReceiver.create(receiverOptions)
                        .receive();




        inboundFlux.log().flatMap(r -> {
            r.receiverOffset().acknowledge();
            WebClient.ResponseSpec responseSpec = client.get().retrieve();
            Mono<String> restResp = responseSpec.bodyToMono(String.class);

            return restResp.map(m -> m + " " + r.key());
        }).subscribe(System.out::println);
Reply all
Reply to author
Forward
0 new messages