// Baseline job: each parsed item is handed to computeHeavy through mapUsingIMap on "bigmap".
pipeline
        .drawFrom(source)
        // skip header, managed in source
        .filter(line -> line.getLineNumber() > 0)
        // parse in multi thread
        .map(FileData::parseLine)
        .map(parsed -> {
            // overwrite the third field with a fixed test value
            parsed.getData()[2] = "test2";
            return parsed;
        })
        .mapUsingIMap("bigmap", this::computeHeavy)
        .drainTo(Sinks.noop());
/**
 * Simulates a CPU-heavy step: fetches the rows stored in {@code bigmap} under
 * {@code "INV" + (lineNumber % 50000)} and compares every cell, suffixed with
 * {@code "3"}, against the cell at the same index of the item's data. Prints
 * {@code "GOTCHA !"} on each match.
 *
 * @param bigmap map from key to a list of rows
 * @param fd     item being processed
 * @return {@code fd} itself
 */
private FileData computeHeavy(IMap<String, List<String[]>> bigmap, FileData fd) {
    List<String[]> rows = bigmap.get("INV" + fd.getLineNumber() % 50000);
    if (rows == null) {
        // the map returns null when the key is absent; nothing to compare against
        return fd;
    }
    for (String[] row : rows) {
        for (int i = 0; i < row.length; i++) {
            String candidate = row[i] + "" + 3;
            if (candidate.equals(fd.getData()[i])) {
                System.out.println("GOTCHA !");
            }
        }
    }
    return fd;
}
/**
 * Stripped-down variant: performs only the {@code bigmap} lookup and returns the item.
 * The result of the lookup is not used.
 *
 * @param bigmap map from key to a list of rows
 * @param fd     item being processed
 * @return {@code fd} itself
 */
private FileData computeHeavy(IMap<String, List<String[]>> bigmap, FileData fd) {
    String key = "INV" + fd.getLineNumber() % 50000;
    List<String[]> unusedRows = bigmap.get(key);
    return fd;
}
And it took 40763ms to complete.
It is the access to bigMap that is very, very slow, even with a single instance!
How could I have a "local" cache?
// Variant that takes the list of rows directly instead of the IMap.
private FileData computeHeavy(List<String[]> liste, FileData fd) {
// Keyed variant: items are given a grouping key before mapUsingIMap on "bigmap".
pipeline
        .drawFrom(source)
        // skip header, managed in source
        .filter(line -> line.getLineNumber() > 0)
        // parse in multi thread
        .map(FileData::parseLine)
        .map(parsed -> {
            // overwrite the third field with a fixed test value
            parsed.getData()[2] = "test2";
            return parsed;
        })
        .groupingKey(parsed -> "INV" + parsed.getLineNumber() % 50000)
        .mapUsingIMap("bigmap", this::computeHeavy)
        .drainTo(Sinks.noop());
And
/**
 * Variant taking the item and a list of rows; with the comparison loop commented
 * out it simply returns its input.
 * NOTE(review): {@code liste} is presumably the value looked up in "bigmap" for the
 * item's grouping key - confirm against the mapUsingIMap overload in use.
 *
 * @param fd    item being processed
 * @param liste list of rows (currently unused)
 * @return {@code fd} itself
 */
private FileData computeHeavy( FileData fd, List<String[]> liste) {
// Comparison loop disabled: only the cost of reaching this method remains.
// for (String[] data : liste) {
// for (int dataIndex = 0; dataIndex < data.length; dataIndex++) {
// String datum = data[dataIndex] + "" + 3;
// if (datum.equals(fd.getData()[dataIndex])) {
// System.out.println("GOTCHA !");
// }
// }
// }
return fd;
}
// Attach a near-cache configuration to the "bigmap" map config:
// OBJECT in-memory format, with caching of local entries switched on.
JetConfig config = new JetConfig();
Config hzConfig = config.getHazelcastConfig();
NearCacheConfig nearCache = new NearCacheConfig();
nearCache.setInMemoryFormat(InMemoryFormat.OBJECT);
nearCache.setCacheLocalEntries(true);
hzConfig.getMapConfig("bigmap").setNearCacheConfig(nearCache);
// Read "bigmap" as a batch stage of entries so it can be hash-joined with the file items.
BatchStage<Entry<String, List<String[]>>> bigMap = pipeline.drawFrom(Sources.map("bigmap"));
pipeline
        .drawFrom(source)
        .filter(fd -> fd.getLineNumber() > 0)
        .map(FileData::parseLine) // parse in multi thread
        .map(fd -> {
            fd.getData()[2] = "a"; // dummy replacement of value
            return fd;
        })
        // join each item with the map entry whose key is "INV" + (lineNumber % KEY_COUNT)
        .hashJoin(bigMap, JoinClause.onKeys(fd -> "INV" + (fd.getLineNumber() % KEY_COUNT), e -> e.getKey()),
                (FileData fd, Entry<String, List<String[]>> e) -> { // simulate high compute cpu
                    if (e == null) {
                        // no entry matched this item's key; pass the item through
                        return fd;
                    }
                    for (String[] data : e.getValue()) {
                        // compares the first 15 cells of each row with the item's data
                        for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
                            if (data[dataIndex].equals(fd.getData()[dataIndex])) {
                                System.out.println("GOTCHA !");
                            }
                        }
                    }
                    return fd;
                })
        .drainTo(Sinks.noop());
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/3cc72dcf-ed67-4b88-9fcf-269b9bcb82b8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.