I am trying to move data from Kafka to Elasticsearch, for which I am studying the Kafka Connector for Elasticsearch. The messages on my Kafka topic have a structure similar to this JSON -
{
  "top1": "",
  "top2": 0,
  "inner": {
    "inner1": "",
    "inner2": 0,
    "inner_most": {
      "inner_most1": 0,
      "inner_most2": ""
    }
  }
}
So the messages in Kafka are hierarchical (and I have no control over changing their structure). The document structure needed in Elasticsearch, however, is NOT hierarchical but flat fields and values. That is, something like -
top1: ""
top2: 0
inner1: ""
inner2: 0
inner_most1: 0
inner_most2: ""
So, if the JSON in Kafka simply had this structure, the connector would do the job -
{
  "top1": "",
  "top2": 0,
  "inner1": "",
  "inner2": 0,
  "inner_most1": 0,
  "inner_most2": ""
}
I have been exploring the following options to achieve this -
- Use Kafka Connect Transformations: There are transforms like ExtractField and InsertField, but I can't see a way to transform my hierarchical message into a flat one using these. Is it possible?
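(One possible lead on option 1: Apache Kafka also ships a Flatten single message transform, which collapses nested structures into top-level fields. A sketch of how it might be wired into the sink connector's properties, with placeholder names and the caveat that Flatten prefixes each field with its parent path, e.g. inner_inner_most_inner_most1 rather than the bare leaf name inner_most1 -)

```properties
# Hypothetical config fragment for the Elasticsearch sink connector;
# transform alias "flattenValue" is a placeholder name.
transforms=flattenValue
transforms.flattenValue.type=org.apache.kafka.connect.transforms.Flatten$Value
# Nested fields keep their full path as a prefix; only the separator
# between path segments is configurable (default is ".").
transforms.flattenValue.delimiter=_
```

If prefixed names are acceptable in the Elasticsearch documents, this would avoid custom code entirely; if the bare leaf names are required, a rename step (e.g. ReplaceField) or custom code would still be needed.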
- Use Kafka Connector for Elasticsearch with a Custom Converter: The Elasticsearch connector takes Kafka Connect's properties file (like connect-avro-distributed.properties), in which we can specify a custom Java value converter class via "value.converter". If I write my own version of JsonConverter and use it to transform my JSON, will the resulting JSON be picked up by the Kafka Connector for Elasticsearch and indexed as such? Are there any gotchas in my method or my understanding?
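(For option 2, the core flattening step such a converter would need might look like the sketch below. The class and method names are hypothetical; a real converter would implement org.apache.kafka.connect.storage.Converter and apply this to the deserialized payload. It assumes, as in the example message, that leaf field names are unique across the whole document, since later duplicates would overwrite earlier ones -)

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: recursively lifts nested map entries to the top
// level, keeping only the leaf field names (e.g. "inner_most1", not
// "inner.inner_most.inner_most1").
public class JsonFlattener {

    public static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> flat = new LinkedHashMap<>();
        collect(source, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void collect(Map<String, Object> source, Map<String, Object> flat) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Descend into nested objects instead of copying them.
                collect((Map<String, Object>) value, flat);
            } else {
                flat.put(entry.getKey(), value);
            }
        }
    }
}
```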
- Have a separate Transformed JSON Topic: If both (1) and (2) above don't work, then the only option left would be to write a Kafka consumer which reads my incoming hierarchical messages, converts them, and writes the flat JSON to a new topic. I could then use the Kafka Connector for Elasticsearch to feed off this new topic - since the structure of messages in this new topic will map exactly to the Elasticsearch document structure, the connector's schema inference should work out. I prefer methods (1) and (2) over this because (3) adds both processing and storage overhead.
- Of course, I could write a consumer that reads messages and writes directly to Elasticsearch, but this is my least preferred approach because I want to minimize the amount of code in this project.
Now, is there a different way to achieve my objective?
One final question - the features section of the Elasticsearch connector documentation (link) says: "The connector can infer mappings from the Kafka Connect schemas. When enabled, the connector creates mappings based on schemas of Kafka messages". How do I disable this inference? By setting "key.converter.schemas.enable=false" and "value.converter.schemas.enable=false"?
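(My current understanding, which I'd like confirmed: schemas.enable controls whether the JsonConverter expects a schema/payload envelope, while the connector itself has a separate schema.ignore setting. A fragment of what I believe the schemaless setup would look like, property names taken from the converter and connector docs -)

```properties
# Treat message values as plain schemaless JSON.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
# Tell the Elasticsearch sink not to create a mapping from the Connect
# schema; Elasticsearch then infers field types itself on first index.
schema.ignore=true
```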