Cluster sizing


fatih.ar...@gmail.com

Aug 20, 2020, 5:00:19 AM
to hazelcast-jet
Hi,
We have a data pipeline that consumes from Kafka and sinks into a Hazelcast map at an incoming rate of 100k messages per second; one message is around 100 bytes. Until recently it ran as a single Jet job on 5 independent instances (they are all standalone, and we do not care about lost messages).
Recently we started running it in cluster mode with a cluster size of 10, thinking that this would make rolling upgrades possible and be more fault tolerant. However, we have seen latencies since we migrated to cluster mode. I know the question is very high level, but is it naive to think that going parallel should not add any latency? Is there any overhead introduced by cluster mode?

Marko Topolnik

Aug 20, 2020, 5:10:29 AM
to hazelcast-jet
How much latency did you see? Generally speaking, when you add another node to what was a single-node cluster, the pipeline will rescale so it uses both nodes. There's a one-time latency associated with rescaling. After that, each Jet node should be consuming from half of the Kafka partitions and operating pretty much independently from the other node. However, if you also have an aggregation stage in the pipeline, that involves sending some of the data to the other node, so that all the items with the same grouping key are processed at the same node. This should amount to the latency of a single network hop, typically < 10 ms.

fatih.ar...@gmail.com

Aug 20, 2020, 5:30:28 AM
to hazelcast-jet
Thanks for the quick response. The latencies we observe after the migration are on the order of minutes, which makes us think we are doing something wrong. We have no aggregation stage; the logic is quite simple: put the incoming elements into a hashmap and sink it to the Hazelcast map once every 200 ms. Before the migration we dedicated a whole instance to one very busy topic, and now the load sharing is done under the hood by Jet. When you say half of the Kafka partitions will be taken by one member and the other half by the other member, does that take the load of the topics into account? What is the load-sharing logic?
Before the migration, say we had two instances (as an example): one reads from 99 light topics and the other reads from 1 very busy topic. With cluster mode, could it happen that each member takes 50 topics, and that causes latency on the busy one?

Marko Topolnik

Aug 20, 2020, 5:39:55 AM
to hazelcast-jet
Why do you need the intermediate hashmap? Do you use it in a mapStateful stage? The natural way would be to just push it directly to the IMap sink. If you want to minimize the traffic to the IMap by coalescing events with duplicate keys, then I think a window(tumbling(200)).aggregate(toMap()) would do it, followed by a flatmapping stage that unpacks the map into a stream of entries. But I'm not sure if that's really necessary.

As for the load balancing, a single Jet source takes from a single Kafka topic. If you have several Kafka sources in the same Jet job, all the topics will be spread across all the nodes. This means the heavy topic will be balanced across nodes just as all others.

fatih.ar...@gmail.com

Aug 20, 2020, 5:50:05 AM
to hazelcast-jet
Yes, the idea of the hashmap is to minimize the traffic, because there are a lot of duplicate keys and we want to write each one to the IMap only once. I will investigate the tumbling-window aggregation. Thanks for the tip.
If load balancing takes the number of topics into account rather than the traffic, then cluster mode could introduce some latency, I think. In non-cluster mode we manually assign topics to different instances, e.g. dozens of light topics to one instance and one very busy topic to a dedicated instance. With cluster mode we might well end up with unbalanced members.

Marko Topolnik

Aug 20, 2020, 5:55:11 AM
to hazelcast-jet
If load balancing takes the number of topics into account rather than the traffic, then cluster mode could introduce some latency, I think. In non-cluster mode we manually assign topics to different instances, e.g. dozens of light topics to one instance and one very busy topic to a dedicated instance. With cluster mode we might well end up with unbalanced members.

With manual assignment, the busy topic ends up handled by a single node. With a multinode cluster handling everything, the busy topic will be handled by the whole cluster. At least as far as balancing is concerned, it will be in perfect balance. You might argue that there will be Kafka consumer threads for the low-traffic topics that are underutilized, but with a couple dozen Java threads sitting in a PARKED state, it doesn't immediately smell like trouble to me.

Can Gencer

Aug 20, 2020, 7:36:43 AM
to Marko Topolnik, hazelcast-jet
Hi Fatih,

Could you give some indication of what your pipeline looks like? When you say you put items into a hashmap, what operation are you using? Is it mapStateful?

Also, when you're reading from multiple Kafka topics, are you using a different source stage for each, or combining multiple topics into one source stage?


fatih.ar...@gmail.com

Aug 20, 2020, 8:25:55 AM
to hazelcast-jet
Hi Can,

We use a standard Java ConcurrentHashMap. We wrote a custom sink supplier that first puts the values into this map and then pushes them to the Hazelcast server, just to reduce traffic. This might not be the ideal way of doing things, but I don't think it is the reason the parallel version is slower, because this map is there in cluster mode and in non-cluster mode alike.

We are combining all topics into one source stage.
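Concretely, the single combined source stage looks roughly like this (a sketch only; the topic names and kafkaProps are placeholders, not our actual code):

StreamSource<Entry<Long, byte[]>> allTopics =
        KafkaSources.<Long, byte[]>kafka(kafkaProps, "busyTopic", "lightTopic1", "lightTopic2" /* ...~40 topics */);
StreamStage<Entry<Long, byte[]>> stream = pipeline.drawFrom(allTopics).withIngestionTimestamps();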

I can see that partitions are not assigned evenly in terms of the traffic they receive. For instance, one member receives X messages while another receives 2X, and the member receiving 2X is the laggy one. If we partition the topics more, this should eventually lead to a more uniform distribution of messages. The current distribution is very far from ideal: to give you an idea, we read from 40 different topics, one topic is responsible for 50% of the traffic, the second biggest for 5%, and many others for less than 1% each. The biggest topic (the one responsible for 50% of the traffic) has 10 partitions and the others have 3 or 1. That makes a skewed distribution of load very likely and causes latency at the hotspots, so we could make it faster by adding more partitions to make the load more uniform. But I still cannot explain how the non-cluster mode, where one single instance receives something like 5X messages, is faster than the parallel version whose members consume only 2X.

Thanks 

Can Gencer

Aug 20, 2020, 8:31:11 AM
to fatih.ar...@gmail.com, hazelcast-jet
Hi Fatih,

I didn't quite understand what you mean by putting things into a Java ConcurrentHashMap. What does your pipeline look like? Is it based on some time window? What timestamps are you using?

Regarding topics/partitions: from Jet's perspective it's equivalent to have 100 topics with 1 partition or 1 topic with 100 partitions; they'll all be distributed evenly across the cluster.

One thing I can think of: if one of the topic-partitions doesn't have any data, Jet will consider that partition "idle", but there is a 60-second grace period before that happens. Until the partition is marked idle, it may look like timestamps don't advance in the cluster. You can reduce this timeout at the source by calling StreamSource.setPartitionIdleTimeout.
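For example (a sketch; kafkaProps, the topic names and the 10-second value are just placeholders):

StreamSource<Entry<Long, byte[]>> source =
        KafkaSources.<Long, byte[]>kafka(kafkaProps, "busyTopic", "lightTopic1")
                .setPartitionIdleTimeout(10_000);   // mark a silent partition idle after 10 s instead of 60 s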

fatih.ar...@gmail.com

Aug 20, 2020, 8:47:43 AM
to hazelcast-jet
The pipeline is very simple:

pipeline.drawFrom(alltopics)
        .withIngestionTimestamps()
        .map(entry -> Util.entry(entry.key(), new HazelcastBytesHolder(entry.value())))
        .drainTo(CustomClass.getBuffer());

So we are using a ConcurrentHashMap as a buffer: we put the streaming elements into it and push them once every 200 milliseconds using IMap.putAll(concurrentMap).
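Roughly, the buffer-and-flush logic amounts to something like this (just a sketch of the idea, not our exact code; hzClient and the map name are placeholders):

ConcurrentMap<Long, HazelcastBytesHolder> buffer = new ConcurrentHashMap<>();
IMap<Long, HazelcastBytesHolder> sinkMap = hzClient.getMap("sinkMap");
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
    Map<Long, HazelcastBytesHolder> snapshot = new HashMap<>(buffer);
    snapshot.keySet().forEach(buffer::remove);   // entries updated between the copy and the remove can be lost
    sinkMap.putAll(snapshot);
}, 200, 200, TimeUnit.MILLISECONDS);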

So if one partition does not receive any data for 60 seconds, that partition is considered idle by Jet. But the latency is seen on the very busy partition, which continuously receives data and is never idle. If there is an idle topic among the Kafka sources, is it possible that this idle topic introduces latency in some other topic?

Marko Topolnik

Aug 20, 2020, 8:56:44 AM
to hazelcast-jet
Try writing

StreamStage<T> lowTraffic = pipeline.drawFrom(lowTrafficTopics).withIngestionTimestamps();
StreamStage<T> highTraffic = pipeline.drawFrom(highTrafficTopic).withIngestionTimestamps();
StreamStage<T> allTraffic = lowTraffic.merge(highTraffic);

That should ensure that the high-traffic topic is distributed evenly.

Also, I'm wondering why you're using a ConcurrentHashMap instead of a plain one. If you allow concurrent updates to the map, you'll get reordering, which means you can lose the most recent update. The sink should have a total parallelism of one, in which case you don't need a ConcurrentHashMap.

Marko Topolnik

Aug 20, 2020, 9:04:22 AM
to hazelcast-jet
In fact, even on a single member your pipeline will reorder items because you have a stateless mapping stage in there. If you use keyed aggregation toMap and a regular IMap sink, it should prevent these problems.

Can Gencer

Aug 20, 2020, 9:06:49 AM
to fatih.ar...@gmail.com, hazelcast-jet
Hi Fatih,

Jet already uses putAll internally when writing to a map. It doesn't write items one by one. It batches items internally as they arrive.

fatih.ar...@gmail.com

Aug 20, 2020, 9:11:11 AM
to hazelcast-jet
Thanks Marko. Actually, I consider this whole business of using a concurrent map as a buffer to be technical debt. It is a piece of code we wrote in the initial stage of the project, and ideally it should be changed in line with your suggestions.

But still: a single instance with 5x traffic, consuming from one single topic with 10 partitions, shows no latency, whereas the parallel version with 10 members, each consuming between x and 2x, introduces latency. I cannot explain that.

Can Gencer

Aug 20, 2020, 9:26:24 AM
to fatih.ar...@gmail.com, hazelcast-jet
It might just be that the put operations are too big and you end up with heavy contention between all the members trying to write to the same partitions. Are you writing the results to the Jet cluster itself or to another Hazelcast cluster somewhere?


fatih.ar...@gmail.com

Aug 20, 2020, 9:38:40 AM
to hazelcast-jet
We are writing the results to a Hazelcast map; each member of the cluster writes to the same remote map. It does make sense that all members might try to write to the same partition of the map: we use the same key in Kafka and in the Hazelcast map, and we partition the Kafka topics by this key. In that sense, different members of the cluster might write to the same partitions, I think.

In that case, what would be your suggestion? Increase the batch size so that we put more elements into the map in one put operation? Or should we increase the partition count?

Can Gencer

Aug 20, 2020, 11:08:40 AM
to fatih.ar...@gmail.com, hazelcast-jet
If you use Jet to write to the map directly (i.e. using Sinks.remoteMap()), then starting from 3.2 Jet will partition the keys before writing, so only one member will be writing to a given partition at a time. Have you tried using Jet to write to the target map instead of your own custom logic?
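For example (the address and map name here are placeholders):

ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().addAddress("remote-hz-host:5701");   // the remote Hazelcast cluster
stage.drainTo(Sinks.remoteMap("sinkMap", clientConfig));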

fatih.ar...@gmail.com

Aug 20, 2020, 11:28:19 AM
to hazelcast-jet
Actually we use Jet 3.0, so I guess multiple members can write to the same partition.

As for the custom logic before writing to the map: as far as I know, the pipeline originally sank directly to the map without any additional logic. That caused an excessive number of operations on the Hazelcast side (the pipeline receives 100K messages per second), and because of that load on the server, Hazelcast subscribers could not subscribe to the data. I didn't experience that part of the story myself, but I heard that's what happened. As a (pretty bad) solution, it was decided to put the entries into a buffer and sink them at regular intervals. But I see that throttling is also possible with a tumbling window, which seems like a much better option.

But do you think this buffer logic might be what creates the latency when we parallelize?


Can Gencer

Aug 20, 2020, 11:34:56 AM
to fatih.ar...@gmail.com, hazelcast-jet
Hi Fatih,

Yes in older versions each map sink worker (i.e. processor) would write to all partitions. This was changed in 3.2 to reduce the pressure & number of ops. Some other optimizations were done such as using async calls. Having many different independent Jet instances would make the problem worse because each will fan out to all the partitions. Having a single Jet cluster writing should limit the number of operations in flight significantly.

BTW, you can forget about what I said about idling; it is only relevant if you're using some time-based logic such as windowing.

fatih.ar...@gmail.com

Aug 20, 2020, 5:14:44 PM
to hazelcast-jet
If I understand correctly, in versions older than 3.2 having multiple sink workers might introduce latency because each one fans out to all partitions.
So in version 3.0, could a single instance actually be faster than multiple instances?

fatih.ar...@gmail.com

Aug 20, 2020, 6:02:29 PM
to hazelcast-jet
@Marko, I changed the throttling like this:

pipeline.drawFrom(alltopics)
        .withIngestionTimestamps()
        .map(entry -> Util.entry(entry.key(), new HazelcastBytesHolder(entry.value())))
        .window(WindowDefinition.tumbling(200))
        .aggregate(AggregateOperations.toMap(Entry::getKey, Entry::getValue))
        .flatMap(windowedMap -> Traversers.traverseIterator(windowedMap.result().entrySet().iterator()))
        .drainTo(Sinks.remoteMap(properties.getSinkMap(), clientConfig));

So I removed the custom throttling. That's how you recommended it, right?

Marko Topolnik

Aug 21, 2020, 4:39:09 AM
to fatih.ar...@gmail.com, hazelcast-jet
Yes, that's pretty much what I had in mind, but if you use keyed aggregation it should be better because it would be parallelized. In that case you could use this kind of aggregate operation:

AggregateOperation
        .withCreate(MutableReference<T>::new)
        .<T>andAccumulate((acc, item) -> {
            acc.set(item);
        })
        .andCombine((acc1, acc2) -> {
            if (acc1.isNull()) acc1.set(acc2.get());
        })
        .andExportFinish(MutableReference::get);


This still has the problem that combining loses the ordering. Combining happens when all the members send their result to a single member when the window is complete. You may solve this by keeping the event timestamp as well, then you can decide which one to keep.

If you upgrade to Jet 4.2, you can also use the rebalance() operator in front of the aggregation. It makes the pipeline aggregate each key on one node without the need for combining.
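For illustration, roughly like this with the Jet 4.2 API (where drawFrom/drainTo are named readFrom/writeTo; `entries` stands for the mapped stage from your pipeline, and the map name is a placeholder):

entries.rebalance(Entry::getKey)                   // repartition by key across the members up front
       .groupingKey(Entry::getKey)
       .window(WindowDefinition.tumbling(200))
       .aggregate(AggregateOperations.pickAny())   // the combining step is no longer needed
       .map(KeyedWindowResult::result)
       .writeTo(Sinks.remoteMap("sinkMap", clientConfig));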

Can Gencer

Aug 21, 2020, 5:24:51 AM
to Marko Topolnik, fatih.ar...@gmail.com, hazelcast-jet
You can tweak AggregateOperations.pickAny() into a "pickLatest" as Marko suggested, and if you do a grouping before the aggregation you can skip the flatMapping stage as well.
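For example, a "pickLatest" could be sketched like this, building on Marko's example above. This is not an existing AggregateOperations method; the timestamp-extracting function is an assumption, and ToLongFunctionEx is the serializable functional type from the Jet 4.x API (older versions have an equivalent):

static <T> AggregateOperation1<T, MutableReference<T>, T> pickLatest(ToLongFunctionEx<? super T> timestampFn) {
    return AggregateOperation
            .withCreate(MutableReference<T>::new)
            .<T>andAccumulate((acc, item) -> {
                // keep the newest item seen so far on this member
                if (acc.isNull() || timestampFn.applyAsLong(item) >= timestampFn.applyAsLong(acc.get())) {
                    acc.set(item);
                }
            })
            .andCombine((acc1, acc2) -> {
                // when partial results from two members meet, keep the one with the newer timestamp
                if (!acc2.isNull() && (acc1.isNull()
                        || timestampFn.applyAsLong(acc2.get()) > timestampFn.applyAsLong(acc1.get()))) {
                    acc1.set(acc2.get());
                }
            })
            .andExportFinish(MutableReference::get);
}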


fatih.ar...@gmail.com

Aug 21, 2020, 6:56:38 AM
to hazelcast-jet

.aggregate(AggregateOperation
        .withCreate(MutableReference<Entry<Long, HazelcastBytesHolder>>::new)
        .<Entry<Long, HazelcastBytesHolder>>andAccumulate((acc, item) -> acc.set(item))
        .andCombine((acc1, acc2) -> {
            if (acc1.isNull()) {
                acc1.set(acc2.get());
            }
        })
        .andExportFinish(MutableReference::get))
.map(WindowResult::result)
.drainTo(Sinks.remoteMap(properties.getSinkMap(), clientConfig));

In that case I still need to map the WindowResult to its result, right?

One more thing: to see the event timestamp I would need to deserialize the data, which might also increase latency. We want to keep the logic as light as possible because this is 100K messages per second; we just want to push the data through continuously.

Marko Topolnik

Aug 21, 2020, 7:47:38 AM
to hazelcast-jet

In that case I still need to map the WindowResult to its result, right?

You'll need the key as well; the result type of the keyed aggregation should be KeyedWindowResult.
 
One more thing: to see the event timestamp I would need to deserialize the data, which might also increase latency. We want to keep the logic as light as possible because this is 100K messages per second; we just want to push the data through continuously.

I think you won't actually be getting the same key on two nodes due to Kafka partitioning. If that is the case, you shouldn't need combine to begin with. You can verify that by implementing combine to throw an exception.
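For example, something like this in the aggregate operation (a throwaway diagnostic only, not something to keep in production):

.andCombine((acc1, acc2) -> {
    // if this ever fires, the same grouping key really did show up on more than one member
    throw new IllegalStateException("combine was invoked");
})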


fatih.ar...@gmail.com

Aug 26, 2020, 6:46:40 PM
to hazelcast-jet
Hi,
After making a few adjustments I see improvements, but it is still not as fast as expected.

pipeline.drawFrom(multipleKafkaTopics)
        .withIngestionTimestamps()
        .map(entry -> {
            long t1 = System.currentTimeMillis();
            return Util.entry(entry.key(), new ObjectHolder(entry.value(), t1)); // timestamp the entry
        })
        .groupingKey(Entry::getKey)
        .window(WindowDefinition.tumbling(200))
        .aggregate(AggregateOperations.pickAny())
        .map(windowed -> {
            Entry<Long, ObjectHolder> result = windowed.result();
            long timeSpent = System.currentTimeMillis() - result.getValue().getTime(); // t2 - t1, total time spent
            if (timeSpent > 1000) {
                LOGGER.warn("detected latency {}", timeSpent);
            }
            return result;
        })
        .drainTo(Sinks.remoteMap(properties.getSinkMap(), clientConfig));
 
---> I changed the throttling as you described, which should be the correct way, I guess. I group the entries by key, pick one entry per 200 ms interval, and write only that one to the map. Interestingly, latency starts at around 20 seconds and converges to about 1 second this way.
It should not be more than 200 ms, right? Am I missing something?

Marko Topolnik

Aug 27, 2020, 3:11:47 AM
to hazelcast-jet
I'll try to use the details you provided so far to construct a pipeline and test it in a cluster.

fatih.ar...@gmail.com

Aug 27, 2020, 3:33:51 AM
to hazelcast-jet
That will be great.

It is a very simple application: Jet reads key-value pairs from Kafka and, for each key, pushes only one key-value pair per 200 ms interval.

fatih.ar...@gmail.com

Aug 28, 2020, 7:27:02 AM
to hazelcast-jet
Hi Marko,

The remoteMap method lets you sink individual entries to a remote map. Do you think it is possible to aggregate the entries in the stream into a map and sink that whole map to the remote map? And if it is possible, would it ever give a performance improvement?

Marko Topolnik

Aug 28, 2020, 11:29:39 AM
to hazelcast-jet
The sink itself batches internally, and when you use .groupingKey(keyFn).aggregate(pickAny()) you get deduplication. Based on that, I don't expect an improvement from a non-keyed aggregation using toMap(); it would just force the aggregation to be done on a single node, with a single thread.

I have not yet managed to run the benchmark I mentioned yesterday; I must continue on Monday.

fatih.ar...@gmail.com

Aug 30, 2020, 8:29:14 AM
to hazelcast-jet
Hi Marko,
Thanks for the great tips. This is the final version of the pipeline:

pipeline.drawFrom(kafkaSourceFactory.<Long, byte[]>createKafkaSource(properties, topicsArray))
        .withIngestionTimestamps()
        .map(entry -> Util.entry(entry.key(),
                new TimedValue<>(new HazelcastBytesHolder(entry.value()), System.currentTimeMillis())))
        .groupingKey(Entry::getKey)
        .window(WindowDefinition.tumbling(200))
        .aggregate(ThrottleOperations.pickLatest())
        .map(windowed -> {
            long timeSpent = System.currentTimeMillis() - windowed.result().getValue().getTimeStamp();
            trackServiceMeanLatency(timeSpent);
            trackServiceMaxLatency(timeSpent);
            return Util.entry(windowed.result().getKey(), windowed.result().getValue().getValue());
        })
        .drainTo(Sinks.remoteMap(properties.getSinkMap(), clientConfig));

Basically it creates an entry by timestamping it, groups by key, windows and aggregates to eliminate duplicates, and drains to the remote map. Interestingly, throughput increased enormously after parallelizing the pipeline, but latency got worse. As you can see, I timestamp the entries at the beginning of the pipeline and measure again at the end; for some entries the time between arrival and sink reaches up to 60 seconds. It does not happen all the time but happens randomly and frequently. I use max.poll.records=50000, max.partition.fetch.bytes=10485760 and fetch.max.bytes=100428800. I see that the Kafka consumer fetch lag is nearly zero, so I guess Kafka itself does not need repartitioning. Do you have any guess why 60 seconds might be spent inside the pipeline?
Can asked whether the pipeline uses a different source stage per topic or one combined source stage. Do you think creating different source stages and merging them will improve performance?

Marko Topolnik

Aug 30, 2020, 2:31:55 PM
to fatih.ar...@gmail.com, hazelcast-jet
It does not happen all the time but happens randomly and frequently.

This sounds superficially consistent with GC pauses. Do you have GC logging?

fatih.ar...@gmail.com

Aug 31, 2020, 3:46:06 AM
to hazelcast-jet
Yes. I checked the GC metrics; it seems GC does not spend more than milliseconds, and the JVM parameter -XX:MaxGCPauseMillis=200 is also set.

The latencies are typically around 60 seconds at the peaks; I think 60 seconds is a bit too much to result from GC.

Marko Topolnik

Aug 31, 2020, 8:50:56 AM
to fatih.ar...@gmail.com, hazelcast-jet
Can you please join https://slack.hazelcast.com? This thread has gotten too long.


Can Gencer

Aug 31, 2020, 2:00:54 PM
to fatih.ar...@gmail.com, hazelcast-jet
I also wrote to you on Slack, but this is almost certainly due to the partition idle timeout, which is 60 seconds by default. If a topic-partition does not produce any messages for 60 seconds, Jet will by default keep waiting for it to produce something so that the global event time can be advanced.

You can change the partition idle timeout at the source level using the setPartitionIdleTimeout option.

The 200ms latency you see otherwise is because of the tumbling window size - if you want to reduce latency you can use a smaller window size.
