Kafka - Sink Issue

335 views
Skip to first unread message

ASIF DBA

unread,
Oct 10, 2018, 7:58:26 AM10/10/18
to confluent...@googlegroups.com
Dear All,


Oracle Database (SOURCE) > KAFKA > Oracle Database (TARGET)

We are able to get the data from the source database (oracle) into KAFKA, but its not getting replicated into Target (oracle) database from KAFKA.


Can you please advice what is the issue here ? Any changes to be made ?



[2018-10-10 16:01:57,126] ERROR WorkerSinkTask{id=ora_sink_cdc12-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.

Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'CDCTAB2' is RECORD_VALUE witth configured PK fields [ID], but record value schema does not contain field: ID
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordValuePk(FieldsMetadata.java:251)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:102)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)


SINK Connector File :
[root@ebsnew kafka-connect-jdbc]# cat sink-oracle-cdc1.properties
name=ora_sink_cdc12
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=CDCTESTAB
connection.url=jdbc:oracle:thin:@hostname:1521/EBSNEW
connection.user=kfktgt
connection.password=target123
table.name.format=CDCTAB2
auto.create=true
insert.mode=upsert
pk.mode=record_value
pk.fields=ID
fields.whitelist=ID,CITY,NATIONALITY


Regards,
Asif

ASIF DBA

unread,
Oct 10, 2018, 1:04:22 PM10/10/18
to confluent...@googlegroups.com
Dear All, 

Any help on this issue would be greatly appreciated

Thanks in advance

Regards
Asif. 

Somasundaram Sekar

unread,
Oct 10, 2018, 1:10:54 PM10/10/18
to confluent...@googlegroups.com
Sample data will be useful

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CAAS4VYjj%3Dsbx7Yh9Fs7BYaWmZMeryC85YxSwWsO9%3DFV%2BmHQhUQ%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

ASIF DBA

unread,
Oct 10, 2018, 1:14:34 PM10/10/18
to confluent...@googlegroups.com
Hello,

As requested data from the source table has been shown in the attached file for your reference.

Regards.

Data - Rows.PNG

Somasundaram Sekar

unread,
Oct 10, 2018, 1:20:50 PM10/10/18
to confluent...@googlegroups.com
Can you include the Kafka record as well


ASIF DBA

unread,
Oct 10, 2018, 1:44:42 PM10/10/18
to confluent...@googlegroups.com
Please find the requested information below :


