Avro converter in sink connector

Adarsh Lal

Dec 27, 2016, 9:45:01 AM
to Confluent Platform
Hi,

I am new to Schema Registry and Avro.
We are developing a sink connector. We are using Avro as the key and value converter in Connect.
Will sinkRecord.value() give a fully deserialized message?
Do we need to do anything else to get the message?

The value schema used is bytes.

Thanks and Regards,
Adarshlal

Andrew Xue

Dec 27, 2016, 6:10:22 PM
to Confluent Platform
These are the properties you need:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=your_schema_registry_url
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=your_schema_registry_url
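
For context, these usually go in the Connect worker configuration. A minimal standalone worker file using the AvroConverter might look like the sketch below; the broker address, registry URL, and offsets file path are placeholders, not values from this thread.

# Hypothetical standalone Connect worker config -- adjust hosts and paths for your setup
bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Converters for Connect's internal offset/config data (JSON is typical here)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets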

Adarsh Lal

Dec 28, 2016, 12:45:03 AM
to Confluent Platform
Hi,

I am using the Avro converter in Connect, and I have already set the configurations you mentioned.

My question is 

If I am using Avro as the key and value converter, do I need to perform any deserialization or conversion on the sink connector side?

Is the message produced by the source connector into the Kafka topic already deserialized?

If we are using a source connector -> Streams -> sink connector flow for the ETL pipeline, where does the deserialization get performed?

Thanks and Regards,
Adarshlal

and...@datamountaineer.com

Dec 28, 2016, 3:57:44 AM
to Confluent Platform
Hi Adarsh,

Depending on your target sink, you will need to take the Connect Struct for the key and value you get from the SinkRecord and convert it to the target type.

For example


// Imports assumed for this snippet (the Kudu client package name may differ by version)
import org.apache.kafka.connect.data.{Field, Struct}
import org.apache.kafka.connect.data.Schema.Type
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kudu.client.PartialRow

/**
 * Convert a SinkRecord field to the corresponding Kudu type and add the column to the Kudu row.
 *
 * @param record Sink record holding the value Struct
 * @param field  SinkRecord Field to copy into the row
 * @param row    The Kudu row to add the field to
 * @return the updated Kudu row
 */
private def addFieldToRow(record: SinkRecord,
                          field: Field,
                          row: PartialRow): PartialRow = {
  val fieldType = field.schema().`type`()
  val fieldName = field.name()
  val struct = record.value().asInstanceOf[Struct]
  fieldType match {
    case Type.STRING  => row.addString(fieldName, struct.getString(fieldName))
    case Type.INT8    => row.addByte(fieldName, struct.getInt8(fieldName).asInstanceOf[Byte])
    case Type.INT16   => row.addShort(fieldName, struct.getInt16(fieldName))
    case Type.INT32   => row.addInt(fieldName, struct.getInt32(fieldName))
    case Type.INT64   => row.addLong(fieldName, struct.getInt64(fieldName))
    case Type.BOOLEAN => row.addBoolean(fieldName, struct.get(fieldName).asInstanceOf[Boolean])
    case Type.FLOAT32 => row.addFloat(fieldName, struct.getFloat32(fieldName))
    case Type.FLOAT64 => row.addDouble(fieldName, struct.getFloat64(fieldName)) // keep full double precision
    case Type.BYTES   => row.addBinary(fieldName, struct.getBytes(fieldName))
    case _            => throw new UnsupportedOperationException(s"Unknown type $fieldType")
  }
  row
}

If you are using a Source Connector, for example the JDBC Source, the key and value are serialized as Avro and written to Kafka. In Kafka Streams, depending on what you are doing, you deserialize as you read from Kafka and serialize again as you write back to it. The Sink Connector then deserializes the Avro from Kafka and converts it to a Connect Struct, which is wrapped in the SinkRecord. That is what you get in the Sink.
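
To make the Streams step concrete, here is a minimal pass-through sketch that reads and writes Avro via Schema Registry. It uses the current Kafka Streams DSL and Confluent's GenericAvroSerde, which may differ from the APIs available when this thread was written; the topic names and URLs are placeholders, and it assumes the values are Avro records (for primitive Avro values such as bytes, see Ewen's note further down).

import java.util.Collections;
import java.util.Properties;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class AvroPassThrough {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-pass-through");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // The serde deserializes on read and re-serializes on write, talking to Schema Registry
    Serde<GenericRecord> valueSerde = new GenericAvroSerde();
    valueSerde.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        false); // false = this serde is used for record values, not keys

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic", Consumed.with(Serdes.String(), valueSerde))
           .to("output-topic", Produced.with(Serdes.String(), valueSerde));

    new KafkaStreams(builder.build(), props).start();
  }
}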

Hope this helps.

Andrew

Ewen Cheslack-Postava

Dec 29, 2016, 12:42:42 PM
to Confluent Platform
To clarify, sink connectors shouldn't ever need to do their own *deserialization* of data from Kafka. The framework handles the deserialization step (via the converters in the user's configuration) and passes *Connect Data API* formatted data to the connector (e.g. Structs, as Andrew mentioned). However, you may need to perform additional transformations to get the data into a format compatible with the sink system (e.g. converting that Struct into a database row in the case of a JDBC sink).
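
In other words, by the time put() is called on a sink task, the converter has already done the Avro work. Here is a minimal sketch of what a sink task sees; the class name is hypothetical and the printing stands in for writing to a real target system.

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {
  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      Object value = record.value(); // already deserialized by the framework
      if (value instanceof Struct) {
        // Record-style schemas arrive as Connect Structs
        Struct struct = (Struct) value;
        for (Field field : struct.schema().fields()) {
          System.out.printf("%s = %s%n", field.name(), struct.get(field));
        }
      } else {
        // Primitive schemas (e.g. a bytes value schema) arrive as plain Java values,
        // e.g. byte[] or ByteBuffer
        System.out.println("primitive value: " + value);
      }
    }
  }

  @Override public void start(Map<String, String> props) { }
  @Override public void stop() { }
  @Override public String version() { return "0.1"; }
}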

-Ewen

Adarsh Lal

Jan 4, 2017, 8:49:30 AM
to Confluent Platform
Hi

I am using the Avro converter in Connect.
I am sending messages with a bytes value schema from the source connector.


new SourceRecord(null, null, kafkaTopic, Schema.BYTES_SCHEMA, message)

But I get this error in both the Streams app and the sink connector.

[B cannot be cast to org.apache.avro.generic.GenericRecord

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=confluent-connect-cassandra, partition=47, offset=0
 at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:96)
 at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
 at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.avro.generic.GenericRecord
 at com.attinad.cantiz.message.properties.demo.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:36)
 at com.attinad.cantiz.message.properties.demo.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:1)
 at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:43)
 at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:94)
 ... 4 more

Ewen Cheslack-Postava

Jan 4, 2017, 5:42:25 PM
to Confluent Platform
[B is the type for a byte array. The issue is that the GenericAvroDeserializer you're using is assuming that you're always deserializing a GenericRecord and trying to cast to that, but you're actually sending just a bare byte[]. The deserializer should really just return Object to fully support all types that Avro can send.
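
A hedged sketch of that idea (this is not the poster's actual GenericAvroDeserializer; the class name is made up): delegate to Confluent's KafkaAvroDeserializer and return Object, so record schemas come back as GenericRecord while primitive Avro types such as bytes come back as their plain Java representation (a ByteBuffer) instead of failing a cast.

import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroObjectDeserializer implements Deserializer<Object> {
  private final KafkaAvroDeserializer inner = new KafkaAvroDeserializer();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Needs "schema.registry.url" in the configs, same as the AvroConverter
    inner.configure(configs, isKey);
  }

  @Override
  public Object deserialize(String topic, byte[] data) {
    // GenericRecord for record schemas; ByteBuffer, String, Integer, etc. for
    // primitive schemas -- the caller decides what to cast to
    return inner.deserialize(topic, data);
  }

  @Override
  public void close() {
    inner.close();
  }
}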

-Ewen
