InterruptedException occurs with executeBlocking from Vert.x v4.3.0 onwards


hemantk...@gmail.com

Mar 29, 2024, 6:53:09 AM
to vert.x
Team,

While upgrading Vert.x from 4.2.7 to 4.3.0, we are seeing an InterruptedException thrown wherever we use Vert.x executeBlocking together with the JDK ExecutorCompletionService. The issue appears from Vert.x 4.3.0 onwards. Were there any changes to executeBlocking in 4.3.0?

We call method B from method A. Method A runs inside a Vert.x executeBlocking block, and method B creates its own threads through a JDK ExecutorCompletionService.

Below is the stack trace for reference:

2024-03-06 15:25:08,942 [DS_ASSET_DISCOVERY_HANDLER-0] ERROR PortScanWrapper:144 - Exception occured in PortScanWrapper
java.lang.InterruptedException: null
    at java.util.concurrent.locks.ReentrantLock$Sync.lockInterruptibly(ReentrantLock.java:159) ~[?:?]
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:372) ~[?:?]
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:432) ~[?:?]
    at java.util.concurrent.ExecutorCompletionService.take(ExecutorCompletionService.java:200) ~[?:?]
    at com.sup.deviceidentification.PortScanWrapper.startScan(PortScanWrapper.java:134) ~[identification-5.0.23-SNAPSHOT.jar:?]
    at com.sup.core.handlers.DeviceIdentificationHandlerImpl.portScan(DeviceIdentificationHandlerImpl.java:221) ~[core-5.0.23-SNAPSHOT.jar:?]
    at com.sup.core.handlers.DeviceIdentificationHandlerImpl.identify(DeviceIdentificationHandlerImpl.java:415) ~[core-5.0.23-SNAPSHOT.jar:?]
    at com.sae.commons.utility.VertxTest.identifyDevice(VertxTest.java:125) ~[main/:?]
    at com.sae.commons.utility.VertxTest.callCollector(VertxTest.java:67) ~[main/:?]
    at com.sae.commons.utility.VertxTest.lambda$0(VertxTest.java:39) ~[main/:?]
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159) ~[vertx-core-4.3.0.jar:4.3.0]
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100) ~[vertx-core-4.3.0.jar:4.3.0]
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157) ~[vertx-core-4.3.0.jar:4.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.106.Final.jar:4.1.106.Final]
    at java.lang.Thread.run(Thread.java:840) [?:?]
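
For context, the top frames (LinkedBlockingQueue.take going through lockInterruptibly) are what the JDK produces whenever the thread that calls ExecutorCompletionService.take() has been interrupted, either before the call or while it waits. The JDK-only sketch below shows just that behaviour, without Vert.x. The class and method names are made up for illustration, and the sketch does not say what interrupts the worker thread in 4.3.0.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the application in this thread.
class TakeInterruptDemo {

    // Returns true if take() threw InterruptedException in the calling thread.
    static boolean takeThrowsWhenCallerInterrupted() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CompletionService<Integer> compService = new ExecutorCompletionService<>(pool);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        // A slow task, so take() has no completed result to hand back yet.
        compService.submit(() -> { Thread.sleep(5_000); return 1; });

        // This thread plays the role of the worker thread that calls take().
        Thread caller = new Thread(() -> {
            try {
                compService.take();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        }, "caller");
        caller.start();

        // Interrupt the caller; take() throws whether the flag is set
        // before it is entered or while it is blocked.
        caller.interrupt();
        try {
            caller.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // also cancels the sleeping task
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("take() interrupted: " + takeThrowsWhenCallerInterrupted());
    }
}
```

So the exception itself points at something interrupting the DS_ASSET_DISCOVERY_HANDLER-0 thread rather than at the completion service.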

The same code works fine up to Vert.x 4.2.7. Any version after 4.2.7 throws the InterruptedException.
Below are the sample code snippets:

WorkerExecutor addAssetExecutor = vertx.createSharedWorkerExecutor("DS_ASSET_DISCOVERY_HANDLER",
        20, 30, TimeUnit.MINUTES);

addAssetExecutor.executeBlocking(
        inventoryaddAssetPromise -> callCollector(inventoryaddAssetPromise), false,
        addAssetResult -> {
            if (addAssetResult.succeeded()) {
                log.debug("Execution Completed For Add Asset : {} ", addAssetResult.result());
            } else {
                log.error("Execution Failed For Add Asset : ", addAssetResult.cause());
            }
        });

PortScanWrapper code

// Create a pool with one thread per port in the list
ExecutorService executor = Executors.newFixedThreadPool(portList.size(),
        new ThreadFactoryBuilder().setNameFormat("PortScanThread-%d").build());
CompletionService<Integer> compService = new ExecutorCompletionService<Integer>(executor);
ArrayList<Integer> resultPortList = new ArrayList<Integer>();

// Submit one scan task per port
for (Integer port : portList) {
    PortScan scannerThread = new PortScan(device.getIpAddress(), port);
    compService.submit(scannerThread);
}

// Collect the results in completion order
for (int i = 0; i < portList.size(); i++) {
    try {
        Future<Integer> future = compService.take();   // the InterruptedException is thrown here
        Integer portNumber = future.get();
        if (portNumber > 0) {
            logger.debug("PortScan is done for port:" + portNumber + " for device:" + device.getIpAddress());
            resultPortList.add(portNumber);
        }
    } catch (ExecutionException | InterruptedException e) {
        logger.error("Exception occured in PortScanWrapper", e);
        throw e;
    }
}
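
Separately from the version question, the loop above never shuts the pool down and does not restore the interrupt flag when it is interrupted. A minimal sketch of the same submit/take pattern with that cleanup added might look like the following. It assumes a stand-in fakeScan task in place of PortScan, and it wraps the checked exceptions in IllegalStateException for brevity, whereas the original rethrows them; all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not the real PortScanWrapper.
class PortScanSketch {

    // Stand-in for PortScan: reports even port numbers as "open", others as -1.
    static Callable<Integer> fakeScan(int port) {
        return () -> (port % 2 == 0) ? port : -1;
    }

    static List<Integer> scan(List<Integer> portList) {
        // newFixedThreadPool rejects 0 threads, so guard against an empty list.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, portList.size()));
        CompletionService<Integer> compService = new ExecutorCompletionService<>(executor);
        List<Integer> openPorts = new ArrayList<>();
        try {
            for (Integer port : portList) {
                compService.submit(fakeScan(port));
            }
            // Results arrive in completion order, not submission order.
            for (int i = 0; i < portList.size(); i++) {
                Integer portNumber = compService.take().get();
                if (portNumber > 0) {
                    openPorts.add(portNumber);
                }
            }
            return openPorts;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still see the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("scan interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("scan task failed", e.getCause());
        } finally {
            // Always release the scan threads, including on the failure paths.
            executor.shutdownNow();
        }
    }
}
```

This does not explain why the worker thread is interrupted on 4.3.0; it only keeps PortScanThread-%d threads from piling up when that happens.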

hemantk...@gmail.com

Mar 29, 2024, 7:24:06 AM
to vert.x
The issue does not occur when the same code runs without executeBlocking. However, we need to run it on a worker thread because it is a long-running task.