How do I query a windowed KTable?


Shannon Ma

Nov 17, 2016, 2:28:03 PM
to Confluent Platform
Hi,

I have a question about windowing. If I have a one-day window with a retention of 7 days, I end up with 7 windowed buckets. From within the current window of a message, how can I query/access the other time windows?

Thanks
Shannon 

Eno Thereska

Nov 18, 2016, 1:47:26 PM
to Confluent Platform

Shannon Ma

Nov 21, 2016, 12:04:02 PM
to Confluent Platform
Thanks Eno. It looks like I need to use Kafka 0.10.1.0, right? I noticed that kstream.aggregateByKey is gone; how can I aggregate with a time window in 0.10.1.0?

Shannon

Damian Guy

Nov 21, 2016, 1:11:01 PM
to Confluent Platform
Hi Shannon,

In 0.10.1.0, you would use kstream.groupByKey().aggregate(...)

Thanks,
Damian
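
For a windowed aggregation specifically, the 0.10.1.0 call shape is roughly the following. This is a minimal sketch only: the topic name, value type, count aggregate, and store name are assumptions for illustration, not code from this thread.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.*;

// Sketch: count records per key in one-day windows retained for seven days.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "input-topic");

KTable<Windowed<String>, Long> counts = stream
    .groupByKey(Serdes.String(), Serdes.String())
    .aggregate(
        () -> 0L,                                    // Initializer: each window starts at zero
        (key, value, aggregate) -> aggregate + 1L,   // Aggregator: add one per record
        TimeWindows.of(24 * 60 * 60 * 1000L)         // one-day windows...
                   .until(7 * 24 * 60 * 60 * 1000L), // ...retained for seven days
        Serdes.Long(),
        "counts-store");                             // store name, usable for queries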


Matthias J. Sax

Nov 21, 2016, 2:01:56 PM
to confluent...@googlegroups.com
Also see the upgrade section for other API changes:

http://docs.confluent.io/3.1.1/streams/upgrade-guide.html#api-changes



Shannon Ma

Nov 21, 2016, 2:27:46 PM
to Confluent Platform
Thanks, I saw that. After also upgrading to Confluent 3.1.1, I am seeing this error:


Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: RekeyedIntermediateTopic19


What did I miss?


Shannon 

Shannon Ma

Nov 21, 2016, 2:38:52 PM
to Confluent Platform
Never mind, my bad.

Shannon Ma

Nov 21, 2016, 2:56:17 PM
to Confluent Platform
Hi,

This is what I am doing; please take a look and check whether I am doing it the right way.

Basically, for my messages <key, value>, I need to check whether two messages arrive with the same key within, say, 2 minutes; if so, I need to flag one (the second one) as the duplicate.

So:

1. Build a KTable keyed by the message key, where the value is the list of messages (via aggregate); this is where I save to the store.
2. Convert the KTable to a KStream and use flatMapValues to break the list into individual records; during this step I would like to query the store to check whether another record exists within 2 minutes, so I can flag it.
3. Send the results to a new topic.

My questions:

1. A store can only be obtained from a KafkaStreams instance; usually I put this line at the end:

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

If I move it up, I get this error:

Assigned partition rt_txn_dup-txnstore-repartition-0 for non-subscribed topic.

2. I could create a new KStream from the same topic and join it with this KTable, but my understanding is that because the KTable can lag, the join will not work; is that true?

3. Is there another way to access the store/KTable?

Thanks
Shannon
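
On question 3: in 0.10.1.0, once the application is up, a store created by an aggregation can also be read from outside the topology via interactive queries. A minimal sketch, assuming the "txnstore" window store built in the code further down this thread; the key and the 2-minute lookback are placeholders:

// Sketch: interactive query against a windowed store (0.10.1.0 API).
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start(); // the store is only queryable once the instance is up and its state is ready

ReadOnlyWindowStore<String, ArrayList<GenericRecord>> store =
    streams.store("txnstore", QueryableStoreTypes.<String, ArrayList<GenericRecord>>windowStore());

long now = System.currentTimeMillis();
// all window entries for this key whose window start lies in the last 2 minutes
WindowStoreIterator<ArrayList<GenericRecord>> iter =
    store.fetch("some-key", now - 2 * 60 * 1000L, now);
while (iter.hasNext()) {
    KeyValue<Long, ArrayList<GenericRecord>> entry = iter.next(); // entry.key is the window start
    // check entry.value for an earlier record with the same transaction id
}
iter.close(); // window store iterators must be closed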


Shannon Ma

Nov 21, 2016, 3:14:56 PM
to Confluent Platform
A little more info: in my flatMapValues I have the current list, but I need to query the list from the other window; maybe I did it wrong. The reason I want a window is so that it expires and the data does not keep growing. Maybe there is another way to achieve this (one option is sketched below).
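
One way to get both per-record store access and bounded state, sketched under assumptions (the store name, the TRANSACTION_ID usage from the code below, and a DUPLICATE flag field are illustrative): attach a plain key-value store that holds only the last-seen timestamp per key, and use KStream#transform, which gives the topology direct access to that store. State then stays at one timestamp per key, with no windowing needed.

// Sketch: flag the second message seen for a key within 2 minutes.
StateStoreSupplier dedupStore = Stores.create("dedup-store")
    .withStringKeys()
    .withLongValues()   // last-seen timestamp per key
    .persistent()
    .build();
builder.addStateStore(dedupStore);

KStream<String, GenericRecord> flagged = keyedtxnStream.transform(
    () -> new Transformer<String, GenericRecord, KeyValue<String, GenericRecord>>() {
        private ProcessorContext context;
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            store = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<String, GenericRecord> transform(String key, GenericRecord value) {
            long ts = context.timestamp(); // event time of the current record
            Long lastSeen = store.get(key);
            if (lastSeen != null && ts - lastSeen < 2 * 60 * 1000L) {
                value.put("DUPLICATE", true); // assumes the Avro schema has such a field
            }
            store.put(key, ts);
            return KeyValue.pair(key, value);
        }

        @Override
        public KeyValue<String, GenericRecord> punctuate(long timestamp) {
            return null; // nothing to emit on punctuation
        }

        @Override
        public void close() {}
    },
    "dedup-store");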

Shannon Ma

Nov 21, 2016, 3:17:46 PM
to Confluent Platform
When converting a windowed KTable to a KStream, in my case with, say, two windowed KTable entries (two lists), what happens? Do they combine into one list, or are there two messages, each with a list?
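
What happens: each window is its own table entry, so converting the windowed KTable to a stream emits one record per key-and-window update, keyed by Windowed<String>; lists from different windows are never merged. A small sketch against the windowed table from the code below, with the unwrap step assumed:

// toStream() yields one record per (key, window) update; nothing is combined.
KStream<Windowed<String>, ArrayList<GenericRecord>> perWindow = listTableStream.toStream();

// If the plain key is needed downstream, unwrap it; the window bounds stay
// available via windowedKey.window().start() and .end().
KStream<String, ArrayList<GenericRecord>> byKey =
    perWindow.map((windowedKey, list) -> new KeyValue<>(windowedKey.key(), list));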

Shannon Ma

Nov 21, 2016, 4:49:33 PM
to Confluent Platform
I am thinking that in the first stream I will write to a new topic (so the data lands in the store), then have a second stream where I can access the store and do the processing; I'm not sure whether that will work.

Now I am getting the error below (there is no second-stream logic yet, just the first part that groups and stores), and I don't know where to look. My code looks like this:

----- code -----

final Serde<ArrayList<GenericRecord>> aggValueSerde = new ArrayListSerde<>(keyAvroSerde, valueAvroSerde);

TimeWindows tw = TimeWindows.of(24 * 60 * 60 * 1000L);        // one-day windows
Windows<TimeWindow> ww = tw.until(7 * 24 * 60 * 60 * 1000L);  // retained for seven days
KGroupedStream<String, GenericRecord> groupedStream = keyedtxnStream.groupByKey(stringSerdeKey, valueAvroSerde);
KTable<Windowed<String>, ArrayList<GenericRecord>> listTableStream = groupedStream.aggregate(
        new Initializer<ArrayList<GenericRecord>>() {
            @Override
            public ArrayList<GenericRecord> apply() {
                return new ArrayList<GenericRecord>();
            }
        },
        new Aggregator<String, GenericRecord, ArrayList<GenericRecord>>() {
            @Override
            public ArrayList<GenericRecord> apply(String aggKey, GenericRecord value, ArrayList<GenericRecord> list) {
                String vtxnid = ((org.apache.avro.util.Utf8) value.get("TRANSACTION_ID")).toString();
                // skip the record if this window's list already holds the same transaction id
                for (GenericRecord r : list) {
                    String rtxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
                    if (rtxnid.equals(vtxnid)) {
                        return list;
                    }
                }
                list.add(value);
                logger.debug("aggkey===" + aggKey);
                logger.debug("value===" + value);
                logger.debug("list===" + list);
                return list;
            }
        },
        ww, aggValueSerde, "txnstore");


----- error -----

16:33| ERROR | StreamThread.java 330 | stream-thread [StreamThread-1] Failed while executing StreamTask 1_0 duet to close state manager:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to close state store txnstore
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:351)
        at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
        at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to org.apache.avro.generic.GenericRecord
        at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
        ... 7 more
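
The cast in this trace happens in the sink's key serializer: after a windowed aggregation the record key is Windowed<String>, while the application's default key serde is the Avro serde, so serializing the key fails. A sketch of one fix, unwrapping the window key and naming the serdes explicitly at the sink (the output topic name is assumed):

// Unwrap Windowed<String> to the underlying String key and pass explicit
// serdes to to(), so the default Avro key serde is never applied to the key.
listTableStream.toStream()
    .map((windowedKey, list) -> new KeyValue<>(windowedKey.key(), list))
    .to(Serdes.String(), aggValueSerde, "txn-lists"); // topic name assumed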

Shannon Ma

Nov 21, 2016, 8:09:23 PM
to Confluent Platform
If I remove the window and use just a plain KTable, I am getting this:


 Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store txnstore
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
        at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
        ... 6 more

 

Shannon Ma

Nov 21, 2016, 9:42:37 PM
to Confluent Platform
It looks like it is not the KTable/store; the problem is in the flatMapValues.

Here is the code. Basically I am converting <String, ArrayList<GenericRecord>> to <String, GenericRecord>; what did I do wrong?

KStream<String, GenericRecord> dupstream =
        liststream.flatMapValues(new ValueMapper<ArrayList<GenericRecord>, Iterable<GenericRecord>>() {
            @Override
            public Iterable<GenericRecord> apply(ArrayList<GenericRecord> value) {
                GenericRecord newRecord = new GenericData.Record(schema);
                logger.debug("newRecord===" + newRecord);
                ArrayList<GenericRecord> result = new ArrayList<GenericRecord>();
                result.add(newRecord);
                return result;
            }
        });

Shannon Ma

Nov 21, 2016, 10:11:48 PM
to Confluent Platform
Okay, I added

.map((key, value) -> new KeyValue<>(null, value))

before writing to the topic, and it seems to be working now.
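
For the record: a null key satisfies the cast inside the Avro key serializer, which is why the ClassCastException disappears. Both exceptions above came from the sink serializing the record key with the application's default Avro key serde while the actual key type was Windowed<String> or String. If the key needs to be preserved, an alternative sketch is to name the serdes explicitly at the sink (topic name assumed):

// Keep the String key and state the serdes explicitly, so the default
// Avro key serde is never applied. The topic name is an assumption.
dupstream.to(Serdes.String(), valueAvroSerde, "flagged-transactions");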

Matthias J. Sax

Nov 22, 2016, 1:54:02 PM
to confluent...@googlegroups.com
Shannon,

I am a little lost in this thread now. I hope you have been able to resolve your problems; if not, please summarize your question in a new post. It's all spread out over multiple posts and I cannot follow it anymore.


-Matthias

Shannon Ma

Nov 22, 2016, 2:20:09 PM
to Confluent Platform
Thanks Matthias; sorry, I posted as I moved along. We can close this one.


Can you help?


Thanks
Shannon