Distributed Executor service blocks when submitting large number of async tasks


DaveR

Apr 26, 2018, 1:16:52 PM
to Hazelcast
Hi,

We have noticed that if we attempt to submit a large number of async tasks to a Hazelcast Distributed Executor, either using the Java client or as a full Java member of the cluster, the submissions block.

Here is some example code:
@Autowired
private ExecutorService executorService;

@Override
public void run() {
    logger.info("AsyncJob started");

    List<Future<String>> results = new ArrayList<>();
    long submissionStartTime = System.currentTimeMillis();
    for (int i = 0; i < 5000; i++) {
        Future<String> result = executorService.submit(new JobTask(i));
        results.add(result);
        if (i % 50 == 0) {
            logger.info("{} tasks submitted", i);
        }
    }
    long submissionEndTime = System.currentTimeMillis();

    int doneCount = 0;
    long workStartTime = System.currentTimeMillis();
    while (doneCount < results.size()) {
        List<Future<String>> complete = results.stream()
                .filter(Future::isDone)
                .collect(Collectors.toList());
        doneCount = complete.size();
        logger.info("{} done, performing useful work", doneCount);
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            logger.warn("Sleep interrupted");
        }
    }
    long workEndTime = System.currentTimeMillis();

    logger.info("All tasks complete");

    logger.info("Submission took {} ms", (submissionEndTime - submissionStartTime));
    logger.info("Time for useful work {} ms", (workEndTime - workStartTime));
}

The JobTask in this case is:

public class JobTask implements Callable<String>, Serializable {
    private static final long serialVersionUID = 8945413768030936835L;

    private static final Logger logger = LoggerFactory.getLogger(JobTask.class);

    private int requestId;

    public JobTask(int requestId) {
        this.requestId = requestId;
    }

    @Override
    public String call() throws Exception {
        logger.debug("Invoking task for {}", requestId);
        TimeUnit.MILLISECONDS.sleep(250);
        logger.debug("Task complete for {}", requestId);
        return requestId + " complete";
    }
}

Using a local task executor (no Hazelcast at all) I get:

2018-04-26 17:09:34.885  INFO 14420 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Submission took 14 ms
2018-04-26 17:09:34.886  INFO 14420 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Time for useful work 64043 ms

Using a full cluster member I get:

2018-04-26 17:12:32.566  INFO 16916 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Submission took 70463 ms
2018-04-26 17:12:32.566  INFO 16916 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Time for useful work 2009 ms

Using a Hazelcast client to a remote cluster I get:

2018-04-26 17:15:40.649  INFO 7336 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Submission took 68252 ms
2018-04-26 17:15:40.649  INFO 7336 --- [       Thread-5] c.e.async.AsyncControllerComponent       : Time for useful work 2011 ms

In each case, I have configured the executor as follows, with a pool size of 25 threads and a queue capacity of 10,000 (larger than the 5,000 submitted tasks):

// Configure the executor pool
config.getExecutorConfig(hazelcastExecutorName)
        .setPoolSize(hazelcastExecutorPoolSize)
        .setQueueCapacity(hazelcastExecutorQueueCapacity)
        .setStatisticsEnabled(true);

Looking at the cluster member case, a thread dump during task submission shows:

"Thread-5" #79 daemon prio=5 os_prio=0 tid=0x0000000030dda800 nid=0x4f78 waiting on condition [0x0000000037c8f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:153)
        at com.hazelcast.executor.impl.ExecutorServiceProxy.submitToPartitionOwner(ExecutorServiceProxy.java:245)
        at com.hazelcast.executor.impl.ExecutorServiceProxy.submit(ExecutorServiceProxy.java:227)
        at com.example.async.AsyncControllerComponent.run(AsyncControllerComponent.java:34)
        at java.lang.Thread.run(Thread.java:748)

Tracing into ExecutorServiceProxy, the blocking appears to come from this check in the Hazelcast source:
    /**
     * This is a hack to prevent overloading the system with unprocessed tasks. Once backpressure is added, this can
     * be removed.
     */
    private boolean checkSync() {
        boolean sync = false;
        long last = lastSubmitTime;
        long now = Clock.currentTimeMillis();
        if (last + SYNC_DELAY_MS < now) {
            CONSECUTIVE_SUBMITS.set(this, 0);
        } else if (CONSECUTIVE_SUBMITS.incrementAndGet(this) % SYNC_FREQUENCY == 0) {
            sync = true;
        }
        lastSubmitTime = now;
        return sync;
    }

which makes every 100th (SYNC_FREQUENCY) task submission synchronous, causing the blocking behaviour. Similar code exists in ClientExecutorServiceProxy.java (MAX_CONSECUTIVE_SUBMITS).
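To make the effect concrete, here is a standalone sketch re-creating the throttle's counting logic (the class name and the SYNC_DELAY_MS value are assumptions for illustration; SYNC_FREQUENCY mirrors the 100 discussed here). In a tight 5,000-submission loop with no idle gaps, the synchronous path fires 50 times, and each of those 50 submissions blocks on a round trip:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical standalone re-creation of the checkSync() throttle quoted above,
 * to show why a tight submission loop ends up largely synchronous.
 * SYNC_FREQUENCY mirrors the value of 100 referenced in this thread;
 * SYNC_DELAY_MS is an assumed value for illustration.
 */
public class SyncThrottleDemo {
    static final int SYNC_FREQUENCY = 100;
    static final long SYNC_DELAY_MS = 250;

    private final AtomicLong consecutiveSubmits = new AtomicLong();
    private volatile long lastSubmitTime;

    /** Returns true when this submission would be forced to run synchronously. */
    boolean checkSync(long now) {
        boolean sync = false;
        long last = lastSubmitTime;
        if (last + SYNC_DELAY_MS < now) {
            consecutiveSubmits.set(0);                 // idle gap: reset the counter
        } else if (consecutiveSubmits.incrementAndGet() % SYNC_FREQUENCY == 0) {
            sync = true;                               // every 100th back-to-back submit blocks
        }
        lastSubmitTime = now;
        return sync;
    }

    public static void main(String[] args) {
        SyncThrottleDemo demo = new SyncThrottleDemo();
        int forcedSync = 0;
        for (int i = 0; i < 5000; i++) {
            if (demo.checkSync(0L)) {                  // tight loop: no idle gaps between submits
                forcedSync++;
            }
        }
        // prints "50 of 5000 submissions forced synchronous"
        System.out.println(forcedSync + " of 5000 submissions forced synchronous");
    }
}
```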

Is this behaviour by design? If so, could it be made configurable so that if you have large enough queue the limit could be raised or disabled?

Ideally we wanted to have code where we could transparently switch between local and distributed execution.
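As a side note on that goal: the transparent switching works at the type level because Hazelcast's IExecutorService extends the standard java.util.concurrent.ExecutorService. A minimal sketch of the pattern (the chooser class, flag, and pool size are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Sketch of the switching pattern: callers depend only on the standard
// ExecutorService interface, so the supplier can be swapped for one returning
// hazelcastInstance.getExecutorService(name) (IExecutorService extends
// ExecutorService) without touching the submitting code.
public class ExecutorChooser {
    static ExecutorService choose(boolean distributed,
                                  Supplier<ExecutorService> distributedSupplier) {
        return distributed
                ? distributedSupplier.get()            // e.g. Hazelcast's IExecutorService
                : Executors.newFixedThreadPool(25);    // plain local pool
    }
}
```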

I have a fully executable test case if it would help, but I think the code makes it clear where and how this is happening; my question is more about whether this behaviour is correct and intentional.

Hope this all makes sense, many thanks in advance

Dave


Sertuğ Kaya

May 18, 2018, 12:20:57 PM
to Hazelcast
Can you specify which version of Hazelcast this is? And is the cluster in a genuinely distributed environment? Also, since it's related, have you looked at the back-pressure settings?
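For reference, the back-pressure settings mentioned here are configured through system properties on the member Config; a sketch (property names as documented for Hazelcast 3.x; the values are purely illustrative, not recommendations):

```java
// Hazelcast 3.x back-pressure system properties (values illustrative only).
// Note these govern async invocations in general, not the executor proxy's
// hardcoded SYNC_FREQUENCY throttle discussed in this thread.
Config config = new Config();
config.setProperty("hazelcast.backpressure.enabled", "true");
config.setProperty("hazelcast.backpressure.syncwindow", "100");
config.setProperty("hazelcast.backpressure.backoff.timeout.millis", "60000");
config.setProperty("hazelcast.backpressure.max.concurrent.invocations.per.partition", "100");
```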

DaveR

May 18, 2018, 12:47:50 PM
to Hazelcast
Hi Sertuğ

Thanks for looking at this. This is using Hazelcast 3.9.3. The cluster is distributed: the remote server is spun up in a separate JVM from the client.
I've looked through the back-pressure settings, but I can't see how they would help, as the Hazelcast client code forces the operation to be synchronous every 100 requests.

I have put the whole test case here: https://github.com/driseley/async-blocking-client , if you want to have a look.

To spin up the Cluster run: com.example.async.AsyncWorkerApplication
To spin up the Client run: com.example.async.AsyncControllerApplication -Dspring.profiles.active=client

Then to trigger the test hit:  http://localhost:8080/job/start

Kind Regards
Dave

Peter Veentjer

May 19, 2018, 10:20:11 AM
to haze...@googlegroups.com
The sync is done to prevent overload of the executor, if I remember correctly.

--
You received this message because you are subscribed to the Google Groups "Hazelcast" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast+unsubscribe@googlegroups.com.
To post to this group, send email to haze...@googlegroups.com.
Visit this group at https://groups.google.com/group/hazelcast.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast/1de8206a-33f4-4c70-aebe-73d6cc1841c2%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

DaveR

May 21, 2018, 4:37:14 AM
to Hazelcast
Thanks Peter. In our scenario, we have configured a queue that is larger than the number of tasks we plan to submit, so there should be no overload.

Could the overload protection be made configurable or tuneable? Having a hardcoded value of 100 tasks basically makes the submission synchronous.

Kind Regards
Dave



Ozan Kılıç

May 21, 2018, 7:07:06 AM
to haze...@googlegroups.com
Hi Dave, 

I think it would be best if you created a GitHub issue for this request. 

DaveR

May 21, 2018, 7:39:59 AM
to Hazelcast
Hi Ozan

Sure, no problem. I was raising it here first to check I wasn't doing anything silly(!).

I'll update this thread with the issue once I've raised it (should be tomorrow).

Kind Regards
Dave

DaveR

May 22, 2018, 7:58:41 AM
to Hazelcast
Hi Ozan

Kind Regards
Dave

