Reactor-Kafka Consumer sample doesn't fetch messages from the topic.

muhammed...@silicon-nile.com

Nov 17, 2016, 4:57:55 AM
to reactor-framework
Hi All,

I've started using reactor-kafka from the samples, but the consumer doesn't fetch any messages from the topic.
Here is the consumer code:
public Cancellation consumeMessages(String topic) {
    ReceiverOptions<Integer, String> options = receiverOptions
            .subscription(Collections.singleton(topic))
            .addAssignListener(partitions -> log.info("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.info("onPartitionsRevoked {}", partitions));
    Flux<ReceiverRecord<Integer, String>> kafkaFlux = Receiver.create(options).receive();

    return kafkaFlux.log().subscribe(
            message -> {
                ReceiverOffset offset = message.offset();
                ConsumerRecord<Integer, String> record = message.record();
                System.out.printf("Received message: topic-partition=%s offset=%d key=%d value=%s%n",
                        offset.topicPartition(),
                        offset.offset(),
                        record.key(),
                        record.value());
            },
            e -> System.err.println(e),
            () -> System.out.println("DONE>>>>>>>>>>>>>>>>>>>>>.."));
}

and in the main method:
SampleConsumer consumer = new SampleConsumer(BOOTSTRAP_SERVERS);
Cancellation cancellation = consumer.consumeMessages(TOPIC);
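One detail worth noting here: in Reactor, `subscribe()` is non-blocking and returns immediately, so if `main` falls through right after `consumeMessages(TOPIC)`, the JVM can exit before the consumer thread has received anything. A minimal plain-JDK sketch of the wait pattern (the class name, message count, and the simulated background delivery are illustrative stand-ins, not reactor-kafka API):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KeepAliveSketch {

    // Simulates an async subscription: messages arrive on a background
    // thread while the calling thread continues immediately.
    static int awaitMessages() throws InterruptedException {
        int expected = 3;
        CountDownLatch latch = new CountDownLatch(expected);
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        for (int i = 1; i <= expected; i++) {
            final int n = i;
            pool.schedule(() -> {
                System.out.println("Received message value=msg-" + n);
                latch.countDown();
            }, 50L * n, TimeUnit.MILLISECONDS);
        }
        // Without this wait, the caller would return right away and the
        // JVM could exit before any message was delivered.
        boolean done = latch.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return done ? expected : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("delivered: " + awaitMessages());
    }
}
```

In the real application the same effect comes from blocking the main thread (e.g. a latch counted down in the subscriber) until the consumer is done, instead of returning from `main` immediately after subscribing.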

There are many messages in the topic, but the output is only:
2016-11-17 11:54:36.849  INFO 5388 --- [           main] reactor.Flux.Peek.1                      : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@37f3e2c3)
2016-11-17 11:54:36.851  INFO 5388 --- [           main] reactor.Flux.Peek.1                      : request(unbounded)
2016-11-17 11:54:36.871  INFO 5388 --- [-sample-group-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
metric.reporters = []
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = sample-group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
ssl.truststore.password = null
metrics.num.samples = 2
client.id = sample-consumer
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
check.crcs = true
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest

2016-11-17 11:54:36.989  INFO 5388 --- [-sample-group-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.9.0.1
2016-11-17 11:54:36.989  INFO 5388 --- [-sample-group-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 23c69d62a0cabf06