[root@ebsnew bin]# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic CDCTESTAB --from-beginning
{"SCN":12206116841348,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539162785000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (634789,'AHMEDABAD','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":634789,"CITY":{"string":"AHMEDABAD"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116841465,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539162844000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (63489,'DELHI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":63489,"CITY":{"string":"DELHI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116842718,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539163441000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (60009,'CHENNAI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":60009,"CITY":{"string":"CHENNAI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116897232,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539193338000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (6077009,'MUMBAI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":6077009,"CITY":{"string":"MUMBAI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}

Somasundaram Sekar

unread,
Oct 10, 2018, 1:46:37 PM10/10/18
to confluent...@googlegroups.com
This is the record value, as it is a nested structure you need to find a way to point to ID in pk.fields

ASIF DBA

unread,
Oct 10, 2018, 1:46:40 PM10/10/18
to confluent...@googlegroups.com
Somasundaram,

Hope the below info helps

[root@ebsnew bin]# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic CDCTESTAB --from-beginning
{"SCN":12206116841348,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539162785000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (634789,'AHMEDABAD','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":634789,"CITY":{"string":"AHMEDABAD"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116841465,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539162844000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (63489,'DELHI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":63489,"CITY":{"string":"DELHI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116842718,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539163441000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (60009,'CHENNAI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":60009,"CITY":{"string":"CHENNAI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}
{"SCN":12206116897232,"SEG_OWNER":"KFKUSER","TABLE_NAME":"CDCTAB2","TIMESTAMP":1539193338000,"SQL_REDO":"insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (6077009,'MUMBAI','INDIA')","OPERATION":"INSERT","data":{"value":{"ID":6077009,"CITY":{"string":"MUMBAI"},"NATIONALITY":{"string":"INDIA"}}},"before":null}

ASIF DBA

unread,
Oct 10, 2018, 1:50:28 PM10/10/18
to confluent...@googlegroups.com
Thanks Somasundram for your prompt response.

I am using the below values in my sink connector . Do I need to modify anything else to fix this problem? Pls advice.
what change has to be made?

insert.mode=upsert
pk.mode=record_value
pk.fields=ID



Robin Moffatt

unread,
Oct 11, 2018, 7:05:37 AM10/11/18
to confluent...@googlegroups.com
The problem is that your message body is nested:

{

  "SCN": 12206116841348,
  "SEG_OWNER": "KFKUSER",
  "TABLE_NAME": "CDCTAB2",
  "TIMESTAMP": 1539162785000,
  "SQL_REDO": "insert into \"KFKUSER\".\"CDCTAB2\"(\"ID\",\"CITY\",\"NATIONALITY\") values (634789,'AHMEDABAD','INDIA')",
  "OPERATION": "INSERT",
  "data": {
    "value": {
      "ID": 634789,
      "CITY": {
        "string": "AHMEDABAD"
      },
      "NATIONALITY": {
        "string": "INDIA"
      }
    }
  },
  "before": null
}

The ID column that you're trying to use is actually data.value.ID.

AFAIK the sink connector cannot use nested values for the PK column.

It's not clear what CDC tool you're using, but one option is to get the CDC tool to just emit a flat record. The other is to post-process the message in the Kafka topic to flatten it/extract the fields. You could use KSQL or Kafka Streams to do this, for example. That then would write to a new Kafka topic, which would be the source for your Sink connector.



-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff



ASIF DBA

unread,
Oct 11, 2018, 8:21:42 AM10/11/18
to confluent...@googlegroups.com
Robin,

Thanks a bunch for your prompt response.

Can you please throw some light on the usage of KSQL or Kafka Stream, probably an example that you have done would be greatly appreciated.

Pls give us one sample how to convert this nested loop using KSQL of Kafka Stream to the desired format.

Thanks in advance.

Thanks
Asif.



Robin Moffatt

unread,
Oct 11, 2018, 12:15:04 PM10/11/18
to confluent...@googlegroups.com


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff


ASIF DBA

unread,
Oct 11, 2018, 2:41:07 PM10/11/18
to confluent...@googlegroups.com
Thanks Robin. 
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CA%2BJsER3QLghksx87kCRF8eJ3XWixBp1_GvPNzsr3c%3DFM1vzrXw%40mail.gmail.com.

ASIF DBA

unread,
Oct 12, 2018, 8:14:46 AM10/12/18
to confluent...@googlegroups.com
Robert,

Thanks for your kind response. 

As advised by you, we registered the topic in KSQL & also created a flattened topic.

Below is the error from the KSQL console irrespective of whether VALUE_FORMAT='AVRO' or VALUE_FORMAT='JSON'

Kindly help further to fix this issue. Thanks in advance.

=> When the stream is created with VALUE_FORMAT='AVRO'
-------------------------------------------------------
-------------------------------------------------------

[root@ebsnew bin]# ./connect-standalone /u01/kafka/confluent-5.0.0/etc/schema-registry/connect-avro-standalone2.properties /u01/kafka/confluent-5.0.0/etc/kafka-connect-jdbc/sink-oracle-UAT-cdc3.properties

Issue : Records are getting inserted into the target table, but values for all columns are (null)



=> When the stream is created with VALUE_FORMAT='JSON'
-------------------------------------------------------
-------------------------------------------------------

[root@ebsnew bin]# ./connect-standalone /u01/kafka/confluent-5.0.0/etc/schema-registry/connect-avro-standalone2.properties /u01/kafka/confluent-5.0.0/etc/kafka-connect-jdbc/sink-oracle-UAT-cdc2.properties


Issue : Even NULL records are NOT getting populated in the target table. 


Error : Below error is the same for both options mentioned above (From KSQL console)

[2018-10-12 16:06:46,117] WARN task [0_0] Skipping record due to deserialization error. topic=[CDCTESTAB] partition=[0] offset=[13] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: CDCTESTAB
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x91f3a5b4 (above 0x0010ffff) at char #1, byte #7)
        at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
        at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2331)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:646)
        at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:88)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:77)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:45)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)



Regards,
Asif.

ASIF DBA

unread,
Oct 14, 2018, 3:56:35 AM10/14/18
to confluent...@googlegroups.com
Robin/Team,

Any help on the below mail would be much appreciated.

Regards
Asif.

nirmal kumar/my. mm

unread,
Oct 15, 2018, 6:59:25 AM10/15/18
to confluent...@googlegroups.com
Robin/Team

Any update in this regard would be much appreciated, since we are terribly struck in this?

Regards,
Nirmal

ASIF DBA

unread,
Oct 16, 2018, 11:09:04 AM10/16/18
to confluent...@googlegroups.com
Robert,

Any help from your end on this issue would be helpful.

As advised we have already flattened the data using KSQL but still we are stuck..
Thanks in advance.

Regards
Asif.

Reply all
Reply to author
Forward
0 new messages