I am thinking in the first stream i will put to a new topic (thus data is in the store), then have a second stream where i can now access the store and process there, not sure if will work.
Now i am having this error (no second stream logic yet, just the first one group and store), i dont know where to check. my code is like
-----code ----------------------
final Serde<ArrayList<GenericRecord>> aggValueSerde = new ArrayListSerde<>(keyAvroSerde,valueAvroSerde);
TimeWindows tw = TimeWindows.of(24 * 60 * 60 * 1000L);
Windows<TimeWindow> ww = tw.until(7 * 24 * 60 * 60 * 1000L);
KGroupedStream<String, GenericRecord> groupedStream = keyedtxnStream.groupByKey(stringSerdeKey, valueAvroSerde);
KTable<Windowed<String>, ArrayList<GenericRecord>> listTableStream = groupedStream.aggregate(new Initializer<ArrayList<GenericRecord>>() {
@Override
public ArrayList<GenericRecord> apply() {
// TODO Auto-generated method stub
return new ArrayList<GenericRecord>();
}
} ,
new Aggregator() {
@Override
public Object apply(Object aggKey, Object value, Object aggregate) {
ArrayList<GenericRecord> list = (ArrayList<GenericRecord>)aggregate;
GenericRecord v = (GenericRecord)value;
for (GenericRecord r: list) {
String vtxnid = ((org.apache.avro.util.Utf8) v.get("TRANSACTION_ID")).toString();
String rtxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
if (r.equals(v)) {
return list;
}
}
list.add((GenericRecord) value);
logger.debug("aggkey==="+aggKey);
logger.debug("value==="+value);
logger.debug("list==="+list);
return list;
}}
, ww, aggValueSerde, "txnstore");
---------------------------------------------------------------error -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
16:33| ERROR | StreamThread.java 330 | stream-thread [StreamThread-1] Failed while executing StreamTask 1_0 duet to close state manager:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to close state store txnstore
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:351)
at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
... 7 more