Hi team,
I'm trying to figure out how to easily cast an object returned by the consumer to an Avro object. I know that I can do
T t = (T) SpecificData.get().deepCopy(T.SCHEMA$, GenericRecordObject) to convert a GenericRecord into a generated Avro class.
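Spelled out, the conversion I'm doing today looks roughly like this (just a sketch; KafkaMessage is the class compiled from my schema, shown further down):

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

// Turn the GenericRecord returned by the consumer into the generated class.
// deepCopy rebuilds the record using the specific class that backs the schema.
static KafkaMessage toSpecific(GenericRecord generic) {
    return (KafkaMessage) SpecificData.get().deepCopy(KafkaMessage.SCHEMA$, generic);
}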
But I wonder whether this can be done more easily.
From previous posts, I believe someone mentioned that if I set 'specific.avro.reader=true', I can get a SpecificRecord out of the consumer directly and then do
T t = (T) record.value()
to cast it to the Avro object.
However, when I set 'specific.avro.reader=true', I cannot consume anything from the topic. (I get everything if I set specific.avro.reader=false or leave it unset.)
Here is the code:
Properties properties = new Properties();
InputStream props = Resources.getResource("consumer.props").openStream();
properties.load(props);
properties.put("specific.avro.reader", "true");
properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic_name));

try {
    while (true) {
        ConsumerRecords<Object, Object> records = consumer.poll(10000);
        System.out.println("polling");
        for (ConsumerRecord<Object, Object> record : records) {
            System.out.println(record.key());
            System.out.println(record.value());
            KafkaMessage message = (KafkaMessage) record.value();
            System.out.println(message.get("id"));
            System.out.println(message.get("metadata"));
        }
    }
} finally {
    consumer.close();
}
KafkaMessage is the Avro class compiled from my Avro schema.
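I assume the generated class also gives me typed getters, so instead of the get(String) calls above I could presumably write something like this (getter names guessed from the field names; not verified):

KafkaMessage message = (KafkaMessage) record.value();
System.out.println(message.getId());        // assumed generated getter for the "id" field
System.out.println(message.getMetadata());  // assumed generated getter for the "metadata" field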
Here is my pom file:
<properties>
  <java.source>1.8</java.source>
  <java.target>1.8</java.target>
  <avro.version>1.8.0</avro.version>
  <maven.compiler.plugin.version>3.1</maven.compiler.plugin.version>
  <kafka.avro.serializer.version>3.0.0</kafka.avro.serializer.version>
  <confluent.platform.version>0.10.0.0-cp1</confluent.platform.version>
</properties>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${kafka.avro.serializer.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${confluent.platform.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
  </dependency>
</dependencies>
Am I doing this correctly? Any help is welcome!
Thanks,
Jun
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${apache.avro.version}</version>
  <executions>
    <execution>
      <id>generate-avro-sources</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>
--
Thanks,
Ewen
@org.apache.avro.specific.AvroGenerated
public class RegState extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { ... }
import java.util.Arrays;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RegStateKafkaConsumer {
    public static void main(String[] args) throws Exception {
        String topic = "test3";
        String group = "group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx.net:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://xxx.net:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, RegState> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            ConsumerRecords<String, RegState> records = consumer.poll(100);
            System.out.println("Polling Topic=" + topic + "; record count=" + records.count());
            for (ConsumerRecord<String, RegState> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
                // record.value() is already a RegState instance, so a plain assignment works
                RegState regState = record.value();
                System.out.println(regState.getCmts());
            }
        }
    }
}