kafka-connect-jdbc-sink BatchUpdateException when upgrading to kafka-streams 0.10.1.1

nicolas....@finn.no

Jan 9, 2017, 3:15:10 AM
to Confluent Platform
Hello,

I'm running into problems with the JDBC sink when using kafka-streams 0.10.1.1 to produce an aggregate that used to work without any problem in 0.10.0.1.

I have a kafka-streams app that aggregates some data out of several kafka-topics and publishes it onto a new topic. Then, I have configured a kafka-connect-jdbc-sink to upsert that stream of aggregates into a Postgres db.
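
The sink configuration itself is not shown here; for context, an upsert JDBC sink of this kind is typically configured roughly as follows (a sketch using standard kafka-connect-jdbc sink properties; all values are placeholders, and pk.fields is assumed from the key schema):

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=Private.statistics.page-views
connection.url=jdbc:postgresql://<host>:5432/<database>
insert.mode=upsert
pk.mode=record_key
pk.fields=adid,interval,type
auto.create=true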

In 0.10.0.1 my kafka-streams code looked like this:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Object> source = builder.stream(/* list of topics omitted for brevity */);

KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys()) // <= one key in the topic results in 4 new keys with the same value (4 different time buckets: year, month, day, all)
.through("Private.statistics.rekeyed-page-views")
.countByKey("page-views");

counts.toStream()
.map(PageViewsAggregator::toAvro)
.to(avroKeySerde, avroValueSerde, "Private.statistics.page-views");

streams = new KafkaStreams(builder, props);

/* error handling and start the stream */

In 0.10.1.1 my kafka-streams code looks like this:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Object> source = builder.stream(/* list of topics omitted for brevity */);

KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys()) // <= one key in the topic results in 4 new keys with the same value (4 different time buckets: year, month, day, all)
    // .through("Private.statistics.rekeyed-page-views") <= also tried with this in case it messed with message ordering, but it did not prevent the sink from crashing
.groupByKey()
.count("page-views");

counts.toStream()
.map(PageViewsAggregator::toAvro)
.to(avroKeySerde, avroValueSerde, "Private.statistics.page-views");

streams = new KafkaStreams(builder, props);

/* error handling and start the stream */

The sink configuration remains the same, however the 0.10.1.1 code crashes my sink with the following exception (each and every time, even after resetting the app, deleting the topic, and wiping the database):

[2017-01-06 15:26:53,812] WARN Write of 500 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:68)
java.sql.BatchUpdateException: Batch entry 9 INSERT INTO "Private.statistics.page-views" ("adid","interval","type","count") VALUES ('12345678','MONTH_201701','sometype',1) ON CONFLICT ("adid","interval","type") DO UPDATE SET "count"=EXCLUDED."count" was aborted.  Call getNextException to see the cause.
    at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1912)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:338)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2959)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:98)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:77)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)



I also tried decreasing the sink's batch.size to 10 to see if it influenced the behavior, but it looks like it is ignored (cf. the log message "Write of 500 records failed").

What am I doing wrong?


Thanks in advance.
Best regards
Nicolas


Ewen Cheslack-Postava

Jan 9, 2017, 3:40:05 AM
to Confluent Platform
The batch.size thing is probably unrelated -- the 500 you are seeing is 500 messages being passed from the Connect framework to the connector, whereas batch.size is a connector configuration that controls how many of those 500 messages are sent in a single batch write via SQL transactions. If you want to force the framework to also reduce its batch size, you can set consumer.max.poll.records=10 in your worker configuration, but note that setting it too low can also hurt performance.
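
Concretely, that would look something like this (a sketch: the consumer.* prefix in the worker config is passed through to the sink task's consumer, batch.size lives in the connector config, and the values are only illustrative):

# Worker config (e.g. connect-distributed.properties)
consumer.max.poll.records=10

# JDBC sink connector config
batch.size=10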

If the output of the streams job is in the exact same format, it would be surprising for the Connect sink to suddenly fail because of these changes, unless something in the outputs changed that could cause problems. Without the underlying exception (which is masked by the BatchUpdateException -- it'd be nice for the JDBC sink to be able to extract it), it'll be tougher to diagnose. Have you tried manually executing the statement after it fails to see what the error is?
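
If you can reproduce the failure with a plain JDBC batch, something along these lines would surface the chained cause that the BatchUpdateException message hides (a sketch; the connection details and the statement are placeholders):

import java.sql.*;

try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
     Statement stmt = conn.createStatement()) {
    // Placeholder: the exact INSERT ... ON CONFLICT statement from the failing batch
    stmt.addBatch("INSERT INTO ...");
    stmt.executeBatch();
} catch (BatchUpdateException e) {
    // The PostgreSQL driver chains the real error here rather than in the top-level message
    SQLException cause = e.getNextException();
    while (cause != null) {
        System.err.println(cause.getMessage());
        cause = cause.getNextException();
    }
}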

-Ewen


nicolas....@finn.no

Jan 9, 2017, 4:30:29 AM
to Confluent Platform
Understood. The output should be in exactly the same format (both key and value are Avro and haven't changed). I also inspected the table structure that was generated based on the key, and I get the exact same structure from both the 0.10.0.1 and 0.10.1.1 topics.

I did try executing the statement manually in Postgres and it went through without any problem. However, I forgot to mention something strange in the exception log: one of the values in the upsert statement is actually truncated. In the following:
INSERT INTO "Private.statistics.page-views" ("adid","interval","type","count") VALUES ('12345678','MONTH_201701','sometype',1) ON CONFLICT ("adid","interval","type") DO UPDATE SET "count"=EXCLUDED."count"


the value 'sometype' should actually have been 'sometype_of_pageview'; I don't know whether it is the logger that is truncating it, or whether the value is actually truncated before the insert. Note also that I do get the full value 'sometype_of_pageview' right up until the sink crashes.
That's also a lead on what might be failing, but I'm unsure how to test it (maybe kafka-avro-console-consumer and grep on the truncated value to check that the topic is generated correctly?).

I reduced the batch.size because I thought that the JDBC layer crashed on conflicting upsert statements within the same batch.

nicolas....@finn.no

Jan 9, 2017, 4:58:26 AM
to Confluent Platform
But you confirm that (0.10.0.1):

KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys())
    .through("Private.statistics.rekeyed-page-views")
.countByKey("page-views");

Is strictly equivalent to (0.10.1.1):

KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys())
    .groupByKey()
.count("page-views");

Right?

nicolas....@finn.no

Jan 9, 2017, 6:29:09 AM
to Confluent Platform
I found out some more: I grepped the output of the kafka-streams topic to see if I got those truncated values, and I do get them!

{"adid":"12345678","interval":"INTERVAL_ALL","type":"sometype_of_pageviews"} {"count":3}
{"adid":"12345678","interval":"INTERVAL_ALL","type":"\u0000\u0000\u0012sometype"} {"count":1}
{"adid":"12345678","interval":"INTERVAL_ALL","type":"sometype_of_pageviews"} {"count":4}

That said, I don't know whether it is the incoming data that contains these unicode characters or whether they are produced by the 0.10.1.1 stream itself. Strange. Will investigate further.
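
For reference, the topic can be inspected with something along these lines (a sketch; the tool is kafka-avro-console-consumer from the Confluent Platform, its flags vary between versions, and the endpoints here are placeholders):

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic Private.statistics.page-views \
  --property schema.registry.url=http://localhost:8081 \
  --from-beginning | grep 'sometype'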

nicolas....@finn.no

Jan 9, 2017, 8:51:03 AM
to Confluent Platform
That was it: the input kafka topics contained some "corrupted" data with unicode characters that crashed the upsert in Postgres.
I cannot say whether it is a coincidence or not, but either I didn't get corrupted data when I tested on 0.10.0.1 (which I doubt, since I tested both versions several times alternately all day yesterday), or 0.10.0.1 kafka-streams handles these unicode characters differently than 0.10.1.1 (stripping them either on the way in or on the way out).
Anyway, I now sanitize the input topics and everything is working again on 0.10.1.1.
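
The sanitizing step can be as simple as stripping control characters from the offending string field before it goes into the key (a sketch; the helper and where it is applied are assumptions, since the actual field handling depends on the Avro schema):

// Hypothetical helper: drop control characters such as \u0000 and \u0012 from a field value
static String sanitize(String raw) {
    return raw == null ? null : raw.replaceAll("\\p{Cntrl}", "");
}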

Michael Noll

Jan 13, 2017, 5:55:43 AM
to confluent...@googlegroups.com
Glad to hear it works for you now, Nicolas.

> I cannot say whether it is a coincidence or not [...] the 0.10.0.1 kafka-streams handles these unicode chars
> in a different way than the 0.10.1.1 (stripping them either on the way in or out).

I'm pretty sure nothing changed between 0.10.0.1 and 0.10.1.1 on the side of the Streams API in this regard.

-Michael



