How to limit or discard messages in the event bus?


jr981...@gmail.com

May 14, 2021, 4:23:46 AM
to vert.x
I am very confused about how to set the buffer size limit of the event bus, so I wrote some test code.
The consumer consumes very slowly and the producer produces very quickly. I set the buffer limits and expected the bus to fill up and raise an error, but nothing happens. Memory monitoring shows that memory usage grows rapidly.
How can I safely apply back pressure, or limit the event bus, to cope with the speed difference between the consumer and the producer?
Discarding messages is acceptable, but they must not pile up.

Producer:

Vertx vertx = Vertx.vertx(new VertxOptions());
vertx.deployVerticle("VerTical", new DeploymentOptions().setInstances(1));
vertx.exceptionHandler(e -> {
    System.out.println("vertx");
    e.printStackTrace();
});
MessageProducer<Object> sender = vertx.eventBus().sender("a");
sender.setWriteQueueMaxSize(1);
sender.exceptionHandler(e -> {
    System.out.println("send");
    e.printStackTrace();
});

// Busy loop: writes as fast as possible and never checks writeQueueFull()
while (true) {
    sender.write("a");
}


Consumer (verticle "VerTical"):

MessageConsumer<Object> consumer = vertx.eventBus().consumer("a");
consumer.setMaxBufferedMessages(1);
consumer.handler(v -> {
    System.out.println(v.body());
    try {
        sleep(10); // simulate a slow consumer (this blocks the event loop thread)
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
consumer.exceptionHandler(e -> {
    System.out.println("consumer");
    e.printStackTrace();
});

Memory usage: grows from 0 GB to 8 GB.
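As far as I can tell from the Vert.x 3.x javadoc, neither limit rejects anything by itself. setWriteQueueMaxSize() only makes writeQueueFull() return true while write() keeps accepting messages, so the producer has to check writeQueueFull() itself. setMaxBufferedMessages() only applies while the consumer is paused (Vert.x then buffers up to that many messages, 1000 by default, and discards the rest), and the consumer above is never paused. The sketch below is plain Java with no Vert.x (DropWhenFullBuffer is a hypothetical name) and shows the "discard instead of pile up" policy: a bounded buffer whose write() drops the message when the buffer is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java model (no Vert.x): a bounded buffer between a fast producer and a
// slow consumer that discards new messages when full instead of piling them up.
class DropWhenFullBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    DropWhenFullBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the message was buffered, false if it was discarded. */
    boolean write(T message) {
        // offer() never blocks and never lets the queue grow past its capacity
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet(); // count drops so they can be monitored
        }
        return accepted;
    }

    /** Returns the oldest buffered message, or null if the buffer is empty. */
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }

    long dropped() {
        return dropped.get();
    }

    public static void main(String[] args) {
        DropWhenFullBuffer<String> buffer = new DropWhenFullBuffer<>(10);
        for (int i = 0; i < 1_000; i++) {
            buffer.write("a" + i); // the producer is much faster than the consumer
        }
        System.out.println("buffered=" + buffer.size() + " dropped=" + buffer.dropped());
    }
}
```

With a capacity of 10 and 1,000 writes, 10 messages stay buffered and 990 are dropped, so memory use stays flat no matter how fast the producer runs.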

Christopher Tate

May 16, 2021, 2:01:50 AM
to vert.x
Hello jr981008rj,

I have recently been working on the very same thing, with good results. We have been using semaphores within a workerExecutor.executeBlocking handler to ensure that only a fixed number of database updates are made at a time.

Here I declare the semaphore and the maximum number of permits:

   Integer semaphorePermits;
   Semaphore semaphore;
   public AppVertx setSemaphore(Semaphore semaphore) {
     this.semaphore = semaphore;
     return this;
   }

Here I configure the number of permits from an environment variable, or fall back to a default:

   public static void run() {
     Integer semaphorePermits = System.getenv(ConfigKeys.SEMAPHORE_PERMITS) == null ? 10 : Integer.parseInt(System.getenv(ConfigKeys.SEMAPHORE_PERMITS));
     Semaphore semaphore = new Semaphore(semaphorePermits);

Here in my /health check handler, I can monitor the number of semaphore permits used versus the total:

     healthCheckHandler.register("semaphore", 2000, a -> {
       a.complete(Status.OK(new JsonObject().put("permits", semaphorePermits).put("availablePermits", semaphore.availablePermits()).put("queueLengthEstimate", semaphore.getQueueLength())));
     });
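A small plain-Java sketch of the permit configuration and the health snapshot. SemaphoreHealth and the SEMAPHORE_PERMITS variable name are hypothetical (the real key comes from ConfigKeys.SEMAPHORE_PERMITS); the map keys match the health check above. Semaphore.getQueueLength() is only an estimate, which fits the queueLengthEstimate name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Sketch: create the semaphore from an optional setting and report its usage with
// the same field names as the health check above. SemaphoreHealth is hypothetical.
class SemaphoreHealth {
    final int permits;
    final Semaphore semaphore;

    SemaphoreHealth(String permitsSetting, int defaultPermits) {
        // Use the default when the setting is absent, as the original run() does
        this.permits = permitsSetting == null ? defaultPermits : Integer.parseInt(permitsSetting.trim());
        this.semaphore = new Semaphore(permits);
    }

    Map<String, Object> snapshot() {
        Map<String, Object> status = new LinkedHashMap<>();
        status.put("permits", permits);
        status.put("availablePermits", semaphore.availablePermits());
        // getQueueLength() estimates how many threads are blocked in acquire()
        status.put("queueLengthEstimate", semaphore.getQueueLength());
        return status;
    }

    public static void main(String[] args) {
        // SEMAPHORE_PERMITS is an assumed variable name for this example
        SemaphoreHealth health = new SemaphoreHealth(System.getenv("SEMAPHORE_PERMITS"), 10);
        health.semaphore.tryAcquire(3); // pretend three updates are in progress
        System.out.println(health.snapshot());
    }
}
```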

Here I pass the semaphore to all of my OpenAPI services, which use it for updates to the database:
See: https://github.com/opendatapolicing/opendatapolicing/blob/main/src/main/java/com/opendatapolicing/enus/vertx/AppVertx.java#L682-L701

   private Future<Void> configureApi() {
     Promise<Void> promise = Promise.promise();
     try {
       SiteUserEnUSGenApiService.registerService(semaphore, vertx.eventBus(), config, workerExecutor, pgPool, webClient, oauth2AuthenticationProvider, authorizationProvider, vertx);
       SiteStateEnUSGenApiService.registerService(semaphore, vertx.eventBus(), config, workerExecutor, pgPool, webClient, oauth2AuthenticationProvider, authorizationProvider, vertx);
       SiteAgencyEnUSGenApiService.registerService(semaphore, vertx.eventBus(), config, workerExecutor, pgPool, webClient, oauth2AuthenticationProvider, authorizationProvider, vertx);

* In my services, I have a PATCH request that reads objects 100 at a time and then calls itself recursively until there are no more records to read.
* For each item in the batch of 100 in the SearchList, I add a Future to a list. Each Future calls workerExecutor.executeBlocking and then acquires the semaphore, so the event loop is never blocked, but back pressure is applied when the semaphore has no permits left and the items already in flight finish first.
* Once the semaphore is acquired, I build an event bus message and send it to patch the object, without blocking again. This completes the blocking code.
See: https://github.com/opendatapolicing/opendatapolicing/blob/main/src/main/java/com/opendatapolicing/enus/trafficstop/TrafficStopEnUSGenApiServiceImpl.java#L1468-L1499

  public Future<Void> listPATCHTrafficStop(ApiRequest apiRequest, SearchList<TrafficStop> listTrafficStop) {
    Promise<Void> promise = Promise.promise();
    List<Future> futures = new ArrayList<>();
    listTrafficStop.getList().forEach(o -> {
      futures.add(Future.future(promise1 -> {
        workerExecutor.executeBlocking(blockingCodeHandler -> {
          try {
            semaphore.acquire();
            Long pk = o.getPk();

            JsonObject params = new JsonObject();
            params.put("body", siteRequest.getJsonObject());
            params.put("path", new JsonObject());
            params.put("cookie", new JsonObject());
            params.put("query", new JsonObject().put("q", "*:*").put("fq", new JsonArray().add("pk:" + pk)));
            JsonObject context = new JsonObject().put("params", params);
            JsonObject json = new JsonObject().put("context", context);
            eventBus.send("opendatapolicing-enUS-TrafficStop", json, new DeliveryOptions().addHeader("action", "patchTrafficStopFuture"));
            blockingCodeHandler.complete();
          } catch(Exception ex) {
            LOG.error(String.format("listPATCHTrafficStop failed. "), ex);
            blockingCodeHandler.fail(ex);
          }
        }).onSuccess(a -> {
          promise1.complete();
        }).onFailure(ex -> {
          LOG.error(String.format("listPATCHTrafficStop failed. "), ex);
          promise1.fail(ex);
        });
      }));
    });

* The event bus message to PATCH the object is received and processed, and whether the PATCH succeeds or fails, I release the semaphore so that more events can be processed.
* The semaphore is acquired before the message is queued on the event bus and released when the message has been processed, which prevents the event bus from running out of memory.

  @Override
  public void patchTrafficStopFuture(JsonObject json, ServiceRequest serviceRequest, Handler<AsyncResult<ServiceResponse>> eventHandler) {
    SiteRequestEnUS siteRequest = generateSiteRequestEnUS(null, serviceRequest, json);
    TrafficStop o = new TrafficStop();
    o.setSiteRequest_(siteRequest);
    ApiRequest apiRequest = new ApiRequest();
    apiRequest.setRows(1);
    apiRequest.setNumFound(1L);
    apiRequest.setNumPATCH(0L);
    apiRequest.initDeepApiRequest(siteRequest);
    siteRequest.setApiRequest_(apiRequest);
    o.setPk(json.getString(TrafficStop.VAR_pk));
    patchTrafficStopFuture(o, false).onSuccess(a -> {
      semaphore.release();
      eventHandler.handle(Future.succeededFuture());
    }).onFailure(ex -> {
      semaphore.release();
      eventHandler.handle(Future.failedFuture(ex));
    });
  }
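The acquire-before-send, release-after-processing pattern from the bullets above can be modeled in plain Java to check that the number of queued or in-progress messages never exceeds the permit count. This is a sketch under assumptions: a fixed thread pool stands in for workerExecutor, a single-threaded executor stands in for the event bus consumer, and SemaphoreBackpressure is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model: workers acquire a permit before "sending" a message, and the
// consumer releases it after processing, success or failure, so at most `permits`
// messages are ever queued or in progress. Names here are hypothetical.
class SemaphoreBackpressure {
    /** Returns {processedCount, maxInFlightObserved}. */
    static int[] run(int messages, int permits, int workerThreads) {
        Semaphore semaphore = new Semaphore(permits);
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads); // like workerExecutor
        ExecutorService consumer = Executors.newSingleThreadExecutor();        // like the bus consumer
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages);

        for (int i = 0; i < messages; i++) {
            workers.execute(() -> {
                try {
                    semaphore.acquire(); // blocks a worker thread, never the consumer
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    done.countDown(); // message abandoned; don't hang the caller
                    return;
                }
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                consumer.execute(() -> { // "send" the message
                    try {
                        Thread.sleep(1); // slow processing
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        semaphore.release(); // release whether processing succeeded or not
                        done.countDown();
                    }
                });
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdown();
        consumer.shutdown();
        return new int[] {processed.get(), maxInFlight.get()};
    }

    public static void main(String[] args) {
        int[] result = run(200, 5, 8);
        System.out.println("processed=" + result[0] + " maxInFlight=" + result[1]);
    }
}
```

Releasing in a finally block mirrors releasing in both onSuccess and onFailure above: if a failure path forgot to release, permits would leak and the whole pipeline would eventually stop.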

Then, once CompositeFuture.all completes, the method calls itself recursively to page through the next 100 results, again and again until there are no more records:

    CompositeFuture.all(futures).onSuccess( a -> {
      if(apiRequest != null) {
        apiRequest.setNumPATCH(apiRequest.getNumPATCH() + listTrafficStop.getQueryResponse().getResults().size());
        if(apiRequest.getNumFound() == 1L)
          listTrafficStop.first().apiRequestTrafficStop();
        eventBus.publish("websocketTrafficStop", JsonObject.mapFrom(apiRequest).toString());
      }
      listTrafficStop.next().onSuccess(next -> {
        if(next) {
          listPATCHTrafficStop(apiRequest, listTrafficStop);
        } else {
          promise.complete();
        }
      }).onFailure(ex -> {
        LOG.error(String.format("listPATCHTrafficStop failed. "), ex);
        promise.fail(ex);
      });
    }).onFailure(ex -> {
      LOG.error(String.format("listPATCHTrafficStop failed. "), ex);
      promise.fail(ex);
    });
    return promise.future();
  }
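The paging loop can be sketched with the JDK's CompletableFuture in place of Vert.x futures (PagedProcessor and the patch function are hypothetical). Each page's items run concurrently, CompletableFuture.allOf() plays the role of CompositeFuture.all(), and the next page starts only after the whole page has finished. As with CompositeFuture.all(), one failed item fails the whole chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch of the paging loop with JDK futures instead of Vert.x futures:
// start one future per item in the page, wait for all of them (the role of
// CompositeFuture.all), then recurse into the next page until none are left.
class PagedProcessor {
    static CompletableFuture<Integer> processAll(List<Long> pks, int pageSize,
                                                 Function<Long, CompletableFuture<Void>> patch) {
        return processPage(pks, 0, pageSize, patch, 0);
    }

    private static CompletableFuture<Integer> processPage(List<Long> pks, int offset, int pageSize,
                                                          Function<Long, CompletableFuture<Void>> patch,
                                                          int processedSoFar) {
        if (offset >= pks.size()) {
            return CompletableFuture.completedFuture(processedSoFar); // no more records
        }
        List<Long> page = pks.subList(offset, Math.min(offset + pageSize, pks.size()));
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Long pk : page) {
            futures.add(patch.apply(pk));
        }
        // The next page is only requested after every item in this page completed;
        // if any item fails, the returned future fails too.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenCompose(ignored -> processPage(pks, offset + pageSize, pageSize, patch,
                        processedSoFar + page.size()));
    }

    public static void main(String[] args) {
        List<Long> pks = new ArrayList<>();
        for (long id = 1; id <= 250; id++) {
            pks.add(id);
        }
        int total = processAll(pks, 100, pk -> CompletableFuture.runAsync(() -> { /* PATCH pk */ }))
                .join();
        System.out.println("patched " + total + " records");
    }
}
```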

There are some important configuration items that you will want to set up and tune for the best performance:

siteInstances=21 # For example, I allocate 20 instances of the main verticle to process PATCH requests, plus 1 to run the long-running request recursively.
workerPoolSize=20 # The number of workers that can execute blocking code and acquire a semaphore permit.
jdbcMaxPoolSize=2 # I give each verticle instance 2 JDBC connections. In my app, 1 connection sometimes doesn't seem to be enough and leads to retrieval failures, so I like at least 2.
jdbcMaxWaitQueueSize=1000 # Allow many requests for a database connection to wait in the queue, so they don't fail with an error when so many requests are running at once.
semaphorePermits=80 # The amount of back pressure: the maximum number of events processed at once.

With this configuration, I have been able to PATCH 1 million records every 10 minutes without running out of memory! 27 million records processed in total, and more to come!
Hope it helps you!

To your health. Courage!
Christopher Tate.

jr981...@gmail.com

May 18, 2021, 5:45:08 AM
to vert.x

Thank you very much for the reply. It is a direction worth considering. Perhaps you could switch from a semaphore to a rate limiter; that might give a slight improvement.
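A rate limiter is a different tool from a semaphore: it caps how many messages are started per unit of time, not how many are in progress at once, so a slow consumer can still fall behind if the rate is set too high. Below is a minimal token-bucket sketch in plain Java (TokenBucket is a hypothetical class; libraries such as Guava's RateLimiter offer a ready-made version). The clock is injected so the behaviour can be tested deterministically.

```java
import java.util.function.LongSupplier;

// Minimal token-bucket rate limiter (not a Vert.x API). It limits how many
// messages may start per second; a semaphore limits how many run at once.
class TokenBucket {
    private final long capacity;        // maximum burst size
    private final long tokensPerSecond; // refill rate
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, long tokensPerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start full
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Takes a token if one is available; false means "drop or delay this message". */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double refill = (now - lastRefillNanos) * (double) tokensPerSecond / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + refill);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket limiter = new TokenBucket(100, 1_000, System::nanoTime); // ~1000 msg/s, bursts of 100
        int sent = 0;
        for (int i = 0; i < 10_000; i++) {
            if (limiter.tryAcquire()) {
                sent++; // eventBus.send(...) would go here
            }
        }
        System.out.println("sent " + sent + " of 10000 in a tight loop");
    }
}
```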

jr981...@gmail.com

May 18, 2021, 7:56:45 AM
to vert.x
This is the solution I use now. It's not great, but not bad either:

Vertx vertx = Vertx.vertx();

MessageProducer<Object> sender = vertx.eventBus().sender("a");
sender.setWriteQueueMaxSize(10);
sender.exceptionHandler(e -> e.printStackTrace());
// sender.deliveryOptions(new DeliveryOptions().getSendTimeout())
new Thread(() -> {
    while (true) {
        // Only write while the producer's write queue is not full
        if (!sender.writeQueueFull()) {
            System.out.println("send");
            sender.write("a");
        } else {
            // kpi
        }
    }
}).start();

MessageConsumer<Object> consumer = vertx.eventBus().consumer("a");
consumer.exceptionHandler(e -> e.printStackTrace());

WorkerExecutor shareExe = vertx.createSharedWorkerExecutor("my-worker-pool", 8, TimeUnit.SECONDS.toNanos(60));
consumer.handler(new Handler<Message<Object>>() {

    AtomicInteger count = new AtomicInteger(0);

    @Override
    public void handle(Message<Object> event) {
        Object body = event.body();
        // Run the slow work on the worker pool instead of the event loop
        shareExe.executeBlocking(new Handler<Promise<Integer>>() {
            @Override
            public void handle(Promise<Integer> promise) {
                sleep(10000);
                // dmq
                promise.complete();
            }
        }, false, res -> {
            // A job finished: resume the consumer once fewer than 10 jobs are in flight
            if (count.decrementAndGet() < 10) {
                consumer.resume();
            }
            System.out.println("fin");
        });
        // Pause the consumer once more than 10 jobs are in flight
        if (count.incrementAndGet() > 10) {
            consumer.pause();
        }
    }
});
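The pause()/resume() logic above is a watermark scheme. Here it is modeled in plain Java without Vert.x (WatermarkGate is a hypothetical name). Pausing above one threshold and resuming just below the same one, as above, can toggle pause()/resume() on nearly every message; resuming only at a lower mark (hysteresis) avoids that.

```java
// Plain-Java model of the pause()/resume() logic above (no Vert.x): count
// in-flight jobs, pause at a high watermark and resume only once the count
// has dropped to a lower one. WatermarkGate is a hypothetical name.
class WatermarkGate {
    private final int high;
    private final int low;
    private int inFlight;
    private boolean paused;

    WatermarkGate(int high, int low) {
        if (low >= high) {
            throw new IllegalArgumentException("low must be below high");
        }
        this.high = high;
        this.low = low;
    }

    /** Call when a job starts; true means "pause the consumer now". */
    synchronized boolean onStart() {
        inFlight++;
        if (!paused && inFlight >= high) {
            paused = true;
            return true;
        }
        return false;
    }

    /** Call when a job finishes; true means "resume the consumer now". */
    synchronized boolean onFinish() {
        inFlight--;
        if (paused && inFlight <= low) {
            paused = false;
            return true;
        }
        return false;
    }

    synchronized int inFlight() { return inFlight; }

    synchronized boolean isPaused() { return paused; }

    public static void main(String[] args) {
        WatermarkGate gate = new WatermarkGate(10, 5);
        for (int i = 0; i < 12; i++) {
            if (gate.onStart()) {
                System.out.println("pause at " + gate.inFlight());
            }
        }
        while (gate.inFlight() > 0) {
            if (gate.onFinish()) {
                System.out.println("resume at " + gate.inFlight());
            }
        }
    }
}
```

Wired into the handler above, this would become `if (gate.onStart()) consumer.pause();` in the message handler and `if (gate.onFinish()) consumer.resume();` in the executeBlocking result handler.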

Julien Viet

May 18, 2021, 9:00:13 AM
to vert.x
Hi,

If you need some kind of back pressure, you can use acknowledgements
when sending a request.

For example, you can use a simple counter to track the number of in-flight
messages: increase it when you send a request and decrease it when
you receive the response.

This means you need to use eventBus.request() instead of eventBus.send().

HTH

Julien
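A minimal plain-Java model of this idea, assuming a single-threaded executor stands in for the event loop and a CompletableFuture-returning request() stands in for eventBus.request() (InFlightWindow and request() are hypothetical names). The counter goes up when a request is sent and down when its reply (or a failure) arrives, and no new request is sent while it is at the limit, so nothing ever blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model of request/reply back pressure: count in-flight requests,
// +1 on send and -1 when the reply (or a failure) arrives, and stop sending while
// the count is at the limit. All counter updates run on one "event loop" thread.
class InFlightWindow {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private final ExecutorService replier = Executors.newFixedThreadPool(4); // the consumer side
    private final int total;
    private final int maxInFlight;
    private final CountDownLatch done;
    private int sent;
    private int inFlight;
    private int maxObserved;

    InFlightWindow(int total, int maxInFlight) {
        this.total = total;
        this.maxInFlight = maxInFlight;
        this.done = new CountDownLatch(total);
    }

    // Hypothetical stand-in for eventBus.request(): the reply arrives asynchronously
    private CompletableFuture<Integer> request(int id) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1); // slow consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return id;
        }, replier);
    }

    // Runs on the loop thread: send as many requests as the window allows
    private void pump() {
        while (inFlight < maxInFlight && sent < total) {
            int id = sent++;
            inFlight++;
            maxObserved = Math.max(maxObserved, inFlight);
            // Hop back to the loop thread before touching the counter again
            request(id).whenComplete((reply, error) -> loop.execute(this::onReply));
        }
    }

    private void onReply() {
        inFlight--;
        done.countDown();
        pump(); // a slot is free: send the next request, if any
    }

    /** Sends all requests and returns the highest in-flight count seen. */
    int runAndGetMaxInFlight() {
        loop.execute(this::pump);
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
            replier.shutdown();
        }
        return maxObserved;
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + new InFlightWindow(100, 8).runAndGetMaxInFlight());
    }
}
```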

jr981...@gmail.com

May 19, 2021, 3:13:55 AM
to vert.x
The bus consumer's behaviour is unpredictable for external reasons. In my case, the current Vert.x event loop is also responsible for processing gateway requests, and I cannot block the gateway. I need to assume that the bus consumers (which send data to Kafka) may block, and avoid blocking the Vert.x event loop.
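One way to satisfy both constraints, sketched in plain Java and assuming the blocking Kafka send is wrapped in a Runnable: run it on a small ThreadPoolExecutor with a bounded queue, and discard (and count) work the pool cannot accept instead of queueing it without limit. BoundedOffloader is a hypothetical name; in Vert.x, submit() would be called from the event bus handler, and it never blocks the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: run blocking work (e.g. a Kafka send) on a small bounded pool so the
// event loop never waits. When every thread is busy and the queue is full, the
// task is discarded and counted rather than piling up in memory.
class BoundedOffloader {
    private final ThreadPoolExecutor pool;
    private final AtomicLong dropped = new AtomicLong();

    BoundedOffloader(int threads, int queueCapacity) {
        // Default AbortPolicy: execute() throws RejectedExecutionException when saturated
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    /** Never blocks the caller; returns false if the work was discarded. */
    boolean submit(Runnable blockingWork) {
        try {
            pool.execute(blockingWork);
            return true;
        } catch (RejectedExecutionException e) {
            dropped.incrementAndGet();
            return false;
        }
    }

    long dropped() {
        return dropped.get();
    }

    boolean shutdownAndWait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedOffloader offloader = new BoundedOffloader(2, 3);
        for (int i = 0; i < 10; i++) {
            offloader.submit(() -> {
                try {
                    Thread.sleep(100); // stands in for a slow, blocking send
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println("dropped " + offloader.dropped() + " of 10");
        offloader.shutdownAndWait(5);
    }
}
```

With 2 threads and a queue of 3, at most 5 sends are held in memory at any time; everything beyond that is dropped immediately, which matches "discard, but never pile up".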

Christopher Tate

May 27, 2021, 3:14:00 PM
to vert.x
Thanks for the great idea, Julien!
I switched to using eventBus.request()/reply() along with a simple Long counter, and set up the SQL RowStream to pause and then fetch a fixed number of rows at a time. I was able to remove the blocking semaphores and double my performance with your recommendation!

    private Future<Void> syncData(String tableName) {

        Promise<Void> promise = Promise.promise();
        try {
            if(config().getBoolean(String.format("%s%s", ConfigKeys.ENABLE_DB_SOLR_SYNC, tableName), true)) {
                LOG.info(String.format(syncDataStarted, tableName));
                Long apiCounterPause = config().getLong(ConfigKeys.API_COUNTER_PAUSE);
                Long apiCounterResume = config().getLong(ConfigKeys.API_COUNTER_RESUME);
                Integer apiCounterFetch = config().getInteger(ConfigKeys.API_COUNTER_FETCH);
                vertx.sharedData().getCounter("dbSolrSync").onSuccess(counter -> {
                    counter.get().onSuccess(startCount -> {
                        counter.compareAndSet(startCount, 0L).onSuccess(counterResetSuccess -> {
                            if(counterResetSuccess) {
                                pgPool.withTransaction(sqlConnection -> {
                                    Promise<Void> promise1 = Promise.promise();
                                    sqlConnection.prepare(String.format("SELECT pk FROM %s", tableName)).onSuccess(preparedStatement -> {
                                        setCountNum(0L);
                                        setTotalNum(0L);
                                        try {
                                            RowStream<Row> stream = preparedStatement.createStream(apiCounterFetch);
                                            stream.pause();
                                            stream.fetch(apiCounterFetch);
                                            stream.exceptionHandler(ex -> {
                                                LOG.error(String.format(syncDataFail, tableName), new RuntimeException(ex));
                                                promise1.fail(ex);
                                            });
                                            stream.endHandler(v -> {
                                                LOG.info(String.format(syncDataComplete, tableName));
                                                promise1.complete();
                                            });
                                            stream.handler(row -> {
                                                incrementCountNum();
                                                if(countNum.compareTo(apiCounterPause) >= 0) {
                                                    stream.pause();
                                                }
                                                try {
                                                    vertx.eventBus().request(
                                                            String.format("opendatapolicing-enUS-%s", tableName)
                                                            , new JsonObject().put(
                                                                    "context"
                                                                    , new JsonObject().put(
                                                                            "params"
                                                                            , new JsonObject()
                                                                                    .put("body", new JsonObject().put("pk", row.getLong(0).toString()))
                                                                                    .put("path", new JsonObject())
                                                                                    .put("cookie", new JsonObject())
                                                                                    .put("query", new JsonObject().put("q", "*:*").put("fq", new JsonArray().add("pk:" + row.getLong(0))).put("var", new JsonArray().add("refresh:false")))
                                                                    )
                                                            )
                                                            , new DeliveryOptions().addHeader("action", String.format("patch%sFuture", tableName))).onSuccess(a -> {
                                                        incrementTotalNum();
                                                        decrementCountNum();
                                                        if(countNum.compareTo(apiCounterResume) == 0) {
                                                            stream.fetch(apiCounterFetch);
                                                            LOG.info("FETCH {} {}", totalNum, Thread.currentThread().getName());
                                                        }
                                                    }).onFailure(ex -> {
                                                        LOG.error(String.format(syncDataFail, tableName), ex);
                                                        promise1.fail(ex);
                                                    });
                                                } catch (Exception ex) {
                                                    LOG.error(String.format(syncDataFail, tableName), ex);
                                                    promise1.fail(ex);
                                                }
                                            });
                                        } catch (Exception ex) {
                                            LOG.error(String.format(syncDataFail, tableName), ex);
                                            promise1.fail(ex);
                                        }
                                    }).onFailure(ex -> {
                                        LOG.error(String.format(syncDataFail, tableName), ex);
                                        promise1.fail(ex);
                                    });
                                    return promise1.future();
                                }).onSuccess(a -> {
                                    promise.complete();
                                }).onFailure(ex -> {
                                    LOG.error(String.format(syncDataFail, tableName), ex);
                                    promise.fail(ex);
                                });
                            } else {
                                Exception ex = new RuntimeException(String.format(syncDataCounterResetFail, tableName));
                                LOG.error(ex.getMessage(), ex);
                                promise.fail(ex);
                            }
                        }).onFailure(ex -> {
                            LOG.error(String.format(syncDataFail, tableName), ex);
                            promise.fail(ex);
                        });
                    }).onFailure(ex -> {
                        LOG.error(String.format(syncDataFail, tableName), ex);
                        promise.fail(ex);
                    });
                }).onFailure(ex -> {
                    LOG.error(String.format(syncDataFail, tableName), ex);
                    promise.fail(ex);
                });
            } else {
                LOG.info(String.format(syncDataSkip, tableName));
                promise.complete();
            }
        } catch (Exception ex) {
            LOG.error(String.format(syncDataFail, tableName), ex);
            promise.fail(ex);
        }
        return promise.future();
    }
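The same flow control can be modeled without Vert.x or a database (DemandSource and ThrottledSync are hypothetical names). The source only emits rows while it has demand: pause() drops the demand to zero and fetch(n) adds n, in the spirit of Vert.x ReadStream. Every emitted row becomes an in-flight request, and each reply lowers the in-flight count. One deliberate difference from the code above: more rows are fetched when the in-flight count is at or below the resume threshold and the source has no outstanding demand, rather than only when it is exactly equal, because with an exact-match check a fetch size at or below the resume threshold could leave the count below the threshold forever and stall the sync.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.LongStream;

// Minimal pull-based source in the spirit of ReadStream: pause() clears the
// demand, fetch(n) adds n, and rows are emitted only while demand > 0.
class DemandSource {
    private final Iterator<Long> rows;
    private final Consumer<Long> handler;
    private long demand;
    private boolean emitting;

    DemandSource(Iterator<Long> rows, Consumer<Long> handler) {
        this.rows = rows;
        this.handler = handler;
    }

    long demand() { return demand; }

    void pause() { demand = 0; }

    void fetch(long n) {
        demand += n;
        if (emitting) return; // called from inside the handler: the running loop picks it up
        emitting = true;
        try {
            while (demand > 0 && rows.hasNext()) {
                demand--;
                handler.accept(rows.next());
            }
        } finally {
            emitting = false;
        }
    }
}

// Single-threaded simulation of the sync loop: each emitted row becomes an
// in-flight request; replies arrive later, one at a time, in FIFO order.
class ThrottledSync {
    /** Returns {rowsCompleted, maxInFlight}. */
    static int[] run(int totalRows, int fetchSize, int pauseAt, int resumeAt) {
        if (resumeAt >= pauseAt) throw new IllegalArgumentException("resumeAt must be < pauseAt");
        Deque<Long> pendingReplies = new ArrayDeque<>();
        int[] inFlight = {0};
        int[] maxInFlight = {0};
        DemandSource[] stream = new DemandSource[1];
        stream[0] = new DemandSource(LongStream.rangeClosed(1, totalRows).iterator(), pk -> {
            inFlight[0]++;
            maxInFlight[0] = Math.max(maxInFlight[0], inFlight[0]);
            if (inFlight[0] >= pauseAt) {
                stream[0].pause(); // too many requests outstanding
            }
            pendingReplies.add(pk); // like eventBus.request(...): the reply comes later
        });
        stream[0].fetch(fetchSize);

        int completed = 0;
        while (!pendingReplies.isEmpty()) {
            pendingReplies.poll(); // a reply arrives
            completed++;
            inFlight[0]--;
            // "<=" plus a demand check instead of "==": the resume point cannot be missed
            if (inFlight[0] <= resumeAt && stream[0].demand() == 0) {
                stream[0].fetch(fetchSize);
            }
        }
        return new int[] {completed, maxInFlight[0]};
    }

    public static void main(String[] args) {
        int[] result = run(1_000, 50, 100, 20);
        System.out.println("completed=" + result[0] + " maxInFlight=" + result[1]);
    }
}
```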
