In the second topology, I read from the store for some logic, and at the end I use flatMapValues (sorry — I meant flatMapValues if I said flatMap before). When I returned only the last element of the list, I did not see the first element in Kibana; when I return all elements of the list, I see all the data, but I am not sure this is correct.
// Second topology: consumes the intermediate transaction topic and
// de-duplicates records against the windowed state store populated by the
// first topology (the `streams` instance passed to getStore).
final KStreamBuilder builder2 = new KStreamBuilder();
KStream<GenericRecord, GenericRecord> txnStream2 = builder2.stream(txnIntTopic);
// Read-only handle to the first instance's window store; presumably keyed by
// "<TAG_ID>_<PLAZA_ID>" to match the re-keying done below — TODO confirm.
ReadOnlyWindowStore<String, ArrayList<GenericRecord>> store = getStore(streams);
// De-duplication pipeline:
//   1. map           — re-key each record as "<TAG_ID>_<PLAZA_ID>" and unwrap
//                      the embedded Avro "list" field into an ArrayList.
//   2. flatMapValues — emit every record of the batch; the LAST record is
//                      checked for duplicates both within the batch and (near
//                      the window boundary) against older batches in the store.
//   3. map           — drop the key before writing downstream.
KStream<GenericRecord, GenericRecord> dupstream = txnStream2.map(new KeyValueMapper<GenericRecord, GenericRecord, KeyValue<String, ArrayList<GenericRecord>>>() {
@Override
public KeyValue<String, ArrayList<GenericRecord>> apply(GenericRecord key, GenericRecord value) {
// Composite key: "<TAG_ID>_<PLAZA_ID>".
String newkey = key.get("TAG_ID").toString() + "_" + key.get("PLAZA_ID").toString();
GenericData.Array<GenericRecord> array = (GenericData.Array<GenericRecord>) value.get("list");
// GenericData.Array implements java.util.List, so copy it in one step
// instead of iterating element by element.
ArrayList<GenericRecord> newvalue = new ArrayList<GenericRecord>(array);
return new KeyValue<String, ArrayList<GenericRecord>>(newkey, newvalue);
}
})
.flatMapValues(new ValueMapper<ArrayList<GenericRecord>, Iterable<GenericRecord>>() {
@Override
public Iterable<GenericRecord> apply(ArrayList<GenericRecord> value) {
logger.debug("flatMapValues#value=="+value);
logger.debug("listTableStream=="+listTableStream.getStoreName());
// Guard: an empty batch has no "last" record and would otherwise throw
// IndexOutOfBoundsException on value.get(-1).
if (value.isEmpty()) {
return new ArrayList<GenericRecord>();
}
GenericRecord last = value.get(value.size()-1);
String key = last.get("TAG_ID").toString() + "_" + last.get("PLAZA_ID").toString();
long lastdate = (Long) last.get("TRANSACTION_DATE");
String lastid = ((org.apache.avro.util.Utf8) last.get("TRANSACTION_ID")).toString();
logger.debug("last#value=="+last);
ArrayList<GenericRecord> result = new ArrayList<GenericRecord>();
String dupid = null;
// Pass 1: look for a duplicate of the last record among the earlier records
// of the same batch (timestamps within a 2-second tolerance).
for (int i = 0; i < value.size() - 1; i++) {
GenericRecord r = value.get(i);
String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
logger.debug("lastid="+lastid+",dupid="+duptxnid);
long rdate = (Long) r.get("TRANSACTION_DATE");
if (Math.abs(rdate - lastdate) < 2000) {
dupid = duptxnid;
logger.debug("DUP FOUND : lastid="+lastid+",dupid="+duptxnid);
}
// Earlier records are forwarded as-is, with no duplicate id attached.
GenericRecord nr = populate(schema, r, null);
result.add(nr);
}
// Pass 2: if no in-batch duplicate was found and the record is close to the
// window boundary, also scan older batches from the window store.
ArrayList<GenericRecord> oldlist = new ArrayList<GenericRecord>();
long current = System.currentTimeMillis();
if (dupid == null && current - lastdate > window_size - 2000)
{
logger.debug("lastdate="+lastdate);
// FIX: WindowStoreIterator holds underlying store (RocksDB) resources and
// must be closed; the original leaked one open iterator per invocation.
try (WindowStoreIterator<ArrayList<GenericRecord>> olddata = store.fetch(key, lastdate-window_size, current)) {
while (olddata.hasNext()) {
ArrayList<GenericRecord> l = olddata.next().value;
if (l.size() > 0) {
oldlist.addAll(l);
}
}
}
}
logger.debug("oldlist==="+oldlist);
for (GenericRecord r : oldlist ) {
String duptxnid = ((org.apache.avro.util.Utf8) r.get("TRANSACTION_ID")).toString();
logger.debug("OLD lastid="+lastid+",dupid="+duptxnid);
long rdate = (Long) r.get("TRANSACTION_DATE");
// Exclude the record itself (same TRANSACTION_ID) from the comparison.
if (!lastid.equals(duptxnid) && Math.abs(rdate - lastdate) < 2000) {
dupid = duptxnid;
logger.debug("DUP FOUND in OLD list : lastid="+lastid+",dupid="+duptxnid);
}
}
// The last record carries the duplicate id (null when no duplicate found).
GenericRecord newr = populate(schema, last, dupid);
logger.debug("newr==="+newr);
result.add(newr);
logger.debug("result==="+result);
return result;
}
}).map((key, value) -> new KeyValue<>(null, value));
// Build and start the second Kafka Streams instance with its own
// configuration (separate application id / config from the first instance).
final KafkaStreams streams2 = new KafkaStreams(builder2, streamsConfiguration2);
streams2.start();