Oracle 11g Sink connector Issue

Gaurav Khare

Jun 12, 2018, 6:20:26 AM
to debe...@googlegroups.com
Hi

I am using Debezium to transfer data from Postgres to Kafka to Oracle.
I am receiving events in the Kafka topics for all delete, update and insert operations.

I am using confluent-3.3.0, Oracle 11g and ojdbc6.jar.

With "insert.mode" set to "upsert", neither insert nor update works.
With "insert.mode" set to "insert", only insert works. When the topic receives an update event, the task prints the error below.

---------------
java.sql.BatchUpdateException: ORA-00001: unique constraint (OWB.SYS_C0094697) violated

        at oracle.jdbc.driver.DatabaseError.throwBatchUpdateException(DatabaseError.java:629)
        at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9409)
        at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:211)
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:101)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2018-06-12 09:53:09,220] ERROR Task oracle-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: java.sql.BatchUpdateException: ORA-00001: unique constraint (OWB.SYS_C0094697) violated
----------------------------------------
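A note on why the update event fails in "insert" mode: once unwrapped, an update arrives as a full row carrying the same primary key as the earlier insert, so a plain INSERT collides with the row that already exists. A minimal sketch of that behaviour, using Python's built-in sqlite3 as a stand-in for Oracle (the table layout and the amount column are assumptions, not taken from this thread):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-in for the PAYMENT table; only the "id" key comes from the thread.
conn.execute("CREATE TABLE payment (id INTEGER PRIMARY KEY, amount INTEGER)")


def write_insert_mode(row):
    """Mimic insert.mode=insert: every record becomes a plain INSERT."""
    conn.execute(
        "INSERT INTO payment (id, amount) VALUES (?, ?)",
        (row["id"], row["amount"]),
    )


write_insert_mode({"id": 1, "amount": 100})  # original insert event

try:
    write_insert_mode({"id": 1, "amount": 250})  # update event, same key
    outcome = "written"
except sqlite3.IntegrityError as exc:
    # SQLite's counterpart of ORA-00001 (unique constraint violated)
    outcome = "rejected"
    print("rejected:", exc)

rows = conn.execute("SELECT id, amount FROM payment").fetchall()
print(outcome, rows)
```

The second write is rejected and the table keeps the original row, which matches the ORA-00001 seen in the stack trace above.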
Below is the configuration for the Oracle sink connector:

curl -X POST \
  -H 'accept: application/json' \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/json' \
  -d '
{
  "name": "oracle-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "PAYMENT",
    "connection.url": "xxx",
    "connection.user": "xxx",
    "connection.password": "xxx",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",                                                 
    "pk.fields": "id",                                                       
    "pk.mode": "record_value",
    "max.retries":3,
    "batch.size":1
  }
}
'
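
For readers unfamiliar with the "unwrap" transform in this config: Debezium emits each change as an envelope holding the row state before and after the change, and the transform passes only the "after" state on to the sink. A simplified sketch in Python, with hypothetical field values and an amount column that does not appear in this thread; the real transform is a Java SMT that also deals with schemas and delete events:

```python
def unwrap(envelope):
    """Simplified stand-in for the unwrap transform: keep only the row state after the change."""
    return envelope.get("after")


# Debezium marks creates with op "c" and updates with op "u".
insert_event = {"op": "c", "before": None, "after": {"id": 1, "amount": 100}}
update_event = {
    "op": "u",
    "before": {"id": 1, "amount": 100},
    "after": {"id": 1, "amount": 250},
}

flat_insert = unwrap(insert_event)
flat_update = unwrap(update_event)
print(flat_insert)
print(flat_update)
```

Both flattened records carry the same id, which is the field that "pk.mode": "record_value" with "pk.fields": "id" reads as the key.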


-----------------

However, when I was using Oracle 12g, ojdbc7.jar and confluent-4.1.0, both insert and update were working fine.

Any suggestions?

--
Thanks
Gaurav Khare

Gunnar Morling

Jun 12, 2018, 6:41:15 AM
to debezium
> with  "insert.mode" as "upsert", both insert and update are not working.

What does "not work" mean here, do you also get that unique key violation? What's the definition of that constraint (which columns)? Is it something you created? I can't think of a good reason why the upsert mode should fail, unless there's another unique constraint (i.e. not on the PK columns).
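
To make the point about a second unique constraint concrete, here is a minimal sketch, again with Python's built-in sqlite3 standing in for Oracle. The ref column and its unique constraint are hypothetical, and the update-then-insert helper only imitates MERGE semantics; it is not the connector's code:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table: "id" is the upsert key, "ref" carries a second unique constraint.
conn.execute(
    "CREATE TABLE payment (id INTEGER PRIMARY KEY, ref TEXT UNIQUE, amount INTEGER)"
)


def upsert(row):
    """MERGE-like behaviour keyed on id: update when matched, insert when not matched."""
    cur = conn.execute(
        "UPDATE payment SET ref = ?, amount = ? WHERE id = ?",
        (row["ref"], row["amount"], row["id"]),
    )
    if cur.rowcount == 0:
        conn.execute(
            "INSERT INTO payment (id, ref, amount) VALUES (?, ?, ?)",
            (row["id"], row["ref"], row["amount"]),
        )


upsert({"id": 1, "ref": "A-1", "amount": 100})  # not matched, so inserted
upsert({"id": 1, "ref": "A-1", "amount": 250})  # matched, so updated without a violation

try:
    upsert({"id": 2, "ref": "A-1", "amount": 75})  # new key, but ref is already taken
    second_key = "written"
except sqlite3.IntegrityError:
    second_key = "rejected"

rows = conn.execute("SELECT id, ref, amount FROM payment ORDER BY id").fetchall()
print(second_key, rows)
```

An upsert keyed on the primary key handles the repeated id, but it still fails when a different unique column collides.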

Also I'm curious why you are going down from Confluent Platform 4.1 to 3.3? Perhaps this all is related to a bug that was fixed in 4.1?

--Gunnar

Gaurav Khare

Jun 12, 2018, 9:01:38 AM
to debe...@googlegroups.com
Hi Gunnar 

1. When I set "insert.mode" to "upsert", records are not inserted into the Oracle database, and there is no error in the logs.
----------
Logs

[2018-06-12 12:56:44,120] INFO UPSERT records:1 , but no count of the number of rows it affected is available (io.confluent.connect.jdbc.sink.BufferedRecords:119)
[2018-06-12 12:56:45,541] INFO Finished WorkerSourceTask{id=postgres-source-connector-0} commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
[2018-06-12 12:56:50,914] INFO WorkerSinkTask{id=oracle-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)

--------------

2. On our production server we are using version 3.3 of Confluent, so I am using version 3.3 for testing.
 

--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+unsubscribe@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/45c9b3fc-4873-425a-9f6f-09280b64f4b5%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Gunnar Morling

Jun 12, 2018, 9:23:01 AM
to debe...@googlegroups.com
> "no count of the number of rows it affected is available"

That seems suspicious. Can you share the definition of the table in question, esp. its PK?

All in all though (and given that it works with 4.1) I think you're better off asking this question in the Confluent community, as it really seems to be an issue with the JDBC sink connector.

Gaurav Khare

Jun 13, 2018, 3:03:21 AM
to debe...@googlegroups.com

Hi Gunnar 

I found the issue. It was on the Oracle side: it was not able to run the MERGE command.
Thanks a lot for your time.
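
For context, upsert mode on Oracle relies on a MERGE statement. The sketch below builds a statement of roughly that shape in Python; the helper name, the aliases and the amount column are hypothetical, and the connector's actual generated SQL may differ in quoting and layout:

```python
def build_merge(table, key_cols, value_cols):
    """Build an Oracle-style MERGE with bind placeholders (illustrative only).

    Assumes at least one key column and one non-key column.
    """
    all_cols = key_cols + value_cols
    source = ", ".join(f"? AS {c}" for c in all_cols)
    on = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    updates = ", ".join(f"t.{c} = s.{c}" for c in value_cols)
    insert_cols = ", ".join(all_cols)
    insert_vals = ", ".join(f"s.{c}" for c in all_cols)
    return (
        f"MERGE INTO {table} t "
        f"USING (SELECT {source} FROM dual) s "
        f"ON ({on}) "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


sql = build_merge("PAYMENT", ["id"], ["amount"])
print(sql)
```

Running a statement like this by hand against the target table, with literal values in place of the placeholders, is one way to check whether the database side accepts it.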
