KTable list value question


Shannon Ma

Nov 30, 2016, 11:54:03 AM
to Confluent Platform
Hi,

I have a question about KTable,

I am building/aggregating a KTable as a list for each key. For example, with m1, m2 and m3:

m1 -> [m1]
m2 -> [m1, m2]
m3 -> [m1, m2, m3]

After some business logic, I am putting the KTable back into a KStream and using flatMapValues. My understanding is that since it is a changelog, in the flatMapValues method I should only return the last element in the list at that time. Is this correct?

After that I moved to using a windowed KTable (5 hr window, 10 hr retention) and am also sending the result to Kibana. That is when I noticed I am missing data: most of the time I have two elements in the list, and since I am returning only the last element of [m1, m2], I do not see the first element. After changing the method to return everything in the list I see all elements now, but I am not sure if this is correct. Or is this only because I am using a windowed KTable?


Thanks
Shannon

Guozhang Wang

Dec 1, 2016, 1:20:46 AM
to Confluent Platform
Hello Shannon,

I am not clear about your example of

m1 -> [m1]
m2 --> [m1,m2]
m3-> [m1,m2,m3]

Do you mean that in your KTable, m1 -> [m1] is a record where m1 is the key and [m1] is the value, a single-element list? In that case, is the m2 that appears both as the key and in the value list the same?

Anyway, assuming my interpretation of the example above is correct, KTable.toStream().flatMapValues should emit all the elements, i.e. the resulting stream is:

m1 -> m1, m2 -> m1, m2 -> m2, m3 -> m1, m3 -> m2, m3 -> m3
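
To illustrate, here is a minimal sketch (hypothetical topic, store and serde names, plain String values instead of Avro, and listSerde assumed to exist):

KStreamBuilder builder = new KStreamBuilder();

// Aggregate each key's values into a growing list: m1 -> [m1], m2 -> [m1, m2], m3 -> [m1, m2, m3]
KTable<String, ArrayList<String>> listTable = builder.<String, String>stream("input-topic")
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
                () -> new ArrayList<String>(),
                (key, value, list) -> { list.add(value); return list; },
                listSerde,            // a Serde<ArrayList<String>>, assumed to exist
                "list-store");

// Each update record carries the whole current list, so flattening replays all earlier elements:
// the output for the example above is m1, then m1 m2, then m1 m2 m3.
listTable.toStream()
         .flatMapValues(list -> list)
         .to(Serdes.String(), Serdes.String(), "flattened-topic");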


And again, I am not very clear on your description, "I moved to using a windowed KTable ... after changing the method to return all in the list I see all elements now, but I am not sure if this is correct". Could you send me your code snippet as well as your observed stream content so I can understand it better?


Guozhang

Shannon Ma

Dec 1, 2016, 9:40:27 AM
to Confluent Platform
Sorry if that was not clear. These are the values for the same key, so as they come in I am building up the list:

m1 comes in: key -> [m1]
m2 comes in: key -> [m1, m2]
m3 comes in: key -> [m1, m2, m3]

So when I convert back to a KStream, there will be three messages:
key -> [m1]
key -> [m1, m2]
key -> [m1, m2, m3]

I want to flatten the values so that m1, m2 and m3 are each sent to the output topic. Thus in my method I only return the last element; for example, when key -> [m1, m2] comes in I only return the last element [m2], since m1 was already returned for the previous message key -> [m1]. That is my understanding. Is that correct?

Guozhang Wang

Dec 1, 2016, 8:40:25 PM
to Confluent Platform
In that case, you will still see

key -> m1, key -> m1, key -> m2, key -> m1, key -> m2, key -> m3 after the toStream().flatMapValues(), since after toStream() each message is considered just a key-value pair and no longer an update.

If you only want the last element of each list, you could use mapValues instead of flatMapValues to take just the last element of each list.
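
Something along these lines (a sketch with hypothetical names, where listTable is the aggregated KTable<String, ArrayList<GenericRecord>>):

// Forward only the newest (last) element of each updated list
KStream<String, GenericRecord> lastOnly =
        listTable.toStream()
                 .mapValues(list -> list.get(list.size() - 1));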

Guozhang

Shannon Ma

Dec 2, 2016, 10:25:34 AM
to Confluent Platform
Thanks Guozhang, let me show my code snippets. I have two Kafka Streams applications.


-------------------- This one builds a windowed KTable

final KStreamBuilder builder = new KStreamBuilder();
KStream<GenericRecord, GenericRecord> txnStream = builder.stream(txnTopic);

// Re-key each transaction by TAG_ID + PLAZA_ID
KStream<String, GenericRecord> keyedtxnStream =
        txnStream.selectKey(new KeyValueMapper<GenericRecord, GenericRecord, String>() {
            @Override
            public String apply(GenericRecord key, GenericRecord value) {
                return value.get("TAG_ID").toString() + "_" + value.get("PLAZA_ID").toString();
            }
        });

TimeWindows tw = TimeWindows.of(window_size);
Windows<TimeWindow> ww = tw.until(2 * window_size);
KGroupedStream<String, GenericRecord> groupedStream = keyedtxnStream.groupByKey(stringSerdeKey, valueAvroSerde);

// Aggregate all records seen for a key (within a window) into a list
KTable<Windowed<String>, ArrayList<GenericRecord>> listTableStream = groupedStream.aggregate(
        new Initializer<ArrayList<GenericRecord>>() {
            @Override
            public ArrayList<GenericRecord> apply() {
                return new ArrayList<GenericRecord>();
            }
        },
        new Aggregator() {
            @Override
            public Object apply(Object aggKey, Object value, Object aggregate) {
                ArrayList<GenericRecord> list = (ArrayList<GenericRecord>) aggregate;
                list.add((GenericRecord) value);
                logger.debug("aggkey===" + aggKey);
                logger.debug("list===" + list);
                return list;
            }
        },
        ww, aggValueSerde, storename);

// Convert the changelog back to a stream and write the (key, list) records to the intermediate topic
KStream<Windowed<String>, ArrayList<GenericRecord>> liststream = listTableStream.toStream();
liststream.map((key, value) -> new KeyValue<>(populate(schema, value.get(0), null),
                                              populateArray(schemaArray, schema, value)))
          .to(txnIntTopic);

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Shannon Ma

Dec 2, 2016, 10:31:17 AM
to Confluent Platform
In the second one I read from the store for some logic, and at the end I have flatMapValues (sorry, I actually use flatMapValues, in case I said flatMap before). I was only returning the last element in the list but did not see the first element in Kibana; when I return everything in the list I see all the data, but I am not sure this is correct.
-------------------------------------
final KStreamBuilder builder2 = new KStreamBuilder();
KStream<GenericRecord, GenericRecord> txnStream2 = builder2.stream(txnIntTopic);
ReadOnlyWindowStore<String, ArrayList<GenericRecord>> store = getStore(streams);

KStream<GenericRecord, GenericRecord> dupstream = txnStream2
    // Re-key by TAG_ID + PLAZA_ID and unpack the Avro array back into an ArrayList
    .map(new KeyValueMapper<GenericRecord, GenericRecord, KeyValue<String, ArrayList<GenericRecord>>>() {
        @Override
        public KeyValue<String, ArrayList<GenericRecord>> apply(GenericRecord key, GenericRecord value) {
            String newkey = key.get("TAG_ID").toString() + "_" + key.get("PLAZA_ID").toString();
            GenericData.Array<GenericRecord> array = (GenericData.Array<GenericRecord>) value.get("list");
            ArrayList<GenericRecord> newvalue = new ArrayList<GenericRecord>();
            for (Iterator<GenericRecord> it = array.iterator(); it.hasNext();) {
                GenericRecord r = it.next();
                newvalue.add(r);
            }
            return new KeyValue<String, ArrayList<GenericRecord>>(newkey, newvalue);
        }
    })
    // Duplicate detection: compare the last element against the rest of the list
    // (and, if needed, against older windows read from the store)
    .flatMapValues(new ValueMapper<ArrayList<GenericRecord>, Iterable<GenericRecord>>() {
        @Override
        public Iterable<GenericRecord> apply(ArrayList<GenericRecord> value) {
            logger.debug("flatMapValues#value==" + value);
            logger.debug("listTableStream==" + listTableStream.getStoreName());

            GenericRecord last = value.get(value.size() - 1);
            String key = last.get("TAG_ID").toString() + "_" + last.get("PLAZA_ID").toString();
            long lastdate = (Long) last.get("TRANSACTION_DATE");
            String lastid = ((org.apache.avro.util.Utf8) last.get("TRANSACTION_ID")).toString();
            logger.debug("last#value==" + last);

            ArrayList<GenericRecord> result = new ArrayList<GenericRecord>();
            String dupid = null;

            // Check all earlier elements in the current list against the last one
            for (int i = 0; i < value.size() - 1; i++) {
                GenericRecord r = value.get(i);
                String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
                logger.debug("lastid=" + lastid + ",dupid=" + duptxnid);
                long rdate = (Long) r.get("TRANSACTION_DATE");
                if (Math.abs(rdate - lastdate) < 2000) {
                    dupid = duptxnid;
                    logger.debug("DUP FOUND : lastid=" + lastid + ",dupid=" + duptxnid);
                }
                GenericRecord nr = populate(schema, r, null);
                result.add(nr);
            }

            // If no duplicate was found, also look into earlier windows via the store
            ArrayList<GenericRecord> oldlist = new ArrayList<GenericRecord>();
            long current = System.currentTimeMillis();
            if (dupid == null && current - lastdate > window_size - 2000) {
                logger.debug("lastdate=" + lastdate);
                WindowStoreIterator<ArrayList<GenericRecord>> olddata =
                        store.fetch(key, lastdate - window_size, current);
                while (olddata.hasNext()) {
                    ArrayList<GenericRecord> l = olddata.next().value;
                    if (l.size() > 0) {
                        oldlist.addAll(l);
                    }
                }
            }
            logger.debug("oldlist===" + oldlist);

            for (GenericRecord r : oldlist) {
                String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
                logger.debug("OLD lastid=" + lastid + ",dupid=" + duptxnid);
                long rdate = (Long) r.get("TRANSACTION_DATE");
                if (!lastid.equals(duptxnid) && Math.abs(rdate - lastdate) < 2000) {
                    dupid = duptxnid;
                    logger.debug("DUP FOUND in OLD list : lastid=" + lastid + ",dupid=" + duptxnid);
                }
            }

            GenericRecord newr = populate(schema, last, dupid);
            logger.debug("newr===" + newr);
            result.add(newr);
            logger.debug("result===" + result);
            return result;
        }
    })
    .map((key, value) -> new KeyValue<>(null, value));

dupstream.to(duptxnTopic);
final KafkaStreams streams2 = new KafkaStreams(builder2, streamsConfiguration2);
streams2.start();

Guozhang Wang

Dec 2, 2016, 2:45:40 PM
to Confluent Platform
Your description was a bit fuzzy to me, regarding "I was only returning the last one in the list but did not see the first element in Kibana"...

My reading of your source code is that you put all but the last element into the "result" list first, then re-decorate the last element using the old list read from the store and add it to the result as well, so in the end the result's size should be the same as the original list's.

If by "only returning the last one in the list" you mean that you commented out the following section, which puts all but the last element into the result:

----
for (int i = 0; i < value.size() - 1; i++) {
    GenericRecord r = value.get(i);
    String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
    logger.debug("lastid=" + lastid + ",dupid=" + duptxnid);
    long rdate = (Long) r.get("TRANSACTION_DATE");
    if (Math.abs(rdate - lastdate) < 2000) {
        dupid = duptxnid;
        logger.debug("DUP FOUND : lastid=" + lastid + ",dupid=" + duptxnid);
    }
    GenericRecord nr = populate(schema, r, null);
    result.add(nr);
}
----

then I believe your result stream (after the final map() stage that gets rid of the key) should contain only single-element lists, each being the re-decorated last element, so it is expected that you "do not see the first element".



Guozhang

Shannon Ma

Dec 2, 2016, 3:25:07 PM
to Confluent Platform
At the end I add the last one with result.add(newr). I DID NOT have result.add(nr) in the for loop and I only SAW the last element; after I ADDED result.add(nr) I see all the elements.

That is my question: the list grows whenever a new message comes in, so what should I do in the flatMapValues method so that I send each element only ONCE to the topic?

My understanding is that the flatMapValues method will be called whenever there is a new message (i.e. the list changed); is that correct? What I want is: when m1 comes in, I add it to the list/store and send m1 to the output topic; when m2 comes in (possibly hours later), I add it to the list, run logic against the list, and send m2 to the output topic. If I do not use a windowed KTable the list will keep growing, so I use the window to expire old data. I need to use the store to get other windows because in certain cases I need to check only the list in the current window. Am I on the right path?

------------------------

for (int i = 0; i < value.size() - 1; i++) {
    GenericRecord r = value.get(i);
    String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
    logger.debug("lastid=" + lastid + ",dupid=" + duptxnid);
    long rdate = (Long) r.get("TRANSACTION_DATE");
    if (Math.abs(rdate - lastdate) < 2000) {
        dupid = duptxnid;
        logger.debug("DUP FOUND : lastid=" + lastid + ",dupid=" + duptxnid);
    }
    GenericRecord nr = populate(schema, r, null);
    result.add(nr);
}


--------------------------
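
For reference, what I would like that flatMapValues step to boil down to is roughly this (simplified, hypothetical names), assuming it is called once per list update:

// Forward only the newest element; earlier elements were already sent on previous updates
listStream.flatMapValues(list -> Collections.singletonList(list.get(list.size() - 1)))
          .to(outputTopic);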

Shannon Ma

Dec 2, 2016, 3:27:35 PM
to Confluent Platform
Sorry, typo:

I need to use the store to get other windows because in certain cases I need to check NOT only the list in the current window.

Shannon Ma

Dec 2, 2016, 4:58:26 PM
to Confluent Platform
I hope this does not confuse you further. In my first Kafka Streams application, I am building a list by key (and saving it to the store), then converting back to a stream of (key, list):



KTable<Windowed<String>, ArrayList<GenericRecord>> listTableStream = groupedStream.aggregate(
        new Initializer<ArrayList<GenericRecord>>() {
            @Override
            public ArrayList<GenericRecord> apply() {
                return new ArrayList<GenericRecord>();
            }
        },
        new Aggregator() {
            @Override
            public Object apply(Object aggKey, Object value, Object aggregate) {
                ArrayList<GenericRecord> list = (ArrayList<GenericRecord>) aggregate;
                GenericRecord v = (GenericRecord) value;
                String vid = ((org.apache.avro.util.Utf8) v.get("TRANSACTION_ID")).toString();
                // Skip the record if the same TRANSACTION_ID is already in the list
                for (GenericRecord r : list) {
                    String rid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
                    if (rid.equals(vid)) {
                        return list;
                    }
                }
                list.add(v);
                logger.debug("aggkey===" + aggKey);
                logger.debug("value===" + value);
                logger.debug("list===" + list);
                return list;
            }
        },
        ww, aggValueSerde, storename);

logger.info("-------before toStream----");
KStream<Windowed<String>, ArrayList<GenericRecord>> liststream = listTableStream.toStream();
logger.info("-------before to topic----");
liststream.map((key, value) -> new KeyValue<>(populate(schema, value.get(0), null),
                                              populateArray(schemaArray, schema, value)))
          .to(txnIntTopic);


In apply() I print out the value and the list. I see 318218905 first:


16:45| DEBUG | RtTxnDuplicateIndentifier.java 150 | value==={"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "318218905", "TRANSACTION_DATE": 1480686360183, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}
16:45| DEBUG | RtTxnDuplicateIndentifier.java 151 | list===[{"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "318218905", "TRANSACTION_DATE": 1480686360183, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}]


then 218218905, at which point the list contains both 318218905 and 218218905:


16:45| DEBUG | RtTxnDuplicateIndentifier.java 150 | value==={"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "218218905", "TRANSACTION_DATE": 1480686359683, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}
16:45| DEBUG | RtTxnDuplicateIndentifier.java 151 | list===[{"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "318218905", "TRANSACTION_DATE": 1480686360183, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}, {"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "218218905", "TRANSACTION_DATE": 1480686359683, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}]


Then in my map() I call populateArray() to convert the list to an array to put on the topic; in populateArray I print out the list:


 populateArray list=[{"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "318218905", "TRANSACTION_DATE": 1480686360183, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}, {"TAG_ID": "901470003", "PLAZA_ID": "10", "TRANSACTION_ID": "218218905", "TRANSACTION_DATE": 1480686359683, "DATE_CREATED": 1480686377000, "LANE_NAME": "LIVEE08-01", "PLAZA_NAME": "W8E8", "SEGMENT_NAME": "580-LIVEE", "PAYMENT_TYPE": "SOV"}]


I only see map() get called once, with the list containing both elements. How does this work? In the other case, where there is only one element for the key, map() gets called when the list has one element. So my question is: when/how does map() get called in this case?

In this case the two transactions 318218905 and 218218905 come in very close together; what happens if one of them comes in late (hours later)?

Shannon Ma

Dec 6, 2016, 9:38:16 AM
to Confluent Platform
Does anyone have any idea what the issue might be, or what I am missing?

Damian Guy

Dec 6, 2016, 11:02:05 AM
to Confluent Platform
Hi Shannon,

In your case, because the map() call happens after an aggregate(), the map will only be called when the cache is flushed. This occurs on every commit interval (default 30 seconds) or when there is a cache eviction.
The cache size is controlled by StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG: if you set it to 0, every record will be flushed through the topology immediately, so you should see something printed out for each record.
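
For example (leaving the rest of your Streams configuration as it is):

Properties streamsConfiguration = new Properties();
// ... application id, bootstrap servers, serdes, etc. ...
// Disable the record cache so every update is forwarded through the topology immediately
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);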

Thanks,
Damian



Shannon Ma

Dec 6, 2016, 11:35:15 AM
to Confluent Platform
Thanks Damian, that explains it. Will setting it to 0 cause performance issues? I can probably set a flag in the message in my later processing so I know whether a message in the list has already been processed/sent (in my second Kafka Streams logic).

Damian Guy

Dec 6, 2016, 12:15:28 PM
to Confluent Platform
Hi Shannon,

There will be some performance degradation, but at the moment it is the safest thing to do, as there are some issues with the cache; see https://issues.apache.org/jira/browse/KAFKA-4492

So I'd recommend setting the cache size to 0.

Thanks,
Damian

Shannon Ma

Dec 6, 2016, 5:01:48 PM
to Confluent Platform
Thanks!