Not able to update a document in Elasticsearch


Bala Murugan

May 18, 2017, 9:42:11 AM
to Confluent Platform

Hi All,

I have installed Confluent.

In kafka-connect-jdbc/quickstart-postgres.properties:

name=task_2
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://localhost/v2?user=cw&password=****
table.types=table
query=select es_test_id, name, last_update_on from es_test
mode=timestamp+incrementing
timestamp.column.name=last_update_on
incrementing.column.name=es_test_id
validate.non.null=false
topic.prefix=new-topic-two


In kafka-connect-elasticsearch/quickstart-elasticsearch.properties:

name=es-name-2
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=new-topic-two
key.ignore=true
connection.url=http://localhost:9200
type.name=mytype1

While inserting values into the table (es_test), everything works fine.

My issue is that when I update the name and last_update_on values in the table, Elasticsearch does not update the existing document; it inserts a new one.

So Elasticsearch shows both the old and the updated values.

Please help me update the value in Elasticsearch.

Thanks.
Bala.



Ewen Cheslack-Postava

May 18, 2017, 6:44:38 PM
to Confluent Platform
Bala,

Since your ES connector config has key.ignore=true, it is ignoring the key from the Kafka message and instead setting the document ID in ES to <topic>+<partition>+<offset>.

If your Kafka messages have keys, you probably just want to set key.ignore=false. If they don't, you may need to use a single message transform to fill in the correct key by extracting it from the value. There is a transformation, ValueToKey, that will do this; you can find documentation about it here: http://kafka.apache.org/documentation.html#connect_transforms
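To see why updates pile up as extra documents, here is a sketch of the ID scheme described above (an illustration, not the connector's actual code):

```python
# Sketch of the key.ignore=true document-ID scheme: the ES sink
# builds the ID from the record's Kafka coordinates, so every
# record -- even an update to the same database row -- gets a
# fresh ID and becomes a separate document.
def default_doc_id(topic: str, partition: int, offset: int) -> str:
    return f"{topic}+{partition}+{offset}"

# An insert and a later update of the same row arrive as two
# Kafka records with different offsets:
insert_id = default_doc_id("new-topic-two", 0, 41)
update_id = default_doc_id("new-topic-two", 0, 42)
print(insert_id)               # new-topic-two+0+41
print(update_id)               # new-topic-two+0+42
print(insert_id == update_id)  # False -- Elasticsearch keeps both
```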

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/6f6f2d22-d944-457c-b5ec-eba0e290a8b2%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Bala Murugan

May 19, 2017, 9:44:11 AM
to Confluent Platform
Ewen,

Thank you so much for replying.

Your reply helped me a lot in understanding how Kafka transfers records to ES, and I have gone through the Kafka Connect docs and transforms.


Please help me with how to create the document ID, and in which config file to set it.

Below is my database table information.

My database is PostgreSQL.

query: select es_test_id, name, last_update_on from es_test


Column Name      Data type
--------------   ------------------------
es_test_id       serial (auto increment)
name             character varying
last_update_on   timestamp with time zone

The table contains the data below:

es_test_id   name      last_update_on
1            bala      2017-05-18 19:04:31.2378+05:30
2            murugan   2017-05-18 19:04:32.3078+05:30


-
Bala

Gwen Shapira

May 21, 2017, 12:59:26 AM
to Confluent Platform
As Ewen said, you want to:

1. Set key.ignore = false in the ES connector configuration.

2. Make sure your events have a key, by using the ValueToKey transformation. Here's an example:

transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=es_test_id

Gwen


Bala Murugan

May 21, 2017, 7:31:23 AM
to Confluent Platform
Gwen,

Thank you so much for replying.

I have included the configuration you posted, as below:

name=es-name-2
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=new-topic-two
key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=es_test_id
connection.url=http://localhost:9200
type.name=mytype1

I'm getting this exception:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Kindly help me to resolve this.

-
Bala

Pratikshya Kuinkel

May 23, 2017, 2:20:23 AM
to Confluent Platform
Hi All,

I am having a similar issue when using the elasticsearch-sink connector. I am reading data from a table into Kafka using the Debezium connector, so the data in Kafka is stored in the following format:

{
  "before": null,
  "after": {
    "id": 1,
    "perm_id": 1,
    "title": "Big Data Control Center"
  },
  "source": {
    "name": "dbserver1",
    "server_id": 1,
    "ts_sec": 1495502210,
    "gtid": null,
    "file": "ON.000013",
    "pos": 2355,
    "row": 0,
    "snapshot": null,
    "thread": 10,
    "db": "test",
    "table": "test_message"
  },
  "op": "c",
  "ts_ms": 1495502210527
}


The Kafka input is already in Map format. So when I try to index it with key.ignore=false, it complains that "MAP is not supported as the document id" - which makes sense, since without any transform it wouldn't work. I don't know how to generate a document ID here so that I can use the "id" value as the document ID. I tried the above-mentioned method with the key as after.id, but it doesn't seem to work:

transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=after.id



It still complains "MAP is not supported as the document id." If you can provide any documentation or an example doing something similar, it would be really helpful.

Thanks,
Pratikshya 

singhraj...@gmail.com

Oct 11, 2018, 12:27:56 AM
to Confluent Platform
Use this as the configuration:

transforms=InsertKey,extractKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=after.id
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=after.id
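
For the earlier es_test case in this thread, the same two-transform chain would look roughly like the sketch below (based on the configs posted above; since es_test_id is a top-level field of the value, no nested path is needed):

```properties
name=es-name-2
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=new-topic-two
key.ignore=false
connection.url=http://localhost:9200
type.name=mytype1

# ValueToKey copies es_test_id from the value into a single-field
# Struct key; ExtractField$Key then unwraps that Struct to the bare
# value, avoiding the "STRUCT is not supported as the document id"
# error and giving each row a stable document ID, so updates
# overwrite instead of creating new documents.
transforms=InsertKey,extractKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=es_test_id
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=es_test_id
```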