Kafka Connect SMT


satyaji...@gmail.com

Aug 8, 2017, 3:22:04 PM
to Confluent Platform
Hi All,

I am trying to write my own custom SMT and remote debug it from IntelliJ.
Before I create my custom SMT, I am trying to use the existing TimestampConverter SMT that ships with Kafka Connect and remote debug that.

Please find the pom below. I am using the Confluent version of Kafka.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>*****</groupId>
<artifactId>******</artifactId>
<version>1.0-SNAPSHOT</version>

<description>
A Kafka Connect transformation (SMT) to convert all datetime column values into Timestamp format.
</description>

<properties>
<confluent.version>4.0.0-SNAPSHOT</confluent.version>
<kafka.version>0.11.0.0</kafka.version>
<junit.version>4.12</junit.version>
<hadoop.version>2.7.3</hadoop.version>
<hive.version>1.2.1</hive.version>
<avro.version>1.8.2</avro.version>
<parquet.version>1.7.0</parquet.version>
<commons-io.version>2.4</commons-io.version>
<joda.version>2.9.7</joda.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

</dependencies>


</project>


When I build the jar, deploy it by setting a classpath variable to the directory containing the jar, and run Connect in distributed mode with the
specific transform parameters in connect-distributed.properties, I get the error below:

ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
status.storage.topic = connect-status
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
worker.sync.timeout.ms = 3000
worker.unsync.backoff.ms = 300000
 (org.apache.kafka.connect.runtime.distributed.DistributedConfig:223)
[2017-08-08 12:11:16,053] INFO Logging initialized @7734ms (org.eclipse.jetty.util.log:186)
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:128)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:165)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:145)
	at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.<init>(JJ)V
	at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:90)
	... 3 more

I suspect this is caused either by a version conflict between the dependencies in the pom above and the Confluent Platform installation, or by a classpath issue.
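
To make the version-conflict theory concrete: the NoSuchMethodError says the Connect runtime looked for a Metadata constructor taking two long arguments, and the kafka-clients classes that were actually loaded do not have one. One possible cause is that the jar-with-dependencies bundles its own copy of the Kafka classes, which then sits on the classpath next to the copy shipped with the platform. A minimal sketch of how that could be ruled out, assuming the Connect worker already provides these artifacts at runtime and that `kafka.version` is set to whatever version the installed worker uses (both are assumptions, not something confirmed here):

```xml
<!-- Sketch only: marks the Kafka artifacts as "provided" so they are
     available at compile time but are not packed into the
     jar-with-dependencies assembly. Assumes the Connect worker
     supplies these jars itself. -->
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${kafka.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-transforms</artifactId>
    <version>${kafka.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
```

With this scope the built jar should contain only the project's own classes, so if the same error still appears the classpath variable itself would be the next thing to check.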

Can someone please help me out with this?
Any help would be appreciated.

Regards.
