Performance Issue with Jdbc source connector

1,505 views
Skip to first unread message

Santhosh Kumar

unread,
Oct 18, 2019, 11:35:13 AM10/18/19
to Confluent Platform
Hi ,

we are using confluent jdbc connector to move the data from oracle to Cassandra which are almost near to 100 tables , some of them are having 100 million records. In order to do that we are having like 7 kafka brokers running.

So problem here is , to complete this process it is taking 2-3 days and some times we see data missing also.

Please find below source connector configuration:

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "MODIFIED",
"connection.password": "********",
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"tasks.max": "54",
"table.whitelist": "54 table list",
"mode": "timestamp",
"topic.prefix": "Test.",
"poll.interval.ms": "300000",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"validate.non.null": "false",
"batch.max.rows": "100",
"timestamp.delay.interval.ms": "5000",
"value.converter.schema.registry.url": "localregURL",
"connection.user": "***********",
"numeric.mapping": "best_fit",
"connection.url": "jdbc:oracle:thin:@*******************",
"key.converter.schema.registry.url": "localregURL"
}

Note : For 54 tables we have 54 topics with each consists of 1 partition.

Please can anyone suggest what we are missing, we are running out of time . Any help greatly appreciated!

Mich Talebzadeh

unread,
Oct 18, 2019, 2:21:40 PM10/18/19
to confluent...@googlegroups.com
When you are connecting to Oracle source what syntax you are using for JDBC connection. You can easily find out what is going on at source by looking at Oracle connections. Are these 54 Oracle tables have primary key and are you reading data from the same source Oracle instance for all?

I have not used jdbc connecter for Kafka but are you using partitioning as part of load? This is an example for reading Oracle in spark with JDBC

val s = HiveContext.read.format("jdbc").options(
       Map("url" -> _ORACLEserver,
       "dbtable" -> "(SELECT column1, column2, columnN FROM oracle.table",
       "partitionColumn" -> "ID",
       "lowerBound" -> minID,
       "upperBound" -> maxID,
       "numPartitions" -> "2",
       "user" -> _username,
       "password" -> _password)).load

HTH

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5b20a635-8866-4437-9f32-f05b3d85ea4e%40googlegroups.com.

BadwolF ForeveR

unread,
Oct 18, 2019, 5:26:13 PM10/18/19
to Confluent Platform
Hi Santhosh,

I am assuming you are doing historical load of all those tables. Let me first start with Kafka then we will move to Kafka Connect.

In order to increase through put to/from Kafka you need to have as much parallelism (read memory, CPU and bandwidth) that your producer nodes, broker nodes and consumer nodes can handle. Having said that rather than having 1 partition per topic have many partitions. If all the tables have almost 100 million records and you wish to continue to use the same topics for delta changes as well after this load then I would suggest atleast go between 7-14 partitions (why 7 or 14? so that the number of partitions are equally distributed across the broker nodes and thus the amount of data is also distributed properly). So try to increase the partitions in multiples of number of brokers. Remember this is not necessary and is not required by Kafka. You can have any number of partitions on any number of brokers. This approach keeps the data fairly distributed and you do not have to worry about one node's disk getting full while other nodes still have disk space left.

Also, you have not mentioned what you are using to push data to Cassandra. However, with more number of partitions whether you use Cassandra Sink Connector or Spark or any other distributed framework it would be performant. However, in case you are using Cassandra Sink Connector then make sure to keep number of tasks equal to number of partitions of the topic to achieve maximum possible throughput.

Now coming to Kafka JDBC Source Connector. For this there are a few important thing to understand.

1. poll.interval.ms - This is the time after which Kafka JDBC connector will make a fresh call to the table to fetch fresh data based on last saved offset.
2. batch.max.rows - This is the max number of records Connect will request to DB from the "fresh call" made at every "poll.interval.ms" to be fetched within that interval and saves the offset once that batch of records is committed to Kafka topic.

Now take a look at what happens on oracle side in sequence (considering getting data from one table having 100 million records; for 54 multiple the same logic by 54). Note that there are other steps that happen in between which I am omitting which does not relate to data fetch.

1. Connector starts for the first time and checks for last saved offsets which is not there so it makes the start time as "0" (Epoch).
2. Connector make another call to DB "select current timestamp from dual" to get the end time (which will be current time)
3. Connector make a query using the name provided in config and the start time and end time (the query changes based on mode and fields configured; the below query is for timestamp mode): 
    select * from TABLE where timestamp_field_from_config > start_time_from_step_1 and timestamp_field_from_config < end_time_from_step_2 order by timestamp_field_from_config asc

