Hazelcast Jet multiple datasource join slows down machine and causes high memory usage

Parth Bhatt

Mar 27, 2023, 8:03:41 AM
to Hazelcast
Hi Team,

I have created a Spring Boot application in which we join multiple datasources using Hazelcast Jet. For small datasources it works very well, but once a datasource grows beyond 1 million records the application starts using too much memory and appears to leak it. I am not sure what I am doing wrong; can someone help me speed up the join and lower the memory usage? I have allocated 8 GB of RAM to the application.

Below is the code that builds and runs the pipeline:

    UUID idOne = UUID.randomUUID();
    DataFactory av = new DataFactory(idOne.toString());
    av.buildJetInstance();

    Map<String, Object> p = new HashMap<>();
    p.putAll((Map<String, Object>) data.get("alldata"));
    av.readMaptoJsonFile(p);

    Pipeline pl = av.createPipeline();
    av.runProcess(pl);
   
    public class DataFactory implements Serializable {

        public String uid;

        public DataFactory(String uid) {
            this.uid = uid;
        }

        public JetInstance buildJetInstance() {
            JetConfig jetConfig = new JetConfig();
            jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
            jetConfig.configureHazelcast(c -> {
                c.getNetworkConfig().setReuseAddress(true);
                // Random cluster name: every call starts a brand new
                // single-member cluster instead of joining an existing one
                c.setClusterName("DATA" + UUID.randomUUID());
                c.getNetworkConfig().getJoin().getTcpIpConfig()
                        .setEnabled(true)
                        .setMembers(Arrays.asList("localhost"));
            });
            EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);
            return EtlObjects.jetInstance;
        }

        public Pipeline createPipeline() {
            return Pipeline.create();
        }

        public void joinPipeToJet(Pipeline pl, String name) {
            JobConfig j = new JobConfig();
            j.setName(name);
            // join() blocks until the job completes
            EtlObjects.jetInstance.newJob(pl, j).join();
        }

        public void readMaptoJsonFile(final Map<String, Object> data) {
            String jobid = UUID.randomUUID().toString();
            UUID idOne = UUID.randomUUID();

            // Copies the whole input into a temporary IMap, streams it into
            // the target map, then destroys the temporary map
            final IMap<Object, Object> abc = EtlObjects.jetInstance.getMap(idOne.toString());
            try {
                Pipeline pl = createPipeline();
                abc.putAll(data);

                final BatchSource<Entry<Object, Object>> batchSource = Sources.map(abc);

                pl.readFrom(batchSource)
                        .writeTo(Sinks.map(this.uid));

                joinPipeToJet(pl, jobid);
            } catch (Exception e) {
                // Cancel the job only on failure; cancelling again in finally,
                // as before, would also hit the success path
                Job j1 = EtlObjects.jetInstance.getJob(jobid);
                if (j1 != null) {
                    j1.cancel();
                }
            } finally {
                abc.destroy();
            }
        }

        public Map<String, Object> runProcess(final Pipeline pl) {

            // Where all source batch stages will be stored
            final Map<String, BatchStage<Object>> allBatch = new HashMap<>();

            final Map<String, Object> data = ...; // will get data from Pipeline

            // Read each datasource (jdbc, csv, text, etc.) into a batch stage
            // and store it in allBatch
            ((List<Map<String, Object>>) data.get("sources")).forEach(z -> {
                // jdbcSource is the batch source that reads from the database
                allBatch.put(z.get("id").toString(), pl.readFrom(jdbcSource));
            });

            // Here is the logic for joining the multiple datasources
            ((List<Map<String, Object>>) data.get("joins")).forEach(z -> {
                allBatch.put("result", new JoinData().join(data,
                        allBatch.get(z.get("id1").toString()),
                        allBatch.get(z.get("id2").toString()), pl, joinKeys));
            });

            return data;
        }
    }
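
The jdbcSource used in runProcess is Jet's standard JDBC batch source; below is a minimal sketch of how such a source is built (the connection URL, query, and row mapping are illustrative placeholders, not my real values):

    // Minimal sketch of a JDBC batch source; URL and query are placeholders
    BatchSource<Object> jdbcSource = Sources.jdbc(
            "jdbc:postgresql://localhost:5432/mydb",  // placeholder connection URL
            "SELECT * FROM my_table",                 // placeholder query
            resultSet -> {
                // Turn each row into a Map<String, Object>, one entry per column
                Map<String, Object> row = new HashMap<>();
                ResultSetMetaData md = resultSet.getMetaData();
                for (int i = 1; i <= md.getColumnCount(); i++) {
                    row.put(md.getColumnName(i), resultSet.getObject(i));
                }
                return row;
            });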
Below is the inner join where I join two datasources:

    public BatchStage<Object> join(Map<String, Object> data, BatchStage<Object> batch1,
            BatchStage<Object> batch2, Pipeline pl, String col1) {

        // Here is the logic for the inner join:
        // group each side by the join key, skipping rows where the key is null
        BatchStageWithKey<Object, String> jdbcGroupByKey = batch1
                .filter(k -> ((Map<String, Object>) k).get(col1) != null)
                .groupingKey(jdbcData -> {
                    // group by join key
                });
        BatchStageWithKey<Object, String> csvGroupByKey = batch2
                .filter(k -> ((Map<String, Object>) k).get(col1) != null)
                .groupingKey(csvData -> {
                    // group by join key
                });

        // Aggregate here: for every key, collect the matching rows of each
        // side into two lists
        BatchStage<Entry<String, Tuple2<List<Object>, List<Object>>>> d =
                jdbcGroupByKey.aggregate2(AggregateOperations.toList(),
                        csvGroupByKey, AggregateOperations.toList());

        BatchStage<Object> jdbcBatchStageData = d.map(e -> {
            // joining: merge the two per-key lists into the joined rows
        });

        return jdbcBatchStageData;
    }
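
For comparison, here is a minimal sketch of the same two-way join written with Jet's built-in hashJoin (the key extractors and the merge step are simplified assumptions; hashJoin is a left-outer join and holds the whole right-hand side in memory on each member, so the smaller dataset should go on the right):

    // Minimal sketch, assuming both stages emit Map<String, Object> rows and
    // col1 is the join column. hashJoin emits null for the right item when
    // there is no match, so the filter restores inner-join semantics.
    BatchStage<Map<String, Object>> joined = batch1
            .hashJoin(batch2,
                    JoinClause.onKeys(
                            l -> (String) ((Map<String, Object>) l).get(col1),
                            r -> (String) ((Map<String, Object>) r).get(col1)),
                    Tuple2::tuple2)              // pair up (left, right)
            .filter(t -> t.f1() != null)         // inner join: drop unmatched rows
            .map(t -> {
                Map<String, Object> out = new HashMap<>((Map<String, Object>) t.f0());
                out.putAll((Map<String, Object>) t.f1()); // merge right-side columns
                return out;
            });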
   
And sometimes joining multiple pipelines at the same time gives the log output below:

    2023-03-24 09:39:15,092 [ INFO] - processors=4, physical.memory.total=15.4G, physical.memory.free=7.3G,
    swap.space.total=0, swap.space.free=0, heap.memory.used=2.8G,
    heap.memory.free=1.1G, heap.memory.total=3.9G, heap.memory.max=7.1G,
    heap.memory.used/total=70.82%, heap.memory.used/max=38.70%, minor.gc.count=228,
    minor.gc.time=14798ms, major.gc.count=5, major.gc.time=2766ms, load.process=0.00%,
    load.system=0.00%, load.systemAverage=2.28, thread.count=84, thread.peakCount=119,
    cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0,
    executor.q.client.query.size=0, executor.q.client.blocking.size=0, executor.q.query.size=0,
    executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0,
    executor.q.operations.size=0, executor.q.priorityOperation.size=0,
    operations.completed.count=5818629, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0,
    executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0,
    operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=0,
    proxy.count=10, clientEndpoint.count=0, connection.active.count=0, client.connection.count=0,
    connection.count=0

   
Please help me improve its performance.