Issue with io.debezium.transforms.UnwrapFromEnvelope


Vincent

Nov 26, 2017, 11:52:52 AM11/26/17
to debezium
Hi, 

I'm currently trying to sink my Postgres DB to Elasticsearch using Debezium and Confluent Kafka Connect, and I have an issue with the sink connector configuration.

When I put the UnwrapFromEnvelope transform in the sink configuration, I get the following error:

value io.debezium.transforms.UnwrapFromEnvelope for configuration transforms.custom.type: Class io.debezium.transforms.UnwrapFromEnvelope could not be found.

But it works when I move the transform to the PostgresConnector configuration.
I don't understand why, and I need to keep the full message in my Kafka topic.

(For information, I used a custom docker-compose.yml based on Confluent's images.
I also tried the default Confluent Dockerfile with the Debezium JARs loaded, and the image provided by Debezium.)


Thank you for your response. 


curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "database-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "database",
    "database.server.name": "test",
    "table.whitelist": "public.film"
  }
}'


curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "dvdrental-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "test.public.film",
    "type.name": "test_type",
    "tasks.max": "1",
    "key.ignore": "false",
    "schema.ignore": "true",
    "batch.size": 5,
    "transforms": "unwrap,ExtractId",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractId.field": "id"
  }
}'
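
For context, the UnwrapFromEnvelope SMT flattens Debezium's change-event envelope so that the sink only receives the current row state. As a sketch (field values below are hypothetical), a Debezium change event for the film table looks roughly like:

```json
{
  "before": null,
  "after": { "film_id": 1, "title": "Academy Dinosaur" },
  "source": { "name": "test" },
  "op": "c",
  "ts_ms": 1511697172000
}
```

After unwrapping, the record value is just the contents of "after":

```json
{ "film_id": 1, "title": "Academy Dinosaur" }
```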

Gunnar Morling

Nov 27, 2017, 3:48:16 AM11/27/17
to debezium
Hi Vincent,

Are your source and sink connectors deployed to the same Kafka Connect cluster? If not, you should make sure that one of the Debezium connectors is deployed to the sink cluster too, so as to make the unwrap SMT available there.

Also, if you're working with multiple workers in a single cluster, you should make sure that the Debezium connectors are deployed to all worker nodes.
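
One way to check what a given worker can actually see (a sketch; host and port taken from the curl commands in this thread) is the Connect REST API:

```shell
# Lists the connector classes this worker has loaded; if the Debezium
# connectors show up here, their JARs (including the one that contains
# the UnwrapFromEnvelope SMT) are on that worker's plugin path.
curl -s localhost:8083/connector-plugins
```

Run this against each worker in the cluster to spot a node that is missing the plug-ins.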

You can find a complete example which uses the unwrap SMT on the sink side here:


This is using the Confluent JDBC sink connector, but things should be very similar when using the ES connector.

--Gunnar

Vincent

Nov 27, 2017, 6:23:04 AM11/27/17
to debezium
Hi,

Thanks for your response.

I currently have one Docker container with Kafka Connect and one with Kafka. I installed the Debezium connector on Connect as in your example. I also checked the logs:
connect_1          | 2017-11-27 11:15:25,229 INFO   ||  Added plugin 'io.debezium.transforms.UnwrapFromEnvelope'   [org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader]


I also tried your Docker image for Connect, like this:

FROM debezium/connect:0.6

ADD ./custom/pleiads.kefta.pg.events-1.0.jar $KAFKA_CONNECT_PLUGINS_DIR

RUN cd $KAFKA_HOME/libs &&\
function confluent_dep { curl -O http://packages.confluent.io/maven/io/confluent/$1/$CONFLUENT_VERSION/$1-$CONFLUENT_VERSION.jar; };\
confluent_dep kafka-connect-elasticsearch


Same result: I can't use the transform on the sink connector.

Gunnar Morling

Nov 27, 2017, 6:30:12 AM11/27/17
to debezium
Could you take the example I gave as a starting point and replace the JDBC sink connector with the ES one?

Jiri Pechanec

Nov 27, 2017, 6:31:39 AM11/27/17
to debezium
Hi,

Please do not forget that there were changes in classpath handling by Kafka Connect. Could you please try to place a copy of the debezium-core JAR into the same directory where your sink connector resides?
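
In a Confluent image, that could look like this (the paths and version number are illustrative; adjust them to your installation):

```shell
# Place a copy of debezium-core next to the ES sink connector so the
# sink's classloader can resolve the SMT class directly.
cp /usr/share/java/kafka-plugins/debezium-connector-postgres/debezium-core-0.6.2.jar \
   /usr/share/java/kafka-connect-elasticsearch/
```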

Thanks

J.

Vincent

Nov 27, 2017, 7:12:44 AM11/27/17
to debezium
Done, same issue.

Vincent

Nov 27, 2017, 7:12:58 AM11/27/17
to debezium
I put the debezium-core JAR into the sink connector folder (/usr/share/java/kafka-connect-elasticsearch/).
It fixed my issue.

Thank you!!

Gunnar Morling

Nov 28, 2017, 5:20:48 AM11/28/17
to debezium
Interesting; actually this shouldn't be necessary, at least if the directory with the ES connector is part of the plugin path. In that case the SMT will be obtained from one of the DBZ plug-ins (if they are also on the plugin path).
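
For reference, the plugin path is a comma-separated list of directories in the worker configuration, all of which are scanned for plug-ins. A sketch, using the directories mentioned in this thread:

```properties
# connect-distributed.properties (or CONNECT_PLUGIN_PATH in the
# Confluent Docker images): both the ES sink and the Debezium
# connectors must live under one of these directories for the SMT
# to be resolvable from the sink.
plugin.path=/usr/share/java/kafka-connect-elasticsearch,/usr/share/java/kafka-plugins
```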

I've adapted the existing SMT example to use the ES sink connector and pushed it to my fork:


This loads the unwrap SMT successfully. It would be interesting to see how that differs from your set-up and why yours didn't work. In any case, we should add this to the examples repo.

--Gunnar

Vincent

Dec 5, 2017, 8:34:35 AM12/5/17
to debezium
My current setup for Kafka Connect (working):
The ES connector is installed by Confluent, so it should be part of the classpath.

ES Connector path: /usr/share/java/kafka-connect-elasticsearch/ 
My plugin path:  /usr/share/java/kafka-plugins/


Dockerfile:

FROM confluentinc/cp-kafka-connect:3.3.1

RUN mkdir -p /usr/share/java/kafka-plugins/
ENV CONNECT_PLUGIN_PATH=/usr/share/java/kafka-plugins/

RUN curl -fSL -o /tmp/plugin.tar.gz \
https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/0.6.2/debezium-connector-postgres-0.6.2-plugin.tar.gz &&\
tar -xzf /tmp/plugin.tar.gz -C $CONNECT_PLUGIN_PATH &&\
rm -f /tmp/plugin.tar.gz;

# Mandatory to allow Debezium transforms with Elastic sink connector
RUN cp $CONNECT_PLUGIN_PATH/debezium-connector-postgres/*.jar /usr/share/java/kafka-connect-elasticsearch/