Kafka Connect internal Struct conversion in sink


Eric Pheatt

Mar 16, 2017, 8:40:17 PM
to Confluent Platform
The company I work for is evaluating Kafka Connect for orchestrating our internal/external integrations, and we have started using the kafka-connect-salesforce source (https://github.com/jcustenborder/kafka-connect-salesforce) to stream updates from our various SFDC orgs to our data lake as the POC to move away from our current homegrown solution based on RabbitMQ + Camel + Play!. While working through this I was inspired by the related kafka-connect-solr sink (https://github.com/jcustenborder/kafka-connect-solr) to also decouple our current Solr indexing process, because the number of fields we index would be cumbersome to express using the properties-file style of the Solr sink or using KIP-66 transforms. I have started on a small POC in https://github.com/epheatt/kafka-connect-morphlines to plug in the general-purpose ETL commands available to morphlines prior to loading into Solr.

When the put method on a sink is passed the Collection<SinkRecord>, should the configured internal converter already have been run on the body (in my case JSON), or is the connector expected to get a Struct and then handle the conversion independently at the edge? In my initial test case I am getting a "java.lang.ClassCastException: org.apache.kafka.connect.data.Struct cannot be cast to java.io.InputStream" because of the way the scenario is set up, but given my relative newness to the framework I'm not sure what exactly is expected here.
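
To illustrate, the failing path in my test boils down to roughly this (a sketch, not the actual task code) -- the morphline command wants an InputStream body, while record.value() turns out to be a Connect Struct:

    // Sketch of the failing cast (illustrative only, not the real code):
    Object value = record.value();                           // at runtime: org.apache.kafka.connect.data.Struct
    java.io.InputStream body = (java.io.InputStream) value;  // -> java.lang.ClassCastException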

Is anyone else working on something similar with morphlines for Kafka Connect? My eventual thought is that once I get the Solr indexing working, the sink could be used generically to write transformed records to another topic (or fan out, using a new morphline command, to write avro/json/etc. to multiple topics) in cases where a Kafka Streams consumer/producer would be overkill but KIP-66 transforms are not sufficient.



Gwen Shapira

Mar 21, 2017, 3:53:06 PM
to confluent...@googlegroups.com
Hi,

I'm not working on Morphlines (I didn't like it back at Cloudera), but I may be able to help with transformations a bit...

The transformations run after the converter - so you will not get raw JSON; you should get either a Struct (if you have schemas.enable=true) or a Map (schemas.enable=false).

Your "apply" method can choose which types to handle and how to handle them. You can check the InsertField transformation in the Kafka code to see how both types are handled.

Hope this helps a bit :)

Let me know if you have more questions! I think it's the first external transformation and I'm excited!

Gwen




--
Gwen Shapira
Product Manager | Confluent
650.450.2760 @gwenshap
Follow us: Twitter | blog

Eric Pheatt

Mar 21, 2017, 7:07:33 PM
to Confluent Platform
Hi Gwen,

On Tuesday, March 21, 2017 at 12:53:06 PM UTC-7, Gwen Shapira wrote:
Hi,

I'm not working on Morphlines (I didn't like it back at Cloudera), but I may be able to help with transformations a bit...

The transformations run after the converter - so you will not get raw JSON; you should get either a Struct (if you have schemas.enable=true) or a Map (schemas.enable=false).

Your "apply" method can choose which types to handle and how to handle them. You can check the InsertField transformation in the Kafka code to see how both types are handled.


I'm working in the context of a SinkTask right now because I see Morphlines as being "heavier" than the lightweight internal transforms, and the potential for external side effects (like the loadSolr command) would keep this as an edge activity from my perspective. I'll take a closer look at that once I have a final-command record emitter implemented for a morphlines pipeline to persist the results of a transform to a destination topic. Ideally I would use the built-in toAvro command provided as part of morphlines so that I can just ship the body back to a topic, but it looks like that is missing schema-registry support, so I may need to add that capability as well to round things out.

I initially took the approach of setting a byte array as the _attachment_body field when the value is a string (to use the readLine command), or using the internal JsonConverter to convert the struct based on its schema (to use the readJson command and then extractJsonPaths). It feels a little hacky given my knowledge so far (https://github.com/epheatt/kafka-connect-morphlines/blob/master/src/main/java/com/github/epheatt/kafka/connect/morphlines/MorphlineSinkTask.java#L163), so I may add a custom morphlines readSinkRecord command to reduce some of the assumptions in the code and push conversion decisions into the config explicitly.
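
For the struct case, the conversion boils down to something like this (a rough sketch with illustrative names, not the actual task code), given a SinkRecord named record:

    import java.util.Collections;
    import org.apache.kafka.connect.json.JsonConverter;

    JsonConverter converter = new JsonConverter();
    // schemas.enable=false so the output is plain JSON without the {"schema": ..., "payload": ...} envelope
    converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

    byte[] body = converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
    // body is then set as _attachment_body and parsed with readJson + extractJsonPaths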
 
Hope this helps a bit :)

Let me know if you have more questions! I think it's the first external transformation and I'm excited!

If this POC gets to a production quality level I'd like to find an upstream home for it with either Confluent or Cloudera if there is broader interest for the capabilities provided. 

Thanks,
Eric
 
Gwen
