Hi Teams,
I have create a spring boot application in which we are joining multiple datasources using Hazelcast Jet for small datasource it is working very good but as datasource records increases above 1 million it start occupying to much memory and memory leakage also not sure what is wrong am if someone can help to increase joining speed and low memory usage. I have allocated 8gb ram memory to my application.
Below is the code to call Pipeline:
UUID idOne = UUID.randomUUID();
DataFactory av = new DataFactory(idOne.toString());
av.buildJetInstance();
Map<String, Object> p = new HashMap<String, Object>();
p.putAll((Map<String, Object>) data.get("alldata"));
av.readMaptoJsonFile(p);
Pipeline pl = av.createPipeline();
av.runProcess(pl);
public class DataFactory implements Serializable {
public String uid;
public DataFactory(String uid) {
this.uid = uid;
}
public JetInstance buildJetInstance() {
JetConfig jetConfig = new JetConfig();
jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
jetConfig.configureHazelcast(c -> {
c.getNetworkConfig().setReuseAddress(true);
c.setClusterName("DATA" + UUID.randomUUID().toString());
c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).setMembers(Arrays.asList(new String[] {"localhost"}));
});
EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);
return EtlObjects.jetInstance;
}
public Pipeline createPipeline() {
return Pipeline.create();
}
public void joinPipeToJet(Pipeline pl, String name) {
JobConfig j = new JobConfig();
j.setName(name);
EtlObjects.jetInstance.newJob(pl,j).join();
}
public void readMaptoJsonFile(final Map<String, Object> data) {
String jobid = UUID.randomUUID().toString();
try {
Pipeline pl = createPipeline();
UUID idOne = UUID.randomUUID();
final IMap<Object, Object> abc = EtlObjects.jetInstance.getMap(idOne.toString());
abc.putAll(data);
final BatchSource batchSource = Sources.map(abc);
pl.readFrom(batchSource)
.writeTo(Sinks.map(this.uid));
joinPipeToJet(pl, jobid);
abc.destroy();
} catch (Exception e) {
Job j1 = EtlObjects.jetInstance.getJob(jobid);
if (j1 != null) {
j1.cancel();
}
} finally {
Job j1 = EtlObjects.jetInstance.getJob(jobid);
if (j1 != null) {
j1.cancel();
}
}
}
public Map<String, Object> runProcess(final Pipeline pl) {
//Where all source batch will be stored
final Map<String, BatchStage<Object>> allBatch = new HashMap<String, BatchStage<Object>>();
final Map<String, Object> data = // will get data from Pipeline
//Here we will get data from different datasources (jdbc, csv, text etc.) and it will be store in allBatch
((List<Map<String, Object>>)data.get("sources")).stream().forEach(z -> {
//jdbcSource where we get data from database
allBatch.put(z.get("id").toString(), pl.readFrom(jdbcSource));
});
//Here we will right the logic for the joining multiple datasources
((List<Map<String, Object>>)data.get("joins")).stream().forEach(z -> {
allBatch.put("result",new JoinData().join(data, allBatch.get(z.get("id1").toString()), allBatch.get(z.get("id2").toString()),pl,joinKeys));
});
}
}
Below is the inner join where I join to datasources:
public BatchStage<Object> JoinData() {
//Here is the logic for the inner joining
BatchStageWithKey<Object, String> jdbcGroupByKey = batch1.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData -> {
// gorup by join key
});
BatchStageWithKey<Object, String> csvGroupByKey = batch2.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData -> {
// gorup by join key
});
//Aggregate here
BatchStage<Entry<String, Tuple2<List<Object>, List<Object>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(),csvGroupByKey,AggregateOperations.toList());
BatchStage<List<Object>> jdbcBatchStageData = d.map(e -> {
// joining
});
return jdbcBatchStageData;
}
And sometime joining multiple pipeline at same time gives below error:
2023-03-24 09:39:15,092 [ INFO] - processors=4, physical.memory.total=15.4G, physical.memory.free=7.3G, swap.space.total=0, swap.space.free=0, heap.memory.used=2.8G,
heap.memory.free=1.1G, heap.memory.total=3.9G, heap.memory.max=7.1G,
heap.memory.used/total=70.82%, heap.memory.used/max=38.70%, minor.gc.count=228,
minor.gc.time=14798ms, major.gc.count=5, major.gc.time=2766ms, load.process=0.00%,
load.system=0.00%, load.systemAverage=2.28, thread.count=84, thread.peakCount=119,
cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0,
executor.q.client.query.size=0, executor.q.client.blocking.size=0, executor.q.query.size=0,
executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0,
executor.q.operations.size=0, executor.q.priorityOperation.size=0,
operations.completed.count=5818629, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0,
executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0,
operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=0,
proxy.count=10, clientEndpoint.count=0, connection.active.count=0, client.connection.count=0,
connection.count=0
Please help to improve performance of it.