// Kafka Streams topology (old, pre-0.10.1 API): counts page views per
// (adId, time-bucket) key and publishes the counts as Avro to
// "Private.statistics.page-views".
// NOTE(review): applicationId, bootstrapServers, zookeeperServers, schemaRegistryUrl,
// stateDir, avroKeySerde, avroValueSerde, toAdIdAndTimeBucketKeys() and the `streams`
// field are all defined outside this snippet.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// NOTE(review): ZOOKEEPER_CONNECT_CONFIG only exists in old Kafka Streams releases;
// it was deprecated in 0.10.2 and later removed — confirm the target Kafka version.
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
// NOTE(review): the key serde is passed as a class-name String but the value serde
// as a Class object on the next line; both forms are accepted by the config, but
// one style should be picked for consistency.
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// Commit (and therefore forward/flush aggregation results) every 5 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
// Start from the end of the topics on first run (no offsets committed yet).
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Object> source = builder.stream(/* list of topics omitted for brevity */);
KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys()) // <= one key in the topic results in 4 new keys with the same value (4 different time buckets: year, month, day, all)
// Explicit repartition topic so records sharing the new keys are co-partitioned
// before the stateful count below.
.through("Private.statistics.rekeyed-page-views")
// Materializes the per-key count in the "page-views" state store.
.countByKey("page-views");
counts.toStream()
.map(PageViewsAggregator::toAvro)
.to(avroKeySerde, avroValueSerde, "Private.statistics.page-views");
streams = new KafkaStreams(builder, props);
/* error handling and start the stream */
// Second variant of the same topology, ported to the newer Kafka Streams API:
// groupByKey().count() replaces the deprecated countByKey(), and the implicit
// repartition from groupByKey() replaces the explicit through() topic.
// NOTE(review): all configuration identifiers are defined outside this snippet,
// as in the first variant above.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// NOTE(review): ZOOKEEPER_CONNECT_CONFIG is gone from the newer Streams API this
// snippet otherwise targets — likely needs removing; confirm Kafka version.
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
// NOTE(review): mixed String / Class config values again — see the first variant.
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// Commit (and flush aggregation results downstream) every 5 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Object> source = builder.stream(/* list of topics omitted for brevity */);
KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys()) // <= one key in the topic results in 4 new keys with the same value (4 different time buckets: year, month, day, all)// .through("Private.statistics.rekeyed-page-views") <= tried also with this one in case it could mess up with message ordering but did not prevent sink from crashing
.groupByKey()
// NOTE(review): two statements fused on one line below — almost certainly a
// copy/paste artifact from the mailing-list post, not intentional formatting.
.count("page-views");counts.toStream()
.map(PageViewsAggregator::toAvro)
.to(avroKeySerde, avroValueSerde, "Private.statistics.page-views");streams = new KafkaStreams(builder, props);
/* error handling and start the stream */
[2017-01-06 15:26:53,812] WARN Write of 500 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:68)
java.sql.BatchUpdateException: Batch entry 9 INSERT INTO "Private.statistics.page-views" ("adid","interval","type","count") VALUES ('12345678','MONTH_201701','sometype',1) ON CONFLICT ("adid","interval","type") DO UPDATE SET "count"=EXCLUDED."count" was aborted. Call getNextException to see the cause.
at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1912)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:338)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2959)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:98)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:77)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/028ee14b-2a60-4d93-9a13-0a0e1c4d9790%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
INSERT INTO "Private.statistics.page-views" ("adid","interval","type","count") VALUES ('12345678','MONTH_201701','sometype',1) ON CONFLICT ("adid","interval","type") DO UPDATE SET "count"=EXCLUDED."count"
-Ewen
To post to this group, send email to confluent-platform@googlegroups.com.
// Old-API aggregation, quoted for comparison: flatMap fans each record out to
// 4 (adId, time-bucket) keys, repartitions via an explicit through() topic,
// then counts per key with the deprecated countByKey().
KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys())
.through("Private.statistics.rekeyed-page-views")
.countByKey("page-views");
// Newer-API equivalent: groupByKey() performs the repartitioning implicitly and
// count() replaces the deprecated countByKey().
KTable<String, Long> counts = source
.flatMap(toAdIdAndTimeBucketKeys())
.groupByKey()
.count("page-views");
{"adid":"12345678","interval":"INTERVAL_ALL","type":"sometype_of_pageviews"} {"count":3}
{"adid":"12345678","interval":"INTERVAL_ALL","type":"\u0000\u0000\u0012sometype"} {"count":1}
{"adid":"12345678","interval":"INTERVAL_ALL","type":"sometype_of_pageviews"} {"count":4}
-Ewen
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/028ee14b-2a60-4d93-9a13-0a0e1c4d9790%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/44e07912-ca3e-4cc9-88b0-3662f8f1c878%40googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.