The following exception is thrown at runtime:
Exception in thread "client_id_transaction_crm_org_20180417-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_5, processor=KSTREAM-SOURCE-0000000004, topic=events.upay.trade, partition=5, offset=31444719
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with record: 5 is not in the range [0...1).
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:913)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:783)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:773)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:68)
at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.rememberNewEvent(DeduplicationTransformer.java:90)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.transform(DeduplicationTransformer.java:68)
at com.wosai.data.kstreams.transformers.DeduplicationTransformer.transform(DeduplicationTransformer.java:13)
at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
... 6 more
My code snippet (abridged):
....
// Topology: a GlobalKTable of denormalized merchant orgs, plus a transaction
// stream that is re-keyed by transaction id and de-duplicated through a
// windowed state store.
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// GlobalKTable materialized into a named local key-value store; String keys,
// values serialized with the custom merchantSerde.
final GlobalKTable<String, DenormMerchantOrg> merchantDenorms = streamsBuilder.globalTable(TOPIC_MYSQL_CRM_DENORM_MERCHANT_ORG,
Materialized.<String, DenormMerchantOrg, KeyValueStore<Bytes, byte[]>>as(CRM_DENORM_MERCHANT_ORG_K_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(merchantSerde)
);
// Source stream of transactions (String key / Transaction value).
final KStream<String, Transaction> transactionStream = streamsBuilder.stream(StreamConstants.DEFAULT_TRANSACTION_TOPIC_NAME,
Consumed.with(Serdes.String(), transactionSerde));
// Re-key every record by the transaction id so de-duplication operates on the
// business id rather than the incoming message key. Note: map() changes the
// key, so Streams marks this stream for repartitioning downstream.
final KStream<String, Transaction> transactionStreamEnsureKey = transactionStream.map((key, value) -> KeyValue.pair(value.getId(), value));
// DeDuplication
final String transactionDeDupStore = "dedup-transaction-org-store";
// Retention window for remembered event ids: records seen again within
// 4 minutes are treated as duplicates.
long maintainDurationPerEventInMs = TimeUnit.MINUTES.toMillis(4);
// State store used by the transformer; built by the project helper
// deDupStoreBuild (definition not visible in this snippet).
StoreBuilder storeBuilder = deDupStoreBuild(transactionDeDupStore, maintainDurationPerEventInMs);
streamsBuilder.addStateStore(storeBuilder);
// NOTE(review): the pasted stack trace fails inside this store's change
// logging (MeteredWindowStore.put -> StoreChangeLogger.logChange ->
// KafkaProducer.waitOnMetadata) with "Invalid partition given with record:
// 5 is not in the range [0...1)". Task 2_5 is writing to partition 5 of the
// store's changelog topic, but that topic currently has only 1 partition —
// presumably it was created (or pre-created) with fewer partitions than the
// input topic. Verify the "<application.id>-dedup-transaction-org-store-
// changelog" topic has at least as many partitions as the source topic
// (delete/recreate it or run the application reset tool) — TODO confirm.
final KStream<String, Transaction> dedupTransactionStream = transactionStreamEnsureKey.transform(
() -> new DeduplicationTransformer<>(maintainDurationPerEventInMs, (key, value) -> key, transactionDeDupStore),
transactionDeDupStore);
.....
Any ideas what causes this and how to fix it?