Not able to get KafkaAvroSerializer/KafkaAvroDeserializer working


Ravindra Akella

unread,
Mar 1, 2017, 4:37:31 AM
to Confluent Platform
Hi All,
I have a Kafka application that consumes Avro-encoded messages, and I have used Spring Cloud Stream to integrate with the Kafka broker.
Everything works fine when I test the application through Spring Cloud Stream.

In my acceptance test, I am using a standalone client (using the native Kafka APIs with KafkaAvroSerializer and KafkaAvroDeserializer), and I am not able to get the flow working.

There is a difference in the message payload, as seen in the console.

Using Spring Cloud Streams:
contentType"application/octet-stream"originalContentType+"application/vnd.provisionrequest.v97+avro"ORD10304011111101

Using Standalone Client:
aORD103040414883476838911101

Am I missing something in my producer?

Another observation is that my application is not consuming the message when the event doesn't have Avro encoding in it.

When I try to consume the message, I get the following exception:
14:15:20.911 [main] ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!


Producer Code:
import avro.oracle.communications.amp.provisioning.event.ProductActionCode;
import avro.oracle.communications.amp.provisioning.event.ProvisionComplete;
import avro.oracle.communications.amp.provisioning.event.ProvisionOrderLine;
import avro.oracle.communications.amp.provisioning.event.ProvisionRequest;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

/**
 * Created by RAAKELLA on 2/4/2017.
 */
/**
 * Created by RAAKELLA on 2/4/2017.
 */
public class AvroProducerADDSchemaRegistry {

    public static void main(String[] args) {
        new AvroProducerADDSchemaRegistry().sendMessage();
    }

    private void sendMessage() {
        KafkaProducer producer = null;
        try {
            Integer Partition = 0;
            Properties configProperties = configureProperties();
            producer = new KafkaProducer(configProperties);
            Long correlationId = System.currentTimeMillis();
            GenericRecord avroRecord = getRecord(correlationId);

            HashMap<String, Object> serProps = new HashMap<String, Object>();
            serProps.put("schema.registry.url", "http://slc02pad.us.oracle.com:8081");

            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>("amp.provisioning.events.provisionRequest", "key", avroRecord);
            producer.send(producerRecord);

            List<ProvisionComplete> response = new ResponsePollerRegistry().getProvisionResponse("amp.provisioning.events.provisionComplete", "provisionCompleteGroup", "" + correlationId);
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
        finally {
            producer.close();
        }
    }

    private Properties configureProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "slc02pad.us.oracle.com:9092");
        properties.put("acks", "all");
        properties.put("block.on.buffer.full", "false");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        properties.put("schema.registry.url", "http://slc02pad.us.oracle.com:8081");
        properties.put("metadata.broker.list", "slc02pad.us.oracle.com:9092");
        return properties;
    }

    public GenericRecord getRecord(Long correlationId) {
        final ProvisionRequest.Builder requestBuilder = ProvisionRequest.newBuilder();
        requestBuilder.setCorrelationId("" + correlationId);
        requestBuilder.setOrderId("ORD1030404");

        final ProvisionOrderLine.Builder itemBuilder = ProvisionOrderLine.newBuilder();
        itemBuilder.setSubscriptionId("101");
        itemBuilder.setProductActionCode(ProductActionCode.ADD);
        itemBuilder.setProvisionOrderLineId("1");

        List<ProvisionOrderLine> items = new ArrayList<ProvisionOrderLine>();
        items.add(itemBuilder.build());
        requestBuilder.setItems(items);

        final ProvisionRequest request = requestBuilder.build();
        System.out.println(request);
        return request;
    }
}
Consumer Code:
import avro.oracle.communications.amp.provisioning.event.ProvisionComplete;
import avro.oracle.communications.amp.provisioning.event.ProvisionError;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Created by RAAKELLA on 2/8/2017.
 */
public class ResponsePollerRegistry {

    public List<ProvisionComplete> getProvisionResponse(String topicName, String consumerGroup, String correlationId) throws Exception {

        List<ProvisionComplete> completedEvents = new ArrayList<>();

        KafkaConsumer<String, ProvisionComplete> kafkaConsumer;
        Properties configProperties = configureProperties(consumerGroup);
        kafkaConsumer = new KafkaConsumer<String, ProvisionComplete>(configProperties);

        TopicPartition tp = new TopicPartition(topicName, 0);
        TopicPartition[] tps = {tp};
        kafkaConsumer.assign(Arrays.asList(tps));
        kafkaConsumer.seekToBeginning(tp);
        kafkaConsumer.position(tp);

        try {
            ConsumerRecords<String, ProvisionComplete> records = kafkaConsumer.poll(15000);
            for (ConsumerRecord<String, ProvisionComplete> record : records) {
                ProvisionComplete temp = record.value();
                System.out.println(temp);
                if (temp.getCorrelationId().equals(correlationId)) {
                    completedEvents.add(temp);
                }
            }
            System.out.print(completedEvents);
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }

        return completedEvents;
    }

    private Properties configureProperties(String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "slc02pad.us.oracle.com:9092");
        properties.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        properties.put("value.deserializer", KafkaAvroDeserializer.class);
        properties.put("group.id", consumerGroup);
        properties.put("schema.registry.url", "http://slc02pad.us.oracle.com:8081");
//      properties.put("specific.avro.reader", true);
        return properties;
    }
}


ProvisionComplete is my Avro object.

Please tell me if I am missing something in my code.

Thanks in advance.

ravindra

Michael Noll

unread,
Mar 1, 2017, 11:19:48 AM
to confluent...@googlegroups.com
Within our code examples for Kafka's Streams API we also showcase how to use the normal Kafka producer/consumer clients to read and write Avro data:


This should help you to understand how to correctly configure the Kafka Avro serializer/deserializer when using the producer and consumer clients.  Note that, in the examples above, the actual writing and reading happens in a helper class (see link below), but you should be able to extrapolate from there.
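As a rough sketch of the configuration pattern those examples follow (the broker and Schema Registry addresses below are hypothetical placeholders, not taken from the examples), both serializer settings point at the Confluent Avro classes, and `schema.registry.url` must be set on both sides. The `specific.avro.reader` flag is what makes the deserializer hand back generated classes such as ProvisionComplete instead of GenericRecord:

```java
import java.util.Properties;

public class AvroClientConfigSketch {

    // Producer side: Confluent Avro serializer for keys and values, plus the
    // registry URL it uses to register/look up schemas.
    static Properties producerConfig() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");          // hypothetical broker
        p.put("schema.registry.url", "http://localhost:8081"); // hypothetical registry
        p.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        p.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return p;
    }

    // Consumer side mirrors the producer; specific.avro.reader=true returns
    // the generated SpecificRecord classes rather than GenericRecord.
    static Properties consumerConfig(String group) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("schema.registry.url", "http://localhost:8081");
        p.put("group.id", group);
        p.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.put("specific.avro.reader", "true");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig().getProperty("value.serializer"));
        System.out.println(consumerConfig("test-group").getProperty("specific.avro.reader"));
    }
}
```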



> Another observation is that my application is not consuming the message when the event doesn't have avro encoding in it.
When I try to consume the message, I get the following exception:
> 14:15:20.911 [main] ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

That's expected.  You have configured your consumer client to use the Avro deserializer (KafkaAvroDeserializer), and if a message isn't properly Avro-encoded then that deserializer can't, well, deserialize it.  The error message says that the deserializer attempted to deserialize but failed.
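Concretely, the Confluent serializer frames every payload as a one-byte magic value (0x0), a 4-byte schema id, and then the Avro-encoded body, and checking that magic byte is the deserializer's first step. The sketch below only mimics that documented wire format in plain JDK code (no Kafka dependency) to show why a payload written by any other serializer fails immediately:

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    static final byte MAGIC_BYTE = 0x0;

    // Frame a payload the way the Confluent serializer does:
    // [magic byte][4-byte schema id][avro-encoded body]
    static byte[] frame(int schemaId, byte[] avroBody) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroBody.length);
        buf.put(MAGIC_BYTE).putInt(schemaId).put(avroBody);
        return buf.array();
    }

    // The deserializer's first step: check the magic byte. Anything not
    // written through the Confluent serializer fails this check, which is
    // exactly the "Unknown magic byte!" error above.
    static int schemaIdOrThrow(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buf.getInt(); // schema id registered in the Schema Registry
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(schemaIdOrThrow(framed)); // prints 42
        try {
            schemaIdOrThrow("plain text".getBytes()); // not Confluent-framed
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```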

Hope this helps,
Michael





--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e36d0c1e-d61e-4988-a85c-b2ede5f42d74%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog

Ravindra Akella

unread,
Mar 4, 2017, 12:06:27 PM
to Confluent Platform
Michael,
Thanks for pointing me to a sample.
After changing my code, I am consistently running into the following error; I have also attached the complete log.
I have upgraded my Kafka client from 0.9.x to 0.10.x. Could that be a problem?

22:30:53.666 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at den01ykg.us.oracle.com:9092.
22:30:53.805 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
22:30:53.805 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
22:30:53.805 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
22:30:53.806 [StreamThread-1] DEBUG org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
22:30:53.806 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
22:30:53.829 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request {topics=[amp.provisioning.events.provisionRequest]} to node -1
22:30:53.940 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
22:30:53.940 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
22:30:53.940 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
22:30:53.941 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
22:30:53.941 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
22:30:53.968 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request {topics=[amp.provisioning.events.provisionRequest]} to node -1
22:30:54.116 [StreamThread-1] DEBUG org.apache.kafka.common.network.Selector - Connection with den01ykg.us.oracle.com/10.89.202.72 disconnected
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
22:30:54.117 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Node -1 disconnected.
22:30:54.119 [StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - Bootstrap broker den01ykg.us.oracle.com:9092 disconnected
22:30:54.120 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@ad41b4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=specific-avro-integration-test-1-StreamThread-1-consumer}, body={group_id=specific-avro-integration-test}), createdTimeMs=1488646853518, sendTimeMs=1488646853832) with correlation id 0 due to node -1 being disconnected
22:30:54.121 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
22:30:54.222 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
22:30:54.222 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at den01ykg.us.oracle.com:9092.
22:30:54.244 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.common.network.Selector - Connection with den01ykg.us.oracle.com/10.89.202.72 disconnected
java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
    at java.lang.Thread.run(Thread.java:745)
22:30:54.244 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Node -1 disconnected.
22:30:54.244 [kafka-producer-network-thread | producer-2] WARN org.apache.kafka.clients.NetworkClient - Bootstrap broker den01ykg.us.oracle.com:9092 disconnected
22:30:54.244 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
22:30:54.345 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
22:30:54.345 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at den01ykg.us.oracle.com:9092.
Attachment: log (2).txt

Eno Thereska

unread,
Mar 5, 2017, 5:48:27 AM
to Confluent Platform
Hi Ravindra,

What version of the Kafka broker do you have? I ask because Kafka Streams works with brokers 0.10.0 and above. See here for the compatibility matrix: http://docs.confluent.io/3.2.0/streams/introduction.html#requirements

If you are just experimenting with this, I'd suggest moving both brokers and Streams to the latest release, 0.10.2.

Thanks
Eno