Batch job limit


Lukáš Herman

Feb 27, 2019, 3:48:47 AM
to hazelcast-jet
Hello Jet Team,
what is the recommended approach to limiting batch job size? Assume there is a large map with data to be processed, and a batch job that processes the data (transformation + enrichment).
I would like to process the map data partially, say only the first 1000 items. Is there a suitable mechanism for that?

With regards
Lukas Herman

Can Gencer

Feb 28, 2019, 2:23:44 AM
to Lukáš Herman, hazelcast-jet
Hi Lukas,

If I understand correctly, you want to run a batch job with a map as the source, but don't want to process all the items, only a subset. Could this subset be anything, or is it based on some predicate? If it could be anything, then I don't see a way of doing this without writing a custom source or processor. The way a map source works is that first the partitions are distributed between each member's processors (so if a member owns 10 partitions, they will be distributed between its processors), and then it creates a partition iterator for each partition. Afterwards the iterators are drained in a round-robin fashion, so that we don't emit all data from partition 1 and then all data from partition 2; instead the partition data is interleaved.
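
To make that interleaving concrete, here is a plain-Java sketch of the round-robin draining (illustrative only, this is not the actual Jet processor code):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Drains the per-partition iterators one element at a time, so the output
// interleaves partitions instead of exhausting them one after another.
static <T> List<T> drainRoundRobin(List<Iterator<T>> partitionIterators) {
    List<T> out = new ArrayList<>();
    boolean progress = true;
    while (progress) {
        progress = false;
        for (Iterator<T> it : partitionIterators) {
            if (it.hasNext()) {
                out.add(it.next());
                progress = true;
            }
        }
    }
    return out;
}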

For your use case, you could tweak the existing source to just read a few items from each partition and then finish, rather than iterating them all to the end. The source for map/cache is implemented in ReadWithPartitionIteratorP, and the method to create a partition iterator is internal, available only on MapProxyImpl rather than IMap, so it could potentially change between releases; however, there aren't any planned changes there for the next release. The requirement that the map be stable / not changing during iteration still applies. You are also of course free to send a PR adding a maxItems or similar parameter to ReadWithPartitionIteratorP.readMapSupplier, as it sounds like it would be a useful feature.
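
To illustrate, a capped read could also be put together with the public SourceBuilder API instead of patching ReadWithPartitionIteratorP. This is only a sketch: the source and map names and the per-partition cap are made up, it reads every partition from a single member (non-distributed), and it still relies on the same internal MapProxyImpl iterator:

import java.util.Iterator;
import java.util.Map;

import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.map.impl.proxy.MapProxyImpl;

BatchSource<Map.Entry<Long, Long>> limitedMap = SourceBuilder
        .batch("limited-map-source", ctx -> ctx.jetInstance().getHazelcastInstance())
        .<Map.Entry<Long, Long>>fillBufferFn((hz, buf) -> {
            MapProxyImpl<Long, Long> proxy =
                    (MapProxyImpl<Long, Long>) hz.<Long, Long>getMap("exportMap");
            int partitionCount = hz.getPartitionService().getPartitions().size();
            int perPartition = 10; // illustrative cap per partition
            for (int p = 0; p < partitionCount; p++) {
                Iterator<Map.Entry<Long, Long>> it = proxy.iterator(perPartition, p, false);
                for (int i = 0; i < perPartition && it.hasNext(); i++) {
                    buf.add(it.next());
                }
            }
            buf.close(); // emit the capped batch once, then complete
        })
        .build();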

Does this answer your question?



Lukáš Herman

Feb 28, 2019, 3:36:50 AM
to hazelcast-jet
Hi Can,
thanks for the detailed explanation.
So, knowing the internals, the most precise solution for now is to prepare a temporary map containing the data to be processed, as a subset/snapshot of the large map, and then run the batch job on that map, as sketched below.
It does not make much sense to implement the ReadWithPartitionIteratorP changes, especially when the resulting map needs to be stable, which is the main limiting factor in the transition from stream to batch processing.
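
A minimal sketch of that plan, assuming a Jet instance named jet; the map names and the enrich() step are placeholders, not real API:

import com.hazelcast.core.IMap;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import static com.hazelcast.jet.Util.entry;

IMap<Long, Long> snapshot = jet.getHazelcastInstance().getMap("export-snapshot");
// ...copy the <= 1000 selected entries from the large map into snapshot...

Pipeline p = Pipeline.create();
p.drawFrom(Sources.map(snapshot))
 .map(e -> entry(e.getKey(), enrich(e.getValue()))) // enrich() is a placeholder for the enrichment step
 .drainTo(Sinks.map("export-results"));
jet.newJob(p).join();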

Regards
Lukas


On Thursday, February 28, 2019 at 8:23:44 UTC+1, Can Gencer wrote:

Can Gencer

Feb 28, 2019, 4:24:46 AM
to Lukáš Herman, hazelcast-jet
Hi Lukas,

Yes, that would be the most straightforward solution for you. I asked my colleagues what the most efficient way to get 1000 random entries from a map is, and the best option still seems to be to use the internal partition iterator I described above to get some entries from each partition. Another option is to use IMap.localKeySet(), but that only works member-side and returns only the keys owned by that one member.
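
For completeness, the localKeySet() variant would look roughly like this (sketch; hz is a member-side HazelcastInstance, and the sample covers only entries owned by that member):

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.hazelcast.core.IMap;

IMap<Long, Long> map = hz.getMap("exportMap");
Set<Long> sample = map.localKeySet().stream()
        .limit(1000) // cap the sample size
        .collect(Collectors.toSet());
Map<Long, Long> subset = map.getAll(sample); // fetch the values for the sampled keys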

Lukáš Herman

Feb 28, 2019, 7:05:16 AM
to hazelcast-jet
Hi Can,
as far as I can see, the following is not allowed:

Predicate<Long, Long> limitPredicate = new PagingPredicate<>(filterPredicate, batchSize);
BatchSource<Map.Entry<Long, Long>> source = Sources.map(exportMap, limitPredicate, Projections.identity());

which fails with:
java.lang.IllegalArgumentException: Paging predicate is not allowed when iterating map by query
at com.hazelcast.map.impl.proxy.MapProxyImpl.iterator(MapProxyImpl.java:857)
at com.hazelcast.jet.impl.connector.ReadWithPartitionIteratorP.lambda$null$3(ReadWithPartitionIteratorP.java:127)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

So the solution would be to extract the page size from the PagingPredicate and apply it to the internal iterator, right?
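
For what it's worth, the cap is already accessible on the predicate, so a patched source could read it like this (sketch, reusing limitPredicate from above):

PagingPredicate<Long, Long> paging = (PagingPredicate<Long, Long>) limitPredicate;
int pageSize = paging.getPageSize(); // would become the per-partition-iterator limit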

Lukas

On Thursday, February 28, 2019 at 10:24:46 UTC+1, Can Gencer wrote:

Can Gencer

Feb 28, 2019, 8:28:45 AM
to Lukáš Herman, hazelcast-jet
Yes, you shouldn't use a paging predicate; instead, do it yourself using the iterator. Something like this (it can also be parallelized):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import com.hazelcast.core.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;

IMap<Integer, Integer> map = jet.getHazelcastInstance().getMap("map");
int minItems = 1000;
int partitionCount = jet.getHazelcastInstance().getPartitionService().getPartitions().size();
// Split the quota across partitions; ceil means we may collect slightly more than minItems.
int itemsPerPartition = (int) Math.ceil(minItems / (double) partitionCount);
// The per-partition iterator is only exposed on the internal proxy, not on IMap.
MapProxyImpl<Integer, Integer> mapProxy = (MapProxyImpl<Integer, Integer>) map;
Map<Integer, Integer> subset = new HashMap<>();
for (int i = 0; i < partitionCount; i++) {
    Iterator<Entry<Integer, Integer>> iterator = mapProxy.iterator(itemsPerPartition, i, false);
    for (int j = 0; j < itemsPerPartition && iterator.hasNext(); j++) {
        Entry<Integer, Integer> e = iterator.next();
        subset.put(e.getKey(), e.getValue());
    }
}
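
To tie this back to the temporary-map idea from earlier in the thread, the sampled subset could then be staged for the batch job like so (sketch; "map-subset" is a made-up name):

IMap<Integer, Integer> staging = jet.getHazelcastInstance().getMap("map-subset");
staging.putAll(subset);
// ...the batch pipeline then reads from "map-subset" via Sources.map(staging).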


