Replacing StreamEventJournalP with a custom solution

14 views
Skip to first unread message

vladc...@gmail.com

unread,
Jul 12, 2019, 7:28:31 AM7/12/19
to hazelcast-jet
Hi,

We're constantly facing issues with StreamEventJournalP and jobs based on it. The main problem is that the event journal capacity is fixed and the capacity is evenly distributed across partitions. During peak situations it can easily happen in our case that one partition receives too many events and the journal overflows, leading to loss of events. Basically no capacity configuration is ideal then - it is either too small (for such situations) or too large in total. Besides, there seems to be no chance to handle the overflow events and notify users.

I've now written my own source processor which listens to map events, buffers items in MPSCQueue and then emits then. I'd like to know your opinion - how could this solution be worse than StreamEventJournalP? I understand that the message buffer won't be backed up by other nodes, but this will be covered by an extra process finding forgotten messages (a scheduled job calling IMap.localKeySet with a predicate).

Source processor:

private transient IMap<Object, T> map;
private transient MPSCQueue<T> resultsQueue;
private transient Traverser<T> traverser;
...

@Override
protected void init(@Nonnull Context context) {
  map = context.jetInstance().getMap(mapName);
map.addLocalEntryListener((EntryAddedListener<Object, T>) added -> resultsQueue.offer(added.getValue()));
resultsQueue = new MPSCQueue<>(null);
traverser = () -> resultsQueue.poll();
}

@Override
public boolean complete() {
emitFromTraverser(traverser);
return false;
}

Viliam Durina

unread,
Jul 12, 2019, 7:54:03 AM7/12/19
to vladc...@gmail.com, hazelcast-jet
You're right, the capacity of map journal is distributed evenly among partitions and if one partition overflows, there's no way to recover the items.

The issue with your solution is that it's not fault-tolerant, as you said. It's also lot more complex. Also you must not forget to remove the listener when the job is cancelled, otherwise the listener will keep adding items to the queue and eventually use up all memory. For stability, you could stop adding when the queue size reaches some threshold, but this way you're at where you were with journal - losing items.

Also your backup process will have to use a very heavy predicate using a not-in-set predicate. By the way, the same backup process can be used with map journal to handle the lost events...

Viliam

--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/80692e8a-d0a7-4926-ab9f-e50562f1f203%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply all
Reply to author
Forward
0 new messages