The company I work for is evaluating Kafka Connect for orchestrating our internal/external integrations, and as a POC to move away from our current homegrown solution based on RabbitMQ + Camel + Play!, we have started using the kafka-connect-salesforce source (https://github.com/jcustenborder/kafka-connect-salesforce) to stream updates from our various SFDC orgs to our data lake. While working through this I was inspired by the related kafka-connect-solr sink (https://github.com/jcustenborder/kafka-connect-solr) to also decouple our current Solr indexing process, since the number of fields we index would be cumbersome to express in the properties-file style of the solr sink or with KIP-66 transforms. I have started a small POC at https://github.com/epheatt/kafka-connect-morphlines to plug the general-purpose ETL commands available in morphlines into the pipeline before loading into Solr.
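The rough shape of the idea, in simplified form (a sketch rather than the actual POC code; the class name and the inline collector are just placeholders), is to compile the morphline once at startup and push each record body through it:

import java.io.ByteArrayInputStream;
import java.io.File;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Notifications;

public class MorphlineRunner {

    private final Command morphline;

    public MorphlineRunner(File morphlineFile, String morphlineId) {
        MorphlineContext context = new MorphlineContext.Builder().build();
        // The final child is the downstream end of the pipeline; in the real sink
        // the loadSolr command (or a collector of transformed records) sits there.
        Command collector = new Command() {
            @Override public Command getParent() { return null; }
            @Override public void notify(Record notification) { }
            @Override public boolean process(Record rec) { return true; }
        };
        this.morphline = new Compiler().compile(morphlineFile, morphlineId, context, collector);
    }

    public void process(byte[] jsonBody) {
        Record record = new Record();
        // Parser commands such as readJson take the payload from the attachment
        // body field as an InputStream (or byte[]).
        record.put(Fields.ATTACHMENT_BODY, new ByteArrayInputStream(jsonBody));
        Notifications.notifyStartSession(morphline);
        if (!morphline.process(record)) {
            throw new RuntimeException("Morphline failed to process record");
        }
    }
}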
When the put method on a sink is passed the Collection<SinkRecord>, should the configured converter already have been run on the body (JSON in my case), or is the connector expected to get a Struct and then handle the conversion independently at the edge? In my initial test case I am getting a "java.lang.ClassCastException: org.apache.kafka.connect.data.Struct cannot be cast to java.io.InputStream" because of the way the scenario is set up, but given my relative newness to the framework I'm not sure exactly what is expected here.
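In case the converter has indeed already been applied by the worker, the workaround I have been sketching is to re-serialize the Struct back to JSON bytes with a JsonConverter inside put() before handing the payload to the morphline, roughly like this (the class name is a placeholder, and the other SinkTask methods are omitted):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Other SinkTask methods (version, stop, flush) are omitted, hence abstract.
public abstract class MorphlineSinkTaskSketch extends SinkTask {

    private final JsonConverter valueConverter = new JsonConverter();

    @Override
    public void start(Map<String, String> props) {
        Map<String, Object> converterConfig = new HashMap<>();
        // Emit plain JSON without the schema/payload envelope.
        converterConfig.put("schemas.enable", "false");
        valueConverter.configure(converterConfig, false); // false = value converter
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // record.value() arrives as whatever the worker's converter produced
            // (a Struct in my test); serialize it back to JSON bytes so the
            // morphline's readJson sees an InputStream instead of a Struct.
            byte[] json = valueConverter.fromConnectData(
                    record.topic(), record.valueSchema(), record.value());
            InputStream body = new ByteArrayInputStream(json);
            // ... put `body` on the morphline Record as Fields.ATTACHMENT_BODY ...
        }
    }
}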
Is anyone else working on something similar with morphlines for Kafka Connect? My eventual thought is that once I get the Solr indexing working, the sink could be used generically to write transformed records to another topic (or to fan out, via a new morphline command, writing Avro/JSON/etc. to multiple topics) in cases where a Kafka Streams consumer/producer would be overkill but KIP-66 transforms are not sufficient.
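As a rough illustration of that fan-out idea, the new command could just wrap a plain Kafka producer, something like the sketch below (purely illustrative; the command name, config keys, and field choices are made up for the example):

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import com.typesafe.config.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import org.kitesdk.morphline.base.Fields;

public final class PublishToKafkaBuilder implements CommandBuilder {

    @Override
    public Collection<String> getNames() {
        return Collections.singletonList("publishToKafka");
    }

    @Override
    public Command build(Config config, Command parent, Command child, MorphlineContext context) {
        return new PublishToKafka(this, config, parent, child, context);
    }

    private static final class PublishToKafka extends AbstractCommand {

        private final String topic;
        private final Producer<byte[], byte[]> producer;

        PublishToKafka(CommandBuilder builder, Config config, Command parent,
                       Command child, MorphlineContext context) {
            super(builder, config, parent, child, context);
            this.topic = getConfigs().getString(config, "topic");
            Properties props = new Properties();
            props.put("bootstrap.servers", getConfigs().getString(config, "bootstrapServers"));
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producer = new KafkaProducer<>(props);
            validateArguments();
        }

        @Override
        protected boolean doProcess(Record record) {
            // Assumes an upstream command has left the serialized body
            // (Avro/JSON bytes) in the attachment body field.
            byte[] body = (byte[]) record.getFirstValue(Fields.ATTACHMENT_BODY);
            producer.send(new ProducerRecord<>(topic, body));
            return super.doProcess(record); // pass the record on to the next command
        }
    }
}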