Question on Kafka Connect Transforms with Connector for Elasticsearch


Bharadwaj Narasimha

Apr 20, 2017, 6:07:37 AM
to Confluent Platform
I am trying to move data from Kafka to Elasticsearch, for which I am studying the Kafka Connector for Elasticsearch. The messages from my Kafka topic have a structure similar to this JSON:

{
  "top1": "",
  "top2": 0,
  "inner": {
    "inner1": "",
    "inner2": 0,
    "inner_most": {
      "inner_most1": 0,
      "inner_most2": ""
    }
  }
}

So basically the messages in Kafka are hierarchical (and I have no control over changing their structure). Now, the document structure needed in Elasticsearch is NOT hierarchical but plain fields and values. That is, something like:

top1: ""
top2: 0
inner1: ""
inner2: 0
inner_most1: 0
inner_most2: ""

So, if the JSON in Kafka simply had this structure, the connector would do the job:

{
  "top1": "",
  "top2": 0,
  "inner1": "",
  "inner2": 0,
  "inner_most1": 0,
  "inner_most2": ""
}

I have been exploring the following options to achieve this:
  1. Use Kafka Connect Transformations: It has transforms like ExtractField and InsertField, but I can't think of a way to transform my hierarchical message into a plain one using these. Is it possible?
  2. Use the Kafka Connector for Elasticsearch with a Custom Converter: The Elasticsearch connector takes Kafka Connect's properties file (like connect-avro-distributed.properties), in which we can specify a custom Java value converter class in "value.converter" (see the sketch after this list). If I write my own version of JsonConverter and use it to flatten my JSON, will the resulting JSON be picked up by the Kafka Connector for Elasticsearch and indexed as such? Are there any gotchas in my method or understanding?
  3. Have a separate transformed JSON topic: If neither (1) nor (2) above works, the only option left would be to write a Kafka consumer which reads my incoming hierarchical messages, converts them, and writes the plain JSON to a new topic. I could then use the Kafka Connector for Elasticsearch to feed off this new topic; since the structure of messages in this new topic will map exactly to the Elasticsearch document structure, the connector's schema inference should work out. I prefer methods (1) and (2) over this because (3) adds both processing and storage overhead.
  4. Of course, I could write a consumer that reads messages and writes directly to Elasticsearch, but this is my least preferred approach because I want to minimize the code I will have as part of this project.
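
To make (2) concrete, this is roughly the worker setting I mean (the converter class name below is just a placeholder for the custom JsonConverter variant I would write):

  # Connect worker config, e.g. connect-avro-distributed.properties
  # com.example.FlatteningJsonConverter is a hypothetical custom converter
  value.converter=com.example.FlatteningJsonConverter
  # converter-level settings are passed through with the value.converter. prefix
  value.converter.schemas.enable=false
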
Now, is there a different way to achieve my objective? 

One final question: the features section of the Elasticsearch connector documentation (link) says, "The connector can infer mappings from the Kafka Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages." How do I disable this inference? By setting "key.converter.schemas.enable=false" and "value.converter.schemas.enable=false"?

Ewen Cheslack-Postava

Apr 20, 2017, 7:26:48 PM
to Confluent Platform
On Thu, Apr 20, 2017 at 3:07 AM, Bharadwaj Narasimha <bharat...@gmail.com> wrote:
I am trying to move data from Kafka to Elasticsearch, for which I am studying the Kafka Connector for Elasticsearch. The messages from my Kafka topic have a structure similar to this JSON:

{
  "top1": "",
  "top2": 0,
  "inner": {
    "inner1": "",
    "inner2": 0,
    "inner_most": {
      "inner_most1": 0,
      "inner_most2": ""
    }
  }
}

So basically the messages in Kafka are hierarchical (and I have no control over changing their structure). Now, the document structure needed in Elasticsearch is NOT hierarchical but plain fields and values. That is, something like:

top1: ""
top2: 0
inner1: ""
inner2: 0
inner_most1: 0
inner_most2: ""

So, if the JSON in Kafka simply had this structure, the connector would do the job:

{
  "top1": "",
  "top2": 0,
  "inner1": "",
  "inner2": 0,
  "inner_most1": 0,
  "inner_most2": ""
}

Are you saying it is a requirement of your application that the structure is flattened like this? Or that the ES connector doesn't support nested data? The ES connector should handle nested data, so I wouldn't expect you to need to flatten the data like this.
 

I have been exploring the following options to achieve this:
  1. Use Kafka Connect Transformations: It has transforms like ExtractField and InsertField, but I can't think of a way to transform my hierarchical message into a plain one using these. Is it possible?
https://github.com/apache/kafka/pull/2458 is in flight and includes a Flatten transformation. In general, if you can do the transformation by looking at only a single record and outputting a single record, you can do it with single message transformations.
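
Once that lands, wiring it into the Elasticsearch sink connector's config would look roughly like the following (the class and property names come from the PR, so treat them as tentative until it's merged):

  transforms=flattenValue
  transforms.flattenValue.type=org.apache.kafka.connect.transforms.Flatten$Value
  # nested field names are joined with the delimiter, so inner.inner_most.inner_most1
  # would come out as inner_inner_most_inner_most1 rather than just inner_most1
  transforms.flattenValue.delimiter=_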

  2. Use the Kafka Connector for Elasticsearch with a Custom Converter: The Elasticsearch connector takes Kafka Connect's properties file (like connect-avro-distributed.properties), in which we can specify a custom Java value converter class in "value.converter". If I write my own version of JsonConverter and use it to flatten my JSON, will the resulting JSON be picked up by the Kafka Connector for Elasticsearch and indexed as such? Are there any gotchas in my method or understanding?
Generally I'd suggest not forking connectors. There's usually a better way to accomplish this, either by using a single message transform or by adding the feature to the original connector as a configurable option.
  3. Have a separate transformed JSON topic: If neither (1) nor (2) above works, the only option left would be to write a Kafka consumer which reads my incoming hierarchical messages, converts them, and writes the plain JSON to a new topic. I could then use the Kafka Connector for Elasticsearch to feed off this new topic; since the structure of messages in this new topic will map exactly to the Elasticsearch document structure, the connector's schema inference should work out. I prefer methods (1) and (2) over this because (3) adds both processing and storage overhead.
For more complex transformations, this would be a good solution. You could also do this as a Kafka Streams app for a higher-level, easier-to-use API.
 
  4. Of course, I could write a consumer that reads messages and writes directly to Elasticsearch, but this is my least preferred approach because I want to minimize the code I will have as part of this project.
Now, is there a different way to achieve my objective? 

One final question: the features section of the Elasticsearch connector documentation (link) says, "The connector can infer mappings from the Kafka Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages." How do I disable this inference? By setting "key.converter.schemas.enable=false" and "value.converter.schemas.enable=false"?


-Ewen
 


Bharadwaj Narasimha

Apr 20, 2017, 10:49:49 PM
to Confluent Platform
Thanks Ewen. 

AFAIK, the ES connector maps the top-level fields in the struct to ES document fields. Nested fields in the struct may go in as collections in the ES document, but that is something I don't want to happen. My use case seems to match exactly what Flatten is probably intended to do. Is there an example of this transform that I can read somewhere? I'm also looking for examples of these other transform functions: HoistToStruct, ExtractFromStruct and ValueToKey. I think I have use cases for these in my pipeline too.


Gwen Shapira

Apr 21, 2017, 12:31:46 PM
to confluent...@googlegroups.com
Hoist is included in the example in the docs:


We are working on adding additional examples, stay tuned :)
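
In the meantime, here is a rough sketch of the kind of chaining you would do with ValueToKey and ExtractField (I believe ExtractFromStruct ended up as ExtractField in the released transforms; double-check the exact class and property names against the docs for your version). It promotes the "top1" field from the value into the record key and then extracts it as a plain key:

  transforms=toKey,extractKey
  # copy the chosen value field into the record key (as a struct)
  transforms.toKey.type=org.apache.kafka.connect.transforms.ValueToKey
  transforms.toKey.fields=top1
  # then pull that single field out so the key is the bare value
  transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
  transforms.extractKey.field=top1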





--
Gwen Shapira
Product Manager | Confluent
650.450.2760 @gwenshap
Follow us: Twitter | blog