Now this happens at every "poll.interval.ms". So now you can see what problems can come if your poll interval is small and the data to fetch is huge. The job may fail and one poll is already running and you are initiating another poll or there will be duplicates as the offsets gets store after every "batch.max.rows" records are committed to Kafka.

Now what happens on DB side (specifically ORACLE):

After step 3 above, oracle reads all the data based on that query (100 million records) into TEMP space (which may take long time depending upon hardware configuration of ORACLE but for 100 million row it will be well above or close to 5 minutes which is the poll.interval.ms you are configuring). Check with your DBAs and see how many sessions are open on ORACLE side and you will get the idea.

Having said all that what is needed on Kafka Connect side:

1. For fetching huge amount of data; the poll.interval.ms must be long (say 1 day i.e. 86400000 ms)
2. Keep the "batch.max.rows" on higher side say 100000 (depending on how much hardware is on ORACLE, Connect and Kafka brokers) and how much is the connectivity bandwidth between database server and Connect cluster. Since you are running multiple jobs and each job if going to get 100000 records then at one instant you may be looking at 5.4 million rows over network.

Enough explanations. Below is the updated configurations that you can use to increase the performance and get the data faster to Kafka and to Cassandra eventually. BUT REMEMBER Kafka Connect is not a framework for large bulk loads but for small or medium sized incremental loads, but it can do the job (has done for us we have bulk loaded tables with 100 million records).

{
 
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 
"timestamp.column.name": "MODIFIED",
 
"connection.password": "********",
 
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
 
"tasks.max": "54",
 
"table.whitelist": "54 table list",
 
"mode": "timestamp",
 
"topic.prefix": "Test.",

 
"poll.interval.ms": "86400000",

 
"value.converter": "io.confluent.connect.avro.AvroConverter",
 
"key.converter": "io.confluent.connect.avro.AvroConverter",
 
"validate.non.null": "false",

 
"batch.max.rows": "100000",
 
"timestamp.delay.interval.ms": "0",

 
"value.converter.schema.registry.url": "localregURL",
 
"connection.user": "***********",
 
"numeric.mapping": "best_fit",
 
"connection.url": "jdbc:oracle:thin:@*******************",
 
"key.converter.schema.registry.url": "localregURL"
}

Hope this helps to solve your issues.

BadwolF ForeveR

unread,
Oct 18, 2019, 5:34:02 PM10/18/19
to confluent...@googlegroups.com
Another thing, once the historical load is complete just change the configuration of hence connect job to lower poll.interval.ms and batch.max.rows and you will be able to use the same connector to fetch delta data also from the same set of tables.

Sent from my iPhone

On Oct 18, 2019, at 17:26, BadwolF ForeveR <alias....@gmail.com> wrote:


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.

Santhosh Kumar

unread,
Oct 22, 2019, 3:30:25 AM10/22/19
to Confluent Platform
Hi ,

First I would like to say thank you very much for quick response.

We have tried the approaches you have given for 'One' table to start analysing its performance. One thing is sure that after increasing the partitions and consumers it is quick enough to dump the data from Topic to Cassandra(Cassandra Sink Connector). But here the actual problem we see is data transfer from oracle to Topic.

For an example , we have the data in input table with around 40Million (40000000) records for which it has taken 4 to 5 hrs to dump the data from Oracle to Topic by using the same configuration you have given.We tried by increasing the 'tasks.max' to 6 for One table which didn't help , even though we tried to create, but the status of the connector is showing only 1 task running.

One more issue is, sometimes we see random data missing in Topics and because of that it is getting difficult to rely on kafka jdbc connector . Is there any thing we can do to avoid data mismatch.

 Your response really helps.


Regards,
Santhosh


To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.

BadwolF ForeveR

unread,
Oct 22, 2019, 8:33:55 AM10/22/19
to confluent...@googlegroups.com
Glad to help. Regarding tasks.max, Kafka JDBC connect uses ONLY one task per table, you cannot make it to work with multiple task per table. So even if you specify tasks.max to 6 it will still only run one task per table.

Regarding missing data, I have not encountered missing data but I have seen duplicate data which is understandable and we have handled that. However, there is one scenario when using timestamp mode where certain rows can get missed in an extreme case. Wen you look at the logs you would see the query that is fired for timestamp mode is like this:

Select + from table where timestamp_col > start_time and timestamp_col < end_time order by timestamp_col asc

So with this query, if there are multiple rows that have the same timestamp values for timestamp_col and somehow one batch.max.rows got only only 1 or 2 of those values and somehow the job restarted then the query would not pick the rest of the records for the last commuted timestamp. It’s an extreme case but can happen.

If you have an incrementing numeric column in your table then it’s better to use timestamp+incrementing mode. If all 54 tables that you are trying to fetch does not have the same incrementing column then you can run multiple connectors or can group few tables that have the same incrementing column.

Regarding running parallel tasks for getting data from tables Kafka connect would not be able to do that. You may want to use Spark or Sqoop for the same but those would need incrementing column as well timestamp column in table to be able to do that.

One other approach could be run a connector with all 54 tables in bulk mode with the same configuration mentioned earlier. Bulk loads for large data sets is more performant on oracle side as the order by clause is removed from the query and it may resolve the issue of missing sata as it will not have the where clause that may cause skipping of some rows. However, bulk will always get all the data so is not advisable for delta changes.

Hope it helps.

On Oct 22, 2019, at 03:30, Santhosh Kumar <ksantho...@gmail.com> wrote:


To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/8ad21af5-b39c-408e-80d7-ed1e407656a4%40googlegroups.com.

Santhosh Kumar

unread,
Oct 28, 2019, 1:56:09 PM10/28/19
to confluent...@googlegroups.com
Hi bro,

This is regarding missing records, basically we don't have increament column to try with timestamp+increamental mechanism.we tried to introduce that incremental column in our DB but as per our design it is quite impossible to add that. Is there any other way to avoid this missing records. We are still facing the issue when there are huge records (like 10millions). For less records it is working fine.

Configuration we are using as below:

poll.interval.ms : 600000(10min)
Batch.max.rows: 1000

As we are having huge delta changes trying to use above configuration.

Could please suggest if any work around or any configuration changes.


Thanks in advance.


To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5635a638-a399-4d1b-9135-d335aee691f0%40googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/8ad21af5-b39c-408e-80d7-ed1e407656a4%40googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.

BadwolF ForeveR

unread,
Oct 28, 2019, 3:27:28 PM10/28/19
to confluent...@googlegroups.com
For missing records, can you check in the source table that whether the records missing has exact same value (till millisecond level) for the column that you are using for the timestamp mode? for example, if you are using modified_dt as you timestamp column, are there multiple records/rows in the source table that has exact same modified_dt?

Also, for huge historical load, I would recommend using higher value for poll.interval.ms, say 86400000, (I believe I explained the reasons in my earlier reply) and also increase the batch.max.rows to a higher value like 50000 or 100K (so that there are less iterations to get, say 10 million, records).

You can keep timestamp.delay.interval.ms to 30 secs or can increase/decrease it based on the time taken by transaction commits on source system.




--
Cheers,

Vinay Vyas

Julianos Andrea

unread,
Sep 3, 2020, 4:40:08 AM9/3/20
to Confluent Platform
Hi, as your say,  Kafka Connect is not a framework for large bulk loads but for small or medium sized incremental loads, 
if we need for large bulk loads for kafka, where framework we could use?
Thanks, have a good day! 

Zia Uddin

unread,
Sep 3, 2020, 7:27:48 AM9/3/20
to confluent...@googlegroups.com
Hi ,

thats not true. we have been using confluent jdbc source connector and corresponding datastax cassandra connector  to sink data in cloud and i found it is able to migrate millions of records with sufficient json fields in the same AZ from on premise to cloud within an hour

regards,
Zia

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.

Julianos Andrea

unread,
Sep 3, 2020, 7:55:42 AM9/3/20
to Confluent Platform
Hi Zia, 
Thanks for your reply!
I want to know how many records/sec in your  confluent jdbc source connector  and sink ,  For the  records/sec , I looked through JMX, And it is dynamic.  I can't find any benchmarks about throughput in confluent.io and other website. 
For improve throughput, It looks like I can only debug it over and over again on the configuration. But I can not find a target value about records/sec 

Regards, 
Andrea 
Reply all
Reply to author
Forward
0 new messages