Memory Leak in io.vertx.core.impl.EventLoopContext in Java Application


Abhishek Pandey

May 20, 2018, 3:11:35 PM
to vert.x
Hi,

We are using vert.x in one of our Java applications. For the past few days we have been seeing high memory (heap) usage, which sometimes makes the application unresponsive (due to heavy GC CPU usage). To investigate what is eating the heap, I used the Eclipse MAT tool to look for memory leaks. Attached is a screenshot of the leak. I'm not sure what is actually going wrong in the code, so I need expert advice here.

pom.xml :

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.3.3</version>
        </dependency>

Request handler code using io.vertx:

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor; 

public class Processor {

@POST
@Path("/update_record")
@Produces({ MediaType.APPLICATION_JSON })
public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception{
    logger.info("update called");
    WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event");
    executor.executeBlocking(future -> {
        MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent");
        try {
            processor.processMessage(data);
            future.complete();
        } catch (Exception e) {
            logger.error("consuming event failed: " + e.getMessage());
            future.fail(e);
        }
    }, res -> {
        Response response  = null;
        if(res.succeeded()) {
            response = Response.status(Response.Status.OK).build();
        } else {
            response = Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
        asyncResponse.resume(response);
    });
    executor.close();
  }
}

It's all new to me, so bear with me. Can you tell me what is wrong in the code here, or whether there is a known issue with this vert.x version, and how we can fix this memory issue?


Thanks
Screen Shot 2018-05-16 at 7.50.30 PM.png

Dan O'Reilly

May 20, 2018, 4:01:08 PM
to ve...@googlegroups.com
Looks like it might be https://github.com/eclipse/vert.x/issues/2464, which is caused by a leak in createSharedWorkerExecutor. I think the fix is going to be included in 3.5.2.
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/baa08160-ee74-4ab4-9eaf-7177d22fd99a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Abhishek Pandey

May 21, 2018, 1:00:59 AM
to vert.x
Thanks for the reply, Dan.

So when is the official release of version 3.5.2? The official vert.x website currently shows 3.5.1 as the latest version: https://vertx.io/download/

Julien Viet

May 21, 2018, 3:53:16 AM
to ve...@googlegroups.com
Hi,

3.5.2.CR2 was released yesterday, and it includes this fix.

It is the candidate release for 3.5.2.

Julien

Abhishek Pandey

May 21, 2018, 4:14:04 AM
to vert.x
Thanks a lot Julien,

I've updated my pom file and now have the fixed code locally:

<vertx.core.version>3.5.2.CR2</vertx.core.version>

I will update our production code in a day or two after some sanity tests, and will let you know how the heap looks after deploying the fix.

Thanks for your help.

Julien Viet

May 21, 2018, 4:24:12 AM
to ve...@googlegroups.com
Let us know.

Actually, I'm surprised this issue was not found before.

Julien


Abhishek Pandey

May 22, 2018, 4:37:09 AM
to vert.x

Hi Julien,

After updating the version from 3.3.3 to 3.5.2.CR2 the code worked fine for a while, but later I started seeing the error below:

 May 22, 2018 12:48:08 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 2078 ms, time limit is 1410
May 22, 2018 12:48:09 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 3079 ms, time limit is 1410
May 22, 2018 12:48:14 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 2025 ms, time limit is 1410
Exception in thread "accounting_balance_update-0" java.util.concurrent.RejectedExecutionException: Task io.vertx.core.impl.TaskQueue$$Lambda$10/601749733@18cfc0b5 rejected from java.util.concurrent.ThreadPoolExecutor@53d8fd62[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:70)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

When I checked the logs properly, I realised that the internal executeBlocking code has changed:

Earlier version :

  <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
      Handler<AsyncResult<T>> resultHandler,
      Executor exec, PoolMetrics metrics)

3.5.2.CR2 version :

  <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
      Handler<AsyncResult<T>> resultHandler,
      Executor exec, TaskQueue queue, PoolMetrics metrics) 

My requirement is to run blocking code on the vert.x thread pool executor, as shown in my first post. I'm now setting the pool size and max execution time when creating the thread pool, like this:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("accounting_balance_update", POOL_SIZE, MAX_EXECUTION_TIME);

Still, when running this code in production, vert.x suddenly stops processing, and every HTTP request that internally uses the vert.x blocking code starts getting a 504.

This issue is reproducible when two parallel requests are made. Please guide me here.
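A side note on the MAX_EXECUTION_TIME value: createSharedWorkerExecutor's maxExecuteTime is specified in nanoseconds in Vert.x 3.x, and if it is written as a plain int expression such as 100*1000000000 (as in the reproducer later in this thread), the multiplication overflows in 32-bit arithmetic before being widened to long, producing a much smaller limit than intended. A minimal plain-Java illustration:

```java
public class MaxExecuteTimeOverflow {
    public static void main(String[] args) {
        // Evaluated in 32-bit int arithmetic first, then widened: the product wraps around.
        long wrong = 100 * 1000000000;
        // Using long literals keeps the whole computation in 64-bit arithmetic.
        long right = 100L * 1000000000L;
        System.out.println(wrong);  // 1215752192: about 1.2 s if interpreted as nanoseconds
        System.out.println(right);  // 100000000000: 100 s in nanoseconds
    }
}
```

Writing the constant as `100L * 1_000_000_000L` avoids the surprise.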

Thanks

Julien Viet

May 22, 2018, 5:29:53 AM
to ve...@googlegroups.com
Hi,

Can you provide a small reproducer?




Abhishek Pandey

May 22, 2018, 6:08:15 AM
to vert.x
Yes Julien,

It's a Java application, so you can try it with the code below:
 
pom.xml :

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.5.2.CR2</version>
        </dependency>

Request handler code using io.vertx:

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;  
@Path("/v1")
public class Processor {

@POST
@Path("/update_record")
@Produces({ MediaType.APPLICATION_JSON })
public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception{
    logger.info("update called");
    WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);

    executor.executeBlocking(future -> {
        MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent");
        try {
            processor.processMessage(data);
            future.complete();
        } catch (Exception e) {
            logger.error("consuming event failed: " + e.getMessage());
            future.fail(e);
        }
    }, res -> {
        Response response  = null;
        if(res.succeeded()) {
            response = Response.status(Response.Status.OK).build();
        } else {
            response = Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
        asyncResponse.resume(response);
    });
    executor.close();
  }
}

Server code from which we run this:
 
import mypackage.Processor;
import io.vertx.core.AbstractVerticle;
import org.jboss.resteasy.plugins.server.vertx.VertxRequestHandler;
import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;
import org.springframework.context.ApplicationContext;

public class VertxServer extends AbstractVerticle {
    VertxServer(final ApplicationContext context) {

    }

    @Override
    public void start() throws Exception {
        VertxResteasyDeployment deployment = new VertxResteasyDeployment();
        deployment.start();
        deployment.getRegistry().addPerInstanceResource(Processor.class);
        vertx.createHttpServer()
                .requestHandler(new VertxRequestHandler(vertx, deployment))
                .listen(8080);
    }
}

Once the server is up and running, just hit the controller above with two simultaneous requests, e.g.:

curl -X POST \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -H 'Postman-Token: c9494189-4ac9-9f6c-44f6-216186c74431' \
  -d '{"id":"123"}' \
  http://localhost:8080/v1/update_record
 
 


Thanks

Julien Viet

May 22, 2018, 6:48:42 AM
to ve...@googlegroups.com
In your case, you are facing a race condition caused by getting and then closing the shared worker executor in this sequence:

WorkerExecutor exec = vertx.createSharedWorkerExecutor("update_record_event");

// If this is the first task: it is executed directly (so the task thread will certainly be interrupted)
// If this is the second task: it is queued because the first task is still executing, then rejected before execution because the executor has been closed below
exec.executeBlocking(a, b);

// Close the executor; it may not run the blocking task submitted above, because the task may still be queued if it was not the first one
exec.close();


So it's not really a vert.x issue, but rather an issue in your code.

You should create a single shared worker executor in your controller and close it only when the controller is destroyed.

Julien


Abhishek Pandey

May 22, 2018, 7:11:36 AM
to vert.x
Alright, maybe I misunderstood some part.

Just a couple of questions here:

  1. Can you tell me how to check whether the executor is busy processing a request, so that I close it only when it is idle? Or should I simply remove the executor.close() line?
  2. Why didn't this line solve the race issue, given that we now have 10 threads in the same pool?
     WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);

These questions might be silly,

Thanks

Julien Viet

May 22, 2018, 7:22:07 AM
to ve...@googlegroups.com

On 22 May 2018, at 13:11, Abhishek Pandey <apand...@gmail.com> wrote:

Alright, maybe I misunderstood some part.

Just a couple of questions here:

  1. Can you tell me how to check whether the executor is busy processing a request, so that I close it only when it is idle? Or should I simply remove the executor.close() line?
What do you want to achieve?

If you want to close this executor, then you should close it in the executeBlocking asynchronous result handler.

But that will create contention in your application.
  2. Why didn't this line solve the race issue, given that we now have 10 threads in the same pool?
     WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);

Since you are executing this on the event loop, you are creating two different executors (each with 10 threads), because you sequentially close them right after creation.

When the first request is processed, the blocking task is scheduled on the first executor, which is then closed; but the task is already running in the executor, so the task is actually interrupted.

You can try adding this inside the task:

try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }

and you will see that it prints an InterruptedException.
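Julien's interruption point can also be reproduced without Vert.x: shutting down a plain executor while a task is sleeping interrupts it. A minimal sketch (class and message names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InterruptDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("finished");
            } catch (InterruptedException e) {
                // The sleeping task is interrupted by the shutdown below,
                // analogous to closing the worker executor mid-task.
                System.out.println("interrupted");
            }
        });
        Thread.sleep(100);   // give the task time to start sleeping
        pool.shutdownNow();  // interrupts the running task
    }
}
```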

When the second request is processed, the blocking task is not scheduled directly on the second executor; instead it is added to the task queue (because by default executeBlocking uses ordered, sequential execution). By the time it runs it is rejected, because the second executor has already been closed before the task could be scheduled on it.
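The rejection itself can likewise be reproduced with a plain java.util.concurrent pool: a task submitted to a pool that has already been shut down fails with the same RejectedExecutionException seen in the stack trace above. A minimal sketch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectionDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdownNow();  // analogous to closing the shared worker executor
        try {
            // A task arriving after shutdown is rejected by the pool,
            // just like the queued blocking task in the stack trace above.
            pool.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected");
        }
    }
}
```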

I hope it makes sense.





Abhishek Pandey

May 22, 2018, 8:02:18 AM
to vert.x
Yes Julien,

I completely understood your second point.

But my concern was the executor.close() line: if it is never called, wouldn't that result in a memory leak (as per your fix in https://github.com/eclipse/vert.x/commit/37f281bee87f212e80dd5b5520cef0ff7bad56fb, where the executor.close() path is the only place the reference is removed from closeHooks)?

So the question is: where should we put this executor.close() call in the case of parallel requests? If it is placed outside executeBlocking() (as I'm currently doing), it results in the race condition (as per your second point, with parallel requests).

That's the reason I asked, in my first point, whether there is a way to check if the executor is in use, so that executor.close() is called only when it is idle, to avoid memory leaks.

I hope you get my concern.

Thanks

Julien Viet

May 22, 2018, 8:24:27 AM
to ve...@googlegroups.com
I think you should create one shared executor in your verticle and make it available to your controllers.

When you undeploy the verticle, the executor will be closed for you automatically.

Abhishek Pandey

Jun 18, 2018, 7:22:06 AM
to vert.x
Hi Julien,

Just wanted to let you know that the memory leak issue was resolved in version 3.5.2. Also, following your suggestion of a shared executor, I've modified my code as below: instead of creating a shared worker executor on each HTTP request (my bad), we now create it once and reuse the same worker executor. Everything is now working smoothly in prod, and we no longer need to call executor.close(), which I was stressing about earlier to avoid the leak. As for your surprise that this leak was not caught earlier (in previous versions): probably because nobody else was creating an executor object on each request (at high throughput).

I also used vert.x's parallel processing by passing false for the ordered parameter of executeBlocking() (see the flag in the code below).

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Processor {

private static final Map<String, WorkerExecutor> executorMap = new ConcurrentHashMap<>();

public WorkerExecutor getWorkerExecutor(String threadName, Vertx vertxObj) {
    WorkerExecutor executor = executorMap.get(threadName);
    if (executor == null) {
        synchronized (Processor.class) {
            executor = executorMap.get(threadName);
            if(executor == null){
                executor = vertxObj.createSharedWorkerExecutor(threadName);
                executorMap.putIfAbsent(threadName, executor);
            }
        }
    }
    return executor;
}


@POST
@Path("/update_record")
@Produces({ MediaType.APPLICATION_JSON })
public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception{
    logger.info("update called");

    WorkerExecutor executor = getWorkerExecutor("update_record_event", vertx);

    executor.executeBlocking(future -> {
        MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent");
        try {
            processor.processMessage(data);
            future.complete();
        } catch (Exception e) {
            logger.error("consuming event failed: " + e.getMessage());
            future.fail(e);
        }

    }, false, res -> {

        Response response  = null;
        if(res.succeeded()) {
            response = Response.status(Response.Status.OK).build();
        } else {
            response = Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
        asyncResponse.resume(response);
    });
  }
}
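As a side note, the double-checked locking in getWorkerExecutor above can be written more simply with ConcurrentHashMap.computeIfAbsent, which is atomic per key. A plain-Java sketch with a placeholder String factory standing in for vertx::createSharedWorkerExecutor (the names here are illustrative, not real Vert.x objects):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorCache {
    // Counts factory invocations, to show the value is created only once per key.
    static final AtomicInteger created = new AtomicInteger();
    static final Map<String, String> cache = new ConcurrentHashMap<>();

    // Stand-in for vertx.createSharedWorkerExecutor(name).
    static String createExecutor(String name) {
        created.incrementAndGet();
        return "executor:" + name;
    }

    static String getWorkerExecutor(String name) {
        // computeIfAbsent is atomic per key: no synchronized block or double check needed
        return cache.computeIfAbsent(name, ExecutorCache::createExecutor);
    }

    public static void main(String[] args) {
        String a = getWorkerExecutor("update_record_event");
        String b = getWorkerExecutor("update_record_event");
        System.out.println(a.equals(b));  // same cached instance both times
        System.out.println(created.get()); // factory ran exactly once
    }
}
```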

Thanks once again!
