Cannot consume anything after set specific.avro.reader=true


Jun MA

Jun 1, 2016, 7:16:09 PM
to Confluent Platform

Hi team,


I'm trying to figure out how to easily cast an object returned by the consumer to an Avro object. What I know is that I can do


T t = (T) SpecificData.get().deepCopy(T.SCHEMA$, GenericRecordObject) to convert a GenericRecord to the generated Avro class.


But I'm wondering if there's an easier way.


From previous posts, I think someone mentioned that if I set 'specific.avro.reader=true', I can get a SpecificRecord out of the consumer directly, and then I can do


T t = (T) record.value()


to cast it to the avro object.


But when I set 'specific.avro.reader=true', I actually cannot consume anything from the topic. (I can get everything by setting specific.avro.reader=false or by leaving it unset.)


Here is the code:


        Properties properties = new Properties();
        InputStream props = Resources.getResource("consumer.props").openStream();
        properties.load(props);
        properties.put("specific.avro.reader", "true");
        properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic_name));

        try {
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(10000);
                System.out.println("polling");
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.key());
                    System.out.println(record.value());
                    KafkaMessage message = (KafkaMessage) record.value();
                    System.out.println(message.get("id"));
                    System.out.println(message.get("metadata"));
                }
            }
        } finally {
            consumer.close();
        }


KafkaMessage is the Avro class compiled from the Avro schema.


Here is my pom file:

    <properties>
        <java.source>1.8</java.source>
        <java.target>1.8</java.target>
        <avro.version>1.8.0</avro.version>
        <maven.compiler.plugin.version>3.1</maven.compiler.plugin.version>
        <kafka.avro.serializer.version>3.0.0</kafka.avro.serializer.version>
        <confluent.platform.version>0.10.0.0-cp1</confluent.platform.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.serializer.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.platform.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>


Am I doing this correctly? Any help is welcome!


Thanks,

Jun

Ewen Cheslack-Postava

Jun 2, 2016, 2:20:07 AM
to Confluent Platform
Does anything get logged? You're correct about the expected behavior -- it should deserialize to a SpecificRecord that you can cast directly. If nothing else changed, you would expect either to see the data consumed or an exception thrown.

Is this trivial to reproduce? One reason you might see a difference on a second run is committed offsets: if you join the same group in the second run (using the same group ID from the consumer props file you load) and offsets were already committed (which can happen automatically with the default enable.auto.commit setting of true), you won't re-consume the earlier messages. Does toggling back and forth consistently produce the same behavior?
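To rule out committed offsets while debugging, one common approach is to join a fresh group and start reading from the beginning of the topic. A minimal sketch of just the relevant properties (the `debug-` group prefix is an arbitrary choice for illustration):

```java
import java.util.Properties;
import java.util.UUID;

public class OffsetResetProps {
    // Build consumer properties that avoid previously committed offsets:
    // a fresh, random group.id has no stored offsets, and
    // auto.offset.reset=earliest makes the consumer start from the
    // beginning of the topic instead of waiting for new messages.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("group.id", "debug-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false"); // don't commit while debugging
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("auto.offset.reset"));
    }
}
```

Merging these into the properties loaded from the props file makes each test run independent of earlier runs.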

-Ewen


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/b9648790-cb75-4454-bf5e-f0d4969d46f8%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

gerard...@dizzit.com

Jun 2, 2016, 2:49:31 AM
to Confluent Platform
How did you generate the Avro object? We had some issues until we used the Maven plugin, like this (this could be done in a separate project):

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${apache.avro.version}</version>
    <executions>
        <execution>
            <id>generate-avro-sources</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
                <stringType>String</stringType>
            </configuration>
        </execution>
    </executions>
</plugin>
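For reference, the plugin's schema goal compiles `.avsc` files from the configured sourceDirectory. A hypothetical schema matching the field names used in the earlier code (`id` and `metadata`; the namespace and types are assumptions for illustration) could look like:

```json
{
  "type": "record",
  "name": "KafkaMessage",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
```

Placed in src/main/avro, this would generate a KafkaMessage class extending SpecificRecordBase at build time.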

Michael Noll

Jun 2, 2016, 4:25:48 AM
to confluent...@googlegroups.com
Jun,

you may also want to take a look at the (specific + generic) Avro examples at https://github.com/confluentinc/examples/tree/master/kafka-streams.  These demonstrate how to read + write specific Avro records;  the focus is on how to do this with Kafka Streams, but the "normal" Kafka producer + consumer clients are also used to 1. generate the input data (producer) and 2. read + verify the output data generated by Streams (consumer).  These examples use the Confluent Schema Registry, btw.


Hope this helps a bit,
Michael




Jun MA

Jun 2, 2016, 7:22:22 PM
to Confluent Platform
Thanks guys for your help! I found the issue.
I was using KafkaAvroSerializer as the key.serializer, but I was actually passing a String as the key. When I deserialized that key with KafkaAvroDeserializer and specific.avro.reader=true, I consumed nothing and saw no log output.
After I changed the key deserializer to org.apache.kafka.common.serialization.StringDeserializer, it works.
Alternatively, using an actual Avro object as the key and keeping the Avro serializer/deserializer on both sides also works.
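The working combination described above can be sketched as a set of consumer properties (the schema.registry.url value here is a placeholder):

```java
import java.util.Properties;

public class SpecificReaderProps {
    // Consumer properties for String keys and specific-record Avro values.
    // The deserializers must match what the producer actually wrote:
    // here the key was produced as a plain String, the value as Avro.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("specific.avro.reader", "true"); // deserialize to generated classes
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("key.deserializer"));
    }
}
```

With these properties, record.value() can be cast directly to the generated Avro class instead of going through GenericRecord and deepCopy.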

Jagadeesh Dharmalingam

Jun 28, 2018, 6:00:37 PM
to Confluent Platform
I can't consume anything when SPECIFIC_AVRO_READER_CONFIG=true. When it's commented out, I get java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.xxx.RegState.
The RegState class was generated from avro-tools.jar:
@org.apache.avro.specific.AvroGenerated
public class RegState extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord

Here's my producer:

kafka-avro-console-producer --broker-list localhost:9092 --topic test3  --property parse.key=true --property key.separator=: --property key.schema='{"type":"string"}' --property value.schema='{"type":"record","name":"RegState","fields":[{"name":"mac","type":"string"},{"name":"node","type":"string"},{"name": "cmts","type":"string"}]}'
"key4":{"mac":"12","node":"ABC","cmts":"CMTS3"}
"key5":{"mac":"12","node":"XYZ","cmts":"CMTS3"}


consumer:

public class RegStateKafkaConsumer {
    public static void main(String[] args) throws Exception {
        String topic = "test3";
        String group = "group";
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx.net:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://xxx.net:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, RegState> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            ConsumerRecords<String, RegState> records = consumer.poll(100);
            System.out.println("Polling Topic=" + topic + "; record count=" + records.count());
            for (ConsumerRecord<String, RegState> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                RegState regState = record.value();
                System.out.println(regState.getCmts());
            }
        }
    }
}