Hazelcast Jet not removing Jobs/resources after completion

Parth Bhatt

Jun 5, 2023, 4:06:58 AM
to hazelcast-jet

I am trying to join two data sources using Hazelcast Jet. Once I start the Jet instance I do not want to shut it down, because starting it again every time a new job is submitted can be time-consuming. However, after a job completes I can see that it is not removed from the job list and may still be occupying memory. My question is: why are jobs not removed after completion, and how do I release their memory? My code is below:

 

UUID idOne = UUID.randomUUID();
DataFactory av = new DataFactory(idOne.toString());
av.buildJetInstance();

Map<String, Object> p = new HashMap<String, Object>();
p.putAll((Map<String, Object>) data.get("alldata"));
av.readMaptoJsonFile(p);

Pipeline pl = av.createPipeline();
av.runProcess(pl);

 

public class DataFactory implements Serializable {

    public String uid;

    public DataFactory(String uid) {
        this.uid = uid;
    }

 

    public JetInstance buildJetInstance() {
        JetConfig jetConfig = new JetConfig();
        jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
        jetConfig.configureHazelcast(c -> {
            c.getNetworkConfig().setPort(9493);
            c.getNetworkConfig().setPublicAddress("localhost");
            c.getNetworkConfig().setPortAutoIncrement(true);
            c.getNetworkConfig().getJoin().getAutoDetectionConfig().setEnabled(false);
            c.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
            c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true)
                    .setMembers(Arrays.asList("localhost"));
        });
        EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);
        return EtlObjects.jetInstance;
    }

 

    public Pipeline createPipeline() {
        return Pipeline.create();
    }

    public void joinPipeToJet(Pipeline pl, String name) {
        JobConfig j = new JobConfig();
        j.setName(name);
        EtlObjects.jetInstance.newJob(pl, j).join();
    }

 

    public void readMaptoJsonFile(final Map<String, Object> data) {
        String jobid = UUID.randomUUID().toString();
        try {
            Pipeline pl = createPipeline();
            UUID idOne = UUID.randomUUID();

            final IMap<Object, Object> abc = EtlObjects.jetInstance.getMap(idOne.toString());
            abc.putAll(data);

            final BatchSource<Map.Entry<Object, Object>> batchSource = Sources.map(abc);

            pl.readFrom(batchSource)
              .writeTo(Sinks.map(this.uid));

            joinPipeToJet(pl, jobid);
            abc.destroy();
        } catch (Exception e) {
            Job j1 = EtlObjects.jetInstance.getJob(jobid);
            if (j1 != null) {
                j1.cancel();
            }
        } finally {
            // Even after completion I try to cancel the job and remove it
            // from the job list, but neither works.
            System.out.println("Number of Jobs: " + EtlObjects.jetInstance.getJobs().size());
            Job j1 = EtlObjects.jetInstance.getJob(jobid);
            if (j1 != null) {
                j1.cancel();
                j1.getFuture().cancel(true);
                int z = 0;
                for (Job k : EtlObjects.jetInstance.getJobs()) {
                    if (k.getName().equalsIgnoreCase(jobid)) {
                        EtlObjects.jetInstance.getJobs().remove(z);
                    }
                    z++;
                }
            }
        }
    }

 

    public Map<String, Object> runProcess(final Pipeline pl) {

        // Where all source batch stages will be stored
        final Map<String, BatchStage<Object>> allBatch = new HashMap<String, BatchStage<Object>>();

        final Map<String, Object> data = // will get data from Pipeline

        // Here we read from different data sources (jdbc, csv, text, etc.)
        // and store each batch stage in allBatch
        ((List<Map<String, Object>>) data.get("sources")).stream().forEach(z -> {
            // jdbcSource is where we get data from the database
            allBatch.put(z.get("id").toString(), pl.readFrom(jdbcSource));
        });

        // Here we write the logic for joining the multiple data sources
        ((List<Map<String, Object>>) data.get("joins")).stream().forEach(z -> {
            allBatch.put("result", new JoinData().join(data,
                    allBatch.get(z.get("id1").toString()),
                    allBatch.get(z.get("id2").toString()), pl, joinKeys));
        });
    }
}
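For reference, this is how I inspect the leftover jobs after each run (a minimal sketch; Job and JobStatus come from com.hazelcast.jet and com.hazelcast.jet.core):

    // Prints every job the instance still knows about, with its status.
    for (Job j : EtlObjects.jetInstance.getJobs()) {
        System.out.println(j.getName() + " -> " + j.getStatus());
    }

The completed jobs still show up in this list, even long after join() has returned.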

 

 

As you can see in the code, I try to cancel the job after completion, but it does not work, and I cannot remove it from the job list either. When I try to cancel the job's future, I get the error below:

java.lang.UnsupportedOperationException: This future can't be cancelled by an outside caller
    at com.hazelcast.jet.impl.util.NonCompletableFuture.cancel(NonCompletableFuture.java:40)

Could anyone please help me understand why this is happening, whether I am missing some configuration, and how to clean up after every job completes?
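From what I have read in the Jet documentation, records of completed jobs are kept for a retention period rather than removed immediately. I have not verified this in my version, but I assume something like the following would shrink that window (the property names are my assumption, taken from com.hazelcast.jet.core.JetProperties; please correct me if they are wrong):

    // Assumed: lower the TTL and cap for completed-job records.
    JetConfig jetConfig = new JetConfig();
    jetConfig.getProperties().setProperty("jet.job.results.ttl.seconds", "60");
    jetConfig.getProperties().setProperty("jet.job.results.max-size", "100");

Is this the right way to clean up, or is there an API call I should make after each job instead?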

 
