<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.3.3</version>
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
public class Processor {
/**
 * Handles {@code POST /update_record}: runs the blocking message processing
 * on a Vert.x worker executor and resumes the suspended JAX-RS response once
 * that work has finished.
 *
 * <p>The worker executor is released inside the result handler, i.e. only
 * after the submitted task has completed. Closing it immediately after
 * {@code executeBlocking(...)} returns can destroy the underlying pool while
 * the task is still queued or running, which surfaces as a
 * {@code RejectedExecutionException} from a terminated pool.
 *
 * @param data          request body handed to {@code processMessage}
 * @param asyncResponse suspended response; resumed with 200 when processing
 *                      succeeds and with 500 when it fails
 * @param vertx         Vert.x instance used to obtain the worker executor
 * @throws Exception declared by the original signature and kept for
 *                   compatibility with existing callers
 */
@POST
@Path("/update_record")
@Produces(MediaType.APPLICATION_JSON)
public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception {
    logger.info("update called");
    final WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event");
    executor.executeBlocking(future -> {
        MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent");
        try {
            processor.processMessage(data);
            future.complete();
        } catch (Exception e) {
            logger.error("consuming event failed: " + e.getMessage());
            future.fail(e);
        }
    }, res -> {
        try {
            Response.Status status = res.succeeded()
                    ? Response.Status.OK
                    : Response.Status.INTERNAL_SERVER_ERROR;
            asyncResponse.resume(Response.status(status).build());
        } finally {
            // Release this request's reference to the shared pool only now
            // that the blocking task is done.
            executor.close();
        }
    });
}
}
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/baa08160-ee74-4ab4-9eaf-7177d22fd99a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/7b484fbc-e8e0-42f5-9cd4-4c812006a53a%40googlegroups.com.
<vertx.core.version>3.5.2.CR2</vertx.core.version>
I will update my production code in a day or two after some sanity testing, and will let you know how the heap looks once this fix is deployed.
Thanks for your help.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/02c3960a-3684-4040-ac62-e053cd9dba8b%40googlegroups.com.
May 22, 2018 12:48:08 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 2078 ms, time limit is 1410
May 22, 2018 12:48:09 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 3079 ms, time limit is 1410
May 22, 2018 12:48:14 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[accounting_balance_update-0,5,main] has been blocked for 2025 ms, time limit is 1410
Exception in thread "accounting_balance_update-0" java.util.concurrent.RejectedExecutionException: Task io.vertx.core.impl.TaskQueue$$Lambda$10/601749733@18cfc0b5 rejected from java.util.concurrent.ThreadPoolExecutor@53d8fd62[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:70)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
3.5.2.CR2 version :<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,Handler<AsyncResult<T>> resultHandler,Executor exec, PoolMetrics metrics)
My requirement is that I want to run blocking code with using vert.x thread pool executor as shown in my first post . Now I'm setting Pool_Size and Max_execution_time while creating thread pool like :<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,Handler<AsyncResult<T>> resultHandler,Executor exec, TaskQueue queue, PoolMetrics metrics)
WorkerExecutor executor = vertx.createSharedWorkerExecutor("accounting_balance_update", POOL_SIZE, MAX_EXECUTION_TIME);
Still, while running this code in production, Vert.x suddenly stops processing, and every HTTP request that internally uses the Vert.x blocking code starts returning 504.
Please guide me here.
Thanks
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/6A00FDBD-BC53-4A8C-A865-F120E52CD6EC%40julienviet.com.
For more options, visit https://groups.google.com/d/optout.
--
Regards
Abhishek
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/CALk7ZDoq9gjT%3DGwYX%3DzYpiFf%3DcWbO4hpQyB1NnhXRJ3CjrvtLg%40mail.gmail.com.
pom.xml :
<groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId>
<version>3.5.2.CR2</version>
Request handler code using io.vertx:
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
@Path("/v1")
public class Processor { @POST @Path("/update_record") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_JSON }) public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception{ logger.info("update called");
WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);
executor.executeBlocking(future -> { MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent"); try { processor.processMessage(data); future.complete(); } catch (Exception e) { logger.error("consuming event failed: " + e.getMessage()); future.fail(e); } }, res -> { Response response = null; if(res.succeeded()) { response = Response.status(Response.Status.OK).build(); } else { response = Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } asyncResponse.resume(response); }); executor.close(); } }
import mypackage.Processor;import io.vertx.core.AbstractVerticle;import org.jboss.resteasy.plugins.server.vertx.VertxRequestHandler;import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;import org.springframework.context.ApplicationContext;public class VertxServer extends AbstractVerticle {VertxServer(final ApplicationContext context) {}@Overridepublic void start() throws Exception {VertxResteasyDeployment deployment = new VertxResteasyDeployment();deployment.start();deployment.getRegistry().addPerInstanceResource(Processors.class);vertx.createHttpServer().requestHandler(new VertxRequestHandler(vertx, deployment)).listen(8080);}}
Thanks
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/02e25c49-f40c-4499-89b8-671ea8003ae3%40googlegroups.com.
WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);
On 22 May 2018, at 13:11, Abhishek Pandey <apand...@gmail.com> wrote:
Alright, maybe I misunderstood some part. Just a couple of questions here:
- Can you tell me how to check whether the executor is busy processing a request, so that I close the executor only when it is idle? Or shall I remove the executor.close() line?
- Why did this line not solve the race issue, given that we now have 10 threads in the same pool:
WorkerExecutor executor = vertx.createSharedWorkerExecutor("update_record_event", 10, 100*1000000000);
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/63caf864-a007-4772-b5f6-d15919ece14c%40googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/84df55ba-6241-4fce-b912-9d27e361a3b7%40googlegroups.com.
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
/**
 * JAX-RS resource that offloads blocking message processing to Vert.x worker
 * executors. Executors are created once per pool name and then reused for
 * every subsequent request instead of being created and closed per request.
 */
public class Processor {

    /** Worker executors created so far, keyed by pool name. */
    private static final Map<String, WorkerExecutor> executorMap = new ConcurrentHashMap<>();

    /**
     * Returns the cached worker executor for {@code threadName}, creating it
     * on first use.
     *
     * <p>The cache is keyed by name only, so the {@code Vertx} instance passed
     * on the first call for a given name is the one that creates the executor.
     *
     * @param threadName name of the shared worker pool
     * @param vertxObj   Vert.x instance used to create the executor when it is
     *                   not cached yet
     * @return the executor registered under {@code threadName}
     */
    public WorkerExecutor getWorkerExecutor(String threadName, Vertx vertxObj) {
        // computeIfAbsent on ConcurrentHashMap is atomic, so no external lock
        // is needed (the original locked on an unresolved "Processors" class).
        return executorMap.computeIfAbsent(threadName, name -> vertxObj.createSharedWorkerExecutor(name));
    }

    /**
     * Handles {@code POST /update_record}: runs the blocking processing on the
     * cached {@code update_record_event} executor and resumes the suspended
     * response when the work has finished.
     *
     * @param data          request body handed to {@code processMessage}
     * @param asyncResponse suspended response; resumed with 200 when
     *                      processing succeeds and with 500 when it fails
     * @param vertx         Vert.x instance used to obtain the worker executor
     * @throws Exception declared by the original signature and kept for
     *                   compatibility with existing callers
     */
    @POST
    @Path("/update_record")
    @Produces(MediaType.APPLICATION_JSON)
    public void processUpdateEvent(String data, @Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) throws Exception {
        logger.info("update called");
        WorkerExecutor executor = getWorkerExecutor("update_record_event", vertx);
        executor.executeBlocking(future -> {
            MyProcessor processor = (MyProcessor) MainClass.context.getBean("myprocessorevent");
            try {
                processor.processMessage(data);
                future.complete();
            } catch (Exception e) {
                logger.error("consuming event failed: " + e.getMessage());
                future.fail(e);
            }
        }, false, res -> { // "false" is passed as the ordered flag, as in the original
            Response.Status status = res.succeeded()
                    ? Response.Status.OK
                    : Response.Status.INTERNAL_SERVER_ERROR;
            asyncResponse.resume(Response.status(status).build());
        });
    }
}