Triage performance problems: ConcurrentInboundEdgeStream.drainTo


Lukáš Herman

Sep 20, 2018, 2:58:36 AM9/20/18
to hazelcast-jet
Hello Jet Team,
I am currently trying to diagnose performance issues with Jet: we are getting roughly 6x lower throughput than expected. Most of the metrics look good so far (about 30k Hazelcast operations per second on each node), but CPU consumption averages 80-95%, with most of the time (64%) spent in the ConcurrentInboundEdgeStream.drainTo method, according to Flight Recorder. I have also noticed that the vertex invocation rate is quite high, in the 15-30k per second range, which does not correspond to the number of vertex invocations needed to complete a business transaction (30 on average).
Could you please suggest some places in the code where we could collect additional metrics to help us find the root cause? We are using the DAG/AbstractProcessor API almost exclusively.

With regards
Lukas Herman

Can Gencer

Sep 20, 2018, 7:50:36 AM9/20/18
to Lukáš Herman, hazelcast-jet
Hi Lukas,

Are you able to provide more details? For example, how many vertices are there, is it a single vertex that is being executed 15k-30k times, and are these cooperative or non-cooperative processors? Typically, Jet executes each processor repeatedly, but if a processor did no work in a given round, some exponential backoff is applied. Several cooperative processors run on the same thread, so there the backoff is applied only if none of them had any work to do.
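The idle-backoff policy Can describes could be sketched as follows. This is a simplified model, not Jet's actual implementation, and the parking intervals are made-up values, not Jet's real parameters:

```java
// Sketch of per-thread idle backoff: when no cooperative processor on a
// thread makes progress in a round, the thread parks for an exponentially
// growing interval; the interval resets as soon as any processor does work.
// MIN/MAX values below are illustrative, not Jet's actual configuration.
class IdleBackoff {
    static final long MIN_PARK_NANOS = 1_000;        // 1 microsecond
    static final long MAX_PARK_NANOS = 1_000_000;    // capped at 1 ms

    private long currentParkNanos = MIN_PARK_NANOS;

    // Called after each round; madeProgress = did ANY cooperative
    // processor on this thread do work in the round?
    // Returns how long the thread should park (0 = don't park).
    long onRoundDone(boolean madeProgress) {
        if (madeProgress) {
            currentParkNanos = MIN_PARK_NANOS;       // reset the backoff
            return 0;
        }
        long park = currentParkNanos;
        currentParkNanos = Math.min(currentParkNanos * 2, MAX_PARK_NANOS);
        return park;
    }
}
```

The point of resetting on any progress is that a thread hosting several cooperative processors stays responsive as long as at least one of them is busy.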

Are you using 0.6.1 or 0.7? 

Lukáš Herman

Sep 20, 2018, 9:34:57 AM9/20/18
to hazelcast-jet
Hi Can,
We are using 0.6.1, with about 160 vertices, mostly cooperative ones. We have reviewed the collected metrics: the job generates 30k vertex executions across the cluster, about 10k per node.
I have set up additional metrics, especially in places where Processor.tryProcess returns false. That helped us detect a slow vertex implementation.
What exactly happens when tryProcess returns false repeatedly? Does the execution engine try to do work elsewhere while some part of the flow is overloaded?
Is this case covered by the 0.7 metrics?

Regards
Lukas

On Thursday, September 20, 2018, at 13:50:36 UTC+2, Can Gencer wrote:

Can Gencer

Sep 21, 2018, 11:58:27 AM9/21/18
to Lukáš Herman, hazelcast-jet
Hi Lukas,

How many instances of each vertex are running, and how many cores/threads do you have on each node? I'm trying to work out how many processor instances you would have per actual core.

When you return false from tryProcess(), it means that you haven't finished processing the item. Typically this is caused by an output queue being full (i.e. tryEmit() returning false). Returning false applies backpressure: the engine will call you again later with the same item. When you return true, it advances to the next item. This is how the system slows down upstream vertices when an output queue is full.
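The contract Can describes could be modeled like this. The classes below are simplified stand-ins, not the real Jet Processor/Outbox API:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy stand-in for Jet's Outbox: tryEmit() rejects items when the
// bounded output queue is full.
class BoundedOutbox {
    private final Queue<Object> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedOutbox(int capacity) { this.capacity = capacity; }

    boolean tryEmit(Object item) {
        if (queue.size() >= capacity) {
            return false;              // full: the caller must back off
        }
        return queue.add(item);
    }

    int size() { return queue.size(); }
}

// Toy stand-in for a processor honoring the tryProcess() contract:
// true = item consumed, the engine advances to the next item;
// false = call me again later with the SAME item (backpressure).
class ThrottledProcessor {
    private final BoundedOutbox outbox;

    ThrottledProcessor(BoundedOutbox outbox) { this.outbox = outbox; }

    boolean tryProcess(Object item) {
        // Propagate the outbox's refusal upward instead of blocking.
        return outbox.tryEmit(item);
    }
}
```

The key point is that a cooperative processor never waits for the queue to drain; it returns false immediately and lets the engine schedule other work.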




Lukáš Herman

Sep 23, 2018, 1:33:45 PM9/23/18
to hazelcast-jet
Hi Can,
after many hours of analysis and experiments we have found the root cause: the cooperative threads were being blocked because some processors did not exactly follow the Processor state model. This is an especially tricky area to master; every millisecond wasted there has a devastating impact on processing throughput. Since most of our code is blocking, we need a reliable and efficient way to pass data between cooperative and blocking code. Our team has come up with an acceptable solution based on MPSC queues, but it is obviously not perfect: from time to time, offering or taking items simply takes too long on the cooperative thread. We will focus on this area in the following weeks.
Regards
Lukas 

On Friday, September 21, 2018, at 17:58:27 UTC+2, Can Gencer wrote:

c...@hazelcast.com

Sep 25, 2018, 11:35:58 AM9/25/18
to hazelcast-jet
Is the communication between your cooperative and blocking code bidirectional? Jet uses SPSC queues between two vertices, but there the flow is obviously unidirectional.

Also, if you enable trace logging for "TaskletExecutionService", it will warn about calls to cooperative processors that took more than 5 ms. It sounds like a good idea to make this a more explicit feature, as it's useful for debugging.
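When Jet runs on JDK logging (Hazelcast's default backend), the trace level mentioned above could be enabled programmatically along these lines. The exact logger category is an assumption here; check the package of TaskletExecutionService in your Jet version, and use the equivalent log4j/logback configuration if that is your logging backend:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: raise the JDK-logging level for the tasklet execution service
// so that slow cooperative-processor calls get logged. The category name
// below is an assumption; verify it against your Jet version.
public class EnableTaskletTrace {
    public static void main(String[] args) {
        Logger logger = Logger.getLogger(
                "com.hazelcast.jet.impl.execution.TaskletExecutionService");
        // FINEST is the JUL level that corresponds to "trace".
        logger.setLevel(Level.FINEST);
    }
}
```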

Lukáš Herman

Sep 26, 2018, 4:26:22 AM9/26/18
to hazelcast-jet
Hi Can,
currently we are experimenting with several code variants. The communication from cooperative to blocking code is naturally one-to-one, but from blocking back to cooperative it is many-to-one in our case.
The code basically goes process -> map.getAsync/map.submitToKey -> (the async callback emits multiple items to an MPSC queue). The cooperative complete and tryProcess methods then emit items from the MPSC queue to the outbox.
We closely monitor the TaskletExecutionService warnings, and we have implemented our own warnings inside processors to better identify the slow parts. This basically works as expected, but sometimes TaskletExecutionService logs slow-call warnings even when the system is idle, which is strange. Could this be caused by the garbage collector?
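The bridging pattern described above could be sketched as follows. This is a simplified illustration, not the actual code from the thread: a CompletableFuture stands in for map.getAsync, and java.util.concurrent.ConcurrentLinkedQueue stands in for a real MPSC queue (in production something like JCTools' MpscArrayQueue would be a better fit); the method names are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: async callbacks (possibly on several blocking/IO threads) offer
// results into a concurrent queue; the cooperative side drains the queue
// toward the outbox in small, bounded, non-blocking steps.
class AsyncBridge {
    private final ConcurrentLinkedQueue<String> results =
            new ConcurrentLinkedQueue<>();

    // Cooperative side: kick off the async op and return immediately.
    // `asyncLookup` stands in for map.getAsync(key).
    void process(CompletableFuture<String> asyncLookup) {
        // Runs on whatever thread completes the future, hence the
        // need for a concurrent (MPSC-style) queue.
        asyncLookup.thenAccept(results::add);
    }

    // Cooperative side, called from tryProcess()/complete(): move at
    // most `limit` items toward the outbox, never blocking.
    int drainTo(List<String> outbox, int limit) {
        int drained = 0;
        String item;
        while (drained < limit && (item = results.poll()) != null) {
            outbox.add(item);
            drained++;
        }
        return drained;
    }
}
```

Bounding each drain step is what keeps the cooperative thread's time slices short, which is exactly the property the TaskletExecutionService warnings monitor.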

Regards
Lukas


On Tuesday, September 25, 2018, at 17:35:58 UTC+2, c...@hazelcast.com wrote:

Can Gencer

Sep 27, 2018, 7:35:40 AM9/27/18
to Lukáš Herman, hazelcast-jet
Hi Lukas,

We have a similar design when writing snapshots, which is also done asynchronously. The difference, of course, is that we don't collect the response and send it downstream. You could have a look at this class https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/util/AsyncSnapshotWriterImpl.java for some inspiration. It applies backpressure by counting the number of pending operations, and there is a flush method that ensures all pending operations are finished.
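The pending-operation-counting idea could be sketched like this. This is a minimal illustration of the backpressure technique, not the actual AsyncSnapshotWriterImpl code, and the class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: cap the number of in-flight async operations with a counter.
// New work is refused (backpressure) while the cap is reached, and a
// non-blocking flush check reports when everything has completed.
class PendingOpLimiter {
    private final int maxPending;
    private final AtomicInteger pending = new AtomicInteger();

    PendingOpLimiter(int maxPending) { this.maxPending = maxPending; }

    // Returns false when too many ops are in flight; the caller should
    // return false from tryProcess() and retry later.
    boolean trySubmit(CompletableFuture<?> op) {
        if (pending.get() >= maxPending) {
            return false;
        }
        pending.incrementAndGet();
        // Decrement on completion, whether it succeeded or failed.
        op.whenComplete((result, error) -> pending.decrementAndGet());
        return true;
    }

    // Non-blocking "flush": true once all submitted ops have completed.
    boolean isFlushed() {
        return pending.get() == 0;
    }
}
```

In a cooperative processor, complete() would keep returning false until isFlushed() reports true, so the job only finishes once all async work has landed.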

We have some plans to offer more general support for contextful async operations (i.e. an async version of mapWithContext), but we're not sure when we'll get to it.
