Kafka Streams deduplication facing "Invalid partition given with record" exception


朱健

May 2, 2018, 2:38:31 PM
to Confluent Platform
I have a Kafka Streams app that does a KStream-KTable join. First I re-key the KStream, and then I want to deduplicate records on that key. But when I add a state store as in https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java, the following exception comes up:

Exception in thread "client_id_transaction_crm_org_20180417-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_5, processor=KSTREAM-SOURCE-0000000004, topic=events.upay.trade, partition=5, offset=31444719
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with record: 5 is not in the range [0...1).
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:913)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:783)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:773)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:68)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.rememberNewEvent(DeduplicationTransformer.java:90)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.transform(DeduplicationTransformer.java:68)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.transform(DeduplicationTransformer.java:13)
at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
... 6 more


My code snippet:


           ....

final StreamsBuilder streamsBuilder = new StreamsBuilder();

final GlobalKTable<String, DenormMerchantOrg> merchantDenorms = streamsBuilder.globalTable(
        TOPIC_MYSQL_CRM_DENORM_MERCHANT_ORG,
        Materialized.<String, DenormMerchantOrg, KeyValueStore<Bytes, byte[]>>as(CRM_DENORM_MERCHANT_ORG_K_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(merchantSerde));

final KStream<String, Transaction> transactionStream = streamsBuilder.stream(
        StreamConstants.DEFAULT_TRANSACTION_TOPIC_NAME,
        Consumed.with(Serdes.String(), transactionSerde));

final KStream<String, Transaction> transactionStreamEnsureKey =
        transactionStream.map((key, value) -> KeyValue.pair(value.getId(), value));

// Deduplication
final String transactionDeDupStore = "dedup-transaction-org-store";
final long maintainDurationPerEventInMs = TimeUnit.MINUTES.toMillis(4);
final StoreBuilder storeBuilder = deDupStoreBuild(transactionDeDupStore, maintainDurationPerEventInMs);
streamsBuilder.addStateStore(storeBuilder);

final KStream<String, Transaction> dedupTransactionStream = transactionStreamEnsureKey.transform(
        () -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> key, transactionDeDupStore),
        transactionDeDupStore);

           .....
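The deDupStoreBuild helper is elided above; for context, a minimal sketch of what it could look like, modeled on the window-store setup in the linked EventDeduplicationLambdaIntegrationTest (the segment count and the Long value type are assumptions from that example, not the actual code):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class DedupStoreFactory {

    // Hypothetical sketch of deDupStoreBuild, following the Kafka Streams 1.x
    // Stores API as used in EventDeduplicationLambdaIntegrationTest.
    public static StoreBuilder<WindowStore<String, Long>> deDupStoreBuild(
            final String storeName, final long maintainDurationPerEventInMs) {
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                        storeName,
                        maintainDurationPerEventInMs, // retention period
                        3,                            // number of segments (minimum is 2)
                        maintainDurationPerEventInMs, // window size
                        false),                       // do not retain duplicates
                Serdes.String(),
                Serdes.Long());
    }
}
```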

Any ideas?


朱健

May 2, 2018, 2:39:59 PM
to Confluent Platform
Is there any other way to do deduplication?

On Thursday, May 3, 2018 at 2:38:31 AM UTC+8, 朱健 wrote:

Guozhang Wang

May 2, 2018, 8:19:51 PM
to Confluent Platform
Hello Jian,

It seems you pre-created the changelog topic for the state store "dedup-transaction-org-store" with only one partition, while your application later scaled to at least six tasks, since your failing task is 2_5 (partition indexes are zero-based).

You can try deleting the changelog topics whose names relate to "dedup-transaction-org-store" and running again.
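To illustrate the suggestion above, one way to find and delete that changelog topic (a sketch, assuming Confluent's `kafka-topics` CLI and a ZooKeeper at `localhost:2181`; Streams names changelog topics `<application.id>-<store name>-changelog`):

```shell
# List any topics related to the dedup store:
kafka-topics --zookeeper localhost:2181 --list | grep dedup-transaction-org-store

# Delete the stale one-partition changelog so Streams can recreate it with
# the correct partition count (requires delete.topic.enable=true on the brokers):
kafka-topics --zookeeper localhost:2181 --delete \
  --topic <application.id>-dedup-transaction-org-store-changelog
```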


Guozhang

朱健

May 3, 2018, 2:33:04 AM
to Confluent Platform
The whole error log shows it is not just one task.

I run two Kafka Streams app instances, each with 5 stream threads. I will try deleting the changelog topic first.

Thanks for your suggestion.

On Thursday, May 3, 2018 at 8:19:51 AM UTC+8, Guozhang Wang wrote:

Matthias J. Sax

May 3, 2018, 7:49:58 AM
to confluent...@googlegroups.com
It seems that the number of partitions does not match. Note that both the
KStream and the KTable must have the same number of partitions to do a
KStream-KTable join.
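A quick way to check whether the partition counts line up (a sketch; assumes the `kafka-topics` CLI and a ZooKeeper at `localhost:2181`, and the second topic name is a placeholder):

```shell
# Compare the partition counts of the join inputs; for a KStream-KTable
# join they must match (GlobalKTable inputs are exempt from co-partitioning):
kafka-topics --zookeeper localhost:2181 --describe --topic events.upay.trade
kafka-topics --zookeeper localhost:2181 --describe --topic <ktable-source-topic>
```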


-Matthias
