Unable to get the Java Consumer working in the Confluent Platform


Ramkumar KB

Apr 12, 2015, 1:06:33 PM
to confluent...@googlegroups.com
We are trying to implement the Java producer and Java consumer with the Confluent Platform.

#1. The Confluent Platform is deployed on RHEL (single node), and after deploying we went through the steps outlined here: http://confluent.io/docs/current/quickstart.html . Everything is working OK (we produce messages to the "test" topic in Avro and are able to see them in the consumer).

#2: Next, we implemented the producer in Java. It is working OK; we are able to see the messages using the command-line consumer from the step above.

#3: However, we ran into some difficulties trying to get the Java consumer working. Some issues we noticed are:

#3.1: Issue with the Maven POM: Our POM looks like the one below, but we found we needed to manually add 2 jars (kafka-avro-serializer-1.0.jar, kafka-schema-registry-client-1.0.jar) to our lib to make it work. They were not being resolved via the Maven POM (not sure if there is some issue with the Maven repo?).

Our POM for the consumer:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-parent</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.7</version>
    </dependency>
    <!-- <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>2.0</version>
    </dependency> -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson</groupId>
        <artifactId>jackson-parent</artifactId>
        <version>2.5</version>
    </dependency>
</dependencies>


#3.2: Now we have the consumer up and running, but it looks like it is stuck in the while loop. The consumer keeps running without ever returning, waiting on:
while (it.hasNext())

See below:
---------------
Log: with -verbose
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1$$anonfun$apply$mcV$sp$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded scala.collection.TraversableOnce$$anonfun$toMap$1 from file:/C:/Apps/.m2/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1$$anonfun$apply$mcV$sp$2 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcVI$sp$4 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded java.util.concurrent.locks.ReentrantReadWriteLock$Sync$HoldCounter from C:\Program Files (x86)\Java\jdk1.6.0_25\jre\lib\rt.jar]
[Loaded com.yammer.metrics.stats.ThreadLocalRandom from file:/C:/Apps/.m2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar]
[Loaded com.yammer.metrics.stats.ThreadLocalRandom$1 from file:/C:/Apps/.m2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$2 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$2$$anonfun$apply$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
2.1{test=[group1 kafka stream]}
3group1 kafka stream
[Loaded kafka.utils.ShutdownableThread$$anonfun$run$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.utils.KafkaScheduler$$anonfun$1$$anonfun$apply$mcV$sp$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ZookeeperConsumerConnector$$anonfun$autoCommit$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.utils.KafkaScheduler$$anonfun$1$$anonfun$apply$mcV$sp$4 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]

---------------

Any tips, hints or pointers would be invaluable to get us across the finish line, as far as the Java producer and consumer with Avro messages go.

One tip (for newbies on Kafka / CP): always implement the producer and consumer as 2 separate Java projects, as the consumer still seems to require a lot more dependencies (we guess this will be resolved in Kafka v0.9.0).

Thank you!

Ewen Cheslack-Postava

Apr 12, 2015, 3:04:21 PM
to confluent...@googlegroups.com
Hi Ramkumar,

On Sun, Apr 12, 2015 at 10:06 AM, Ramkumar KB <ramku...@gmail.com> wrote:
We are trying to implement the Java producer and Java consumer with the Confluent Platform.

#1. The Confluent Platform is deployed on RHEL (single node), and after deploying we went through the steps outlined here: http://confluent.io/docs/current/quickstart.html . Everything is working OK (we produce messages to the "test" topic in Avro and are able to see them in the consumer).

#2: Next, we implemented the producer in Java. It is working OK; we are able to see the messages using the command-line consumer from the step above.

#3: However, we ran into some difficulties trying to get the Java consumer working. Some issues we noticed are:

#3.1: Issue with the Maven POM: Our POM looks like the one below, but we found we needed to manually add 2 jars (kafka-avro-serializer-1.0.jar, kafka-schema-registry-client-1.0.jar) to our lib to make it work. They were not being resolved via the Maven POM (not sure if there is some issue with the Maven repo?).
 
Those should be pulled in automatically by Maven and this has definitely been tested with the Confluent Maven repository.
 

Our POM for the consumer:
   <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
        <scope>compile</scope>
    </dependency>

If you want to use Confluent's version, you can use version 0.8.2.0-cp. We'll always tag our builds with -cp so they don't conflict with Apache releases. In this case you can use either, but if you use an Apache release, you should use 0.8.2.1 since it fixed some important bugs (0.8.2.0-cp contains those same fixes, but was released before 0.8.2.1).
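
For example, switching the Kafka dependency to the Confluent build and adding the Confluent repository might look like the fragment below. This is a sketch: the repository URL is the one Confluent has published for its Maven packages; double-check it against the docs version you are following.

```xml
<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0-cp</version>
    </dependency>
</dependencies>
```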
 
        <dependency>
            <artifactId>avro-parent</artifactId>
            <groupId>org.apache.avro</groupId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>1.7.7</version>
        </dependency>
      <!--  <dependency>

It looks like you started a comment here, is that intentional? If this section that includes kafka-avro-serializer is commented out, that would explain why the jars are not being included.
 
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>1.0.0</version>
    </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>2.0</version>
        </dependency>-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency><groupId>com.fasterxml.jackson</groupId>
            <artifactId>jackson-parent</artifactId>
            <version>2.5</version>
        </dependency>
    </dependencies>

There's also some stuff included in your pom file that you shouldn't need to include explicitly. For example, kafka-schema-registry-client gets pulled in automatically by kafka-avro-serializer.
 


#3.2: Now we have the consumer up and running, but it looks like it is stuck in the while loop. The consumer keeps running without ever returning, waiting on:
while (it.hasNext())


This sounds like normal behavior when there are no new messages being published to the topic. Do you have a producer running at the same time? Alternatively, you can set the auto.offset.reset configuration for the consumer to 'smallest' to make it read from the beginning of the topic.
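
A minimal sketch of that setting with the old high-level consumer's property keys (the hostnames here are placeholders, not the poster's machines):

```java
import java.util.Properties;

public class ConsumerProps {
    // Old (ZooKeeper-based) high-level consumer configuration. With
    // auto.offset.reset=smallest, a group that has no committed offsets starts
    // reading from the beginning of the topic instead of only new messages.
    public static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest"); // default is "largest"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:2181", "group1").getProperty("auto.offset.reset")); // prints smallest
    }
}
```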
 

Any tips, hints or pointers would be invaluable to get us across the finish line, as far as the Java producer and consumer with Avro messages go.

One tip (for newbies on Kafka / CP): always implement the producer and consumer as 2 separate Java projects, as the consumer still seems to require a lot more dependencies (we guess this will be resolved in Kafka v0.9.0).

Yes, 0.9 should include the new consumer, which is in a separate kafka-clients artifact that has fewer dependencies than the artifact that includes the broker.

-Ewen
 

Thank you!

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/bb35d240-b3c1-4540-8feb-80976fb6430d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Ramkumar KB

Apr 13, 2015, 12:39:26 PM
to confluent...@googlegroups.com
Hi Ewen,

Thanks for the tips. We continued trying, but we are still unable to read messages from our Java consumer. It seems to get stuck at this line:
ConsumerIterator<Object, Object> it = stream.iterator();

Overall, we think we are making a "newbie mistake" but are unable to put our finger on it. In summary:

#1. On the Maven file, for certain reasons we could not make it work as you suggested (maybe because we are inside the enterprise). Anyway, we think we have the jars correct.

#2: We are using the following properties for the consumer:

    public Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.connect", "sgls1000597.ap.net:2181");
        props.put("group.id", "group1");
        props.put("bootstrap.servers", "sgls1000597.ap.net:9092");
        props.put("schema.registry.url", "http://sgls1000597.ap.net:8081");
        props.put("key.deserializer", MyKafkaDeserializer.class);
        props.put("value.deserializer", MyKafkaDeserializer.class);
        props.put("partition.assignment.strategy", "range");
        props.put("offsets.storage", "zookeeper");
        props.put("dual.commit.enabled", "false");
        // Set any other properties
        return props;
    }

And our consumer code looks like this (it waits at the stream.iterator() line):

    /**
     * Consumer reading the messages
     */
    public void readMessage() {
        String topic = "test";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        System.out.println("1");

        VerifiableProperties vProps = new VerifiableProperties(getConsumerProperties());
        System.out.println("1.1");
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(vProps);
        System.out.println("1.2");
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(vProps);
        System.out.println("1.3");
        ConsumerConfig conConfig = new ConsumerConfig(getConsumerProperties());
        System.out.println("1.4");
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(conConfig);
        // ConsumerConnector connector = Consumer.create(conConfig);
        System.out.println("2");
        // Map<String, List<KafkaStream<Object, Object>>> consumerMap =
        //         (HashMap<String, List<KafkaStream<Object, Object>>>)
        //         connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        Map<String, List<KafkaStream<Object, Object>>> consumerMap =
                connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        System.out.println("2.1" + consumerMap);
        KafkaStream<Object, Object> stream = consumerMap.get(topic).get(0);
        System.out.println("3" + stream);
        ConsumerIterator<Object, Object> it = stream.iterator(); // <-- execution appears to wait here
        System.out.println("3.0" + it);
        while (it.hasNext()) {
            System.out.println("3.1");
            MessageAndMetadata<Object, Object> messageAndMetadata = it.next();
            System.out.println("3.2");
            try {
                // GenericData.Record rec = (GenericData.Record) messageAndMetadata.message();
                Object rec = messageAndMetadata.message();
                System.out.println("[rec:" + rec + "]");
                // String key = (String) messageAndMetadata.key();
                // String value = (String) messageAndMetadata.message();
                // System.out.println("[key:" + key + "; value:" + value + "]");
                System.out.println("3.3");
            } catch (Exception e) {
                System.out.println("[SOMETHING IS WRONG ]" + e.getMessage());
                // may need to do something with it
                e.printStackTrace();
                System.out.println("3.4");
            }
        }
        System.out.println("4");
    }



#3: We had to write the following concrete class (because the package io.confluent.kafka.serializers did not have one):

public class MyKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

    public MyKafkaDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        System.out.println("MyKafkaDeserializer::configure");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        System.out.println("MyKafkaDeserializer::deserialize");
        return super.deserialize(bytes);
    }

    @Override
    public void close() {
        System.out.println("MyKafkaDeserializer::close");
    }
}

As before, any tips or pointers would be very helpful.

Thank you,

Ewen Cheslack-Postava

Apr 13, 2015, 1:39:37 PM
to confluent...@googlegroups.com
On Mon, Apr 13, 2015 at 9:39 AM, Ramkumar KB <ramku...@gmail.com> wrote:
Hi Ewen,

Thanks for the tips. We continued trying, but we are still unable to read messages from our Java consumer. It seems to get stuck at this line:
ConsumerIterator<Object, Object> it = stream.iterator();

Overall, we think we are making a "newbie mistake" but are unable to put our finger on it. In summary:

You might want to compare to this complete consumer example: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example That doesn't include the Avro deserialization, but it doesn't hurt to get the consumer working with just byte[] keys and values first, then add in the decoding.
 

#1. On the Maven file, for certain reasons we could not make it work as you suggested (maybe because we are inside the enterprise). Anyway, we think we have the jars correct.

#2: We are using the following properties for the consumer:

    public Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.connect", "sgls1000597.ap.net:2181");
        props.put("group.id", "group1");

Beware that if you're always using the same group ID, then once you have any offsets committed (which happens automatically if you don't override auto.commit.enable) then the auto.offset.reset setting won't have any effect and you'll need to produce new messages to see any consumer output.
 

        props.put("bootstrap.servers", "sgls1000597.ap.net:9092");

There's no bootstrap.servers setting on the old consumer.
 
Are you sure it's really stuck on that line? That should just be an accessor, it shouldn't be doing anything that can block. And previously you had said it was getting to the it.hasNext() line.
 

#3: We had to write the following concrete class (because the package io.confluent.kafka.serializers did not have one):

public class MyKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

    public MyKafkaDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        System.out.println("MyKafkaDeserializer::configure");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        System.out.println("MyKafkaDeserializer::deserialize");
        return super.deserialize(bytes);
    }

    @Override
    public void close() {
        System.out.println("MyKafkaDeserializer::close");
    }
}


It looks like you're mixing up the old and new consumer APIs. The Deserializer interface is for the new consumer interface (you can tell since it lives under org.apache.kafka.common in the kafka-clients module which is the new client code). The old clients use the Encoder/Decoder interfaces in kafka.serializer, and to use the Confluent Avro format you should pass instances of io.confluent.kafka.serializers.KafkaAvroDecoder for both key and value decoders to createMessageStreams. It looks like you're already doing that, so this extra deserializer code is just superfluous, as are the key.deserializer and value.deserializer settings.

I'd recommend starting from the sample code I linked to, which should work out of the box for consuming byte[] data. Then converting it to decode the Avro should just be a couple of changed lines -- passing in the decoders and changing a couple of types from byte[] to Object.
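
As a sketch, the properties for that old-consumer-plus-KafkaAvroDecoder setup could be trimmed down to something like the following. Hostnames are placeholders; schema.registry.url is the setting the poster's own code already passes to the decoder, and the new-consumer keys from the earlier config are deliberately absent:

```java
import java.util.Properties;

public class AvroConsumerProps {
    // Config for the old high-level consumer when KafkaAvroDecoder instances
    // are passed directly to createMessageStreams(). No bootstrap.servers and
    // no key/value.deserializer entries: those belong to the new consumer API.
    public static Properties build(String zkConnect, String groupId, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("schema.registry.url", schemaRegistryUrl); // read by KafkaAvroDecoder
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:2181", "group1", "http://localhost:8081");
        System.out.println(p.containsKey("bootstrap.servers")); // prints false
    }
}
```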

 



--
Thanks,
Ewen

Andrew Otto

Apr 13, 2015, 2:11:00 PM
to confluent...@googlegroups.com
While we are talking about Confluent Consumer docs, I noticed that there seems to be an error in:


I think there should be a 

import kafka.javaapi.consumer.ConsumerConnector;

and

  kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(vProps));

Should be:

  ConsumerConnector consumer =  kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(vProps));

as the next line in the code refers to a variable named ‘consumer’.

-Ao

Ewen Cheslack-Postava

Apr 13, 2015, 2:49:37 PM
to confluent...@googlegroups.com
Thanks for the heads up Andrew! I've filed a bug internally and it should be fixed with the next release.

-Ewen





--
Thanks,
Ewen

Prakash Kaul

Apr 14, 2015, 5:57:17 AM
to confluent...@googlegroups.com
Thanks for all your inputs.

In its current form, I managed to make the consumer read the records.

In the producer code, I published the message with:
String key = "key1";
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("test", key, avroRecord);

This value should be the same as the group.id from which the consumer reads the messages:
props.put("group.id", "key1");

Output:


Besides this, the simple consumer code worked well.

Regards,
Prakash

Ewen Cheslack-Postava

Apr 14, 2015, 12:22:50 PM
to confluent...@googlegroups.com
On Tue, Apr 14, 2015 at 2:57 AM, Prakash Kaul <kaul.p...@gmail.com> wrote:
Thanks for all your inputs.

In its current form, I managed to make the consumer read the records.

In the producer code, I published the message with:
String key = "key1";
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("test", key, avroRecord);

This value should be the same as the group.id from which the consumer reads the messages:
props.put("group.id", "key1");

Glad you got it working Prakash. But just to make sure there isn't any misunderstanding, I want to point out that the group.id and keys in records aren't connected in any way. They do not need to have the same value for the consumer to receive the messages. If you have the code working now, changing only one of them (or even removing the key entirely) should still work.

-Ewen
 




--
Thanks,
Ewen