Message published on event bus not received by consumer

480 views
Skip to first unread message

Scott Pettyjohn

unread,
Mar 30, 2017, 10:42:56 AM3/30/17
to vert.x

I am trying to use Vert.x to execute long running processes in my otherwise synchronous REST API (drop wizard). I am totally new to Vert.x, so not sure if this is even possible....

I am deploying a 'main' vertical in my application main method:

public static void main(String... args) throws Exception {
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
new InventoryScannerMain().run(args);
Vertx.vertx().deployVerticle(MainVerticle.class.getName());
}

MainVerticle deploys another vertical that is listening for messages and publishes the message source to the bus.

public class MainVerticle extends MicroServiceVerticle {

private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);

@Override
public void start() {
super.start();
vertx.deployVerticle(AsyncProcessVerticle.class.getName());

publishMessageSource(AsyncProcessVerticle.ADDRESS, AsyncProcessVerticle.ADDRESS, rec -> {
if (!rec.succeeded()) {
LOG.error("attempt to publish Async Process message source failed: ", rec.cause());
}
LOG.info("Async Process message source published: " + rec.succeeded());
});
}
}

Here is AsyncProcessVerticle:
public class AsyncProcessVerticle  extends MicroServiceVerticle{
public static final String ADDRESS = "async-test";
private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessVerticle.class);
public void start() {
LOG.info("AsyncProcessVerticle#start");
vertx.eventBus().<JsonObject>consumer(ADDRESS).handler( message -> {
LOG.info("message recieved!message recieved!");
});
}
}


The REST resource class (Jersey) publishes a message to the same address that AsyncProcessVerticle is listening on.

@Path(QUEUE_MANAGERS +"/asynctest")
@GET
public Response asyncTest() {
Vertx.vertx().eventBus().publish(AsyncProcessVerticle.ADDRESS, new JsonObject().put("from","asynctest"));
Vertx.vertx().eventBus().publish(AsyncProcessVerticle.ADDRESS, new JsonObject());
return Response.accepted().build();
}

But the message is never received by the AsyncProcessVerticle handler.

Is this approach even possible? Is there a better way?

Thanks.

Jez P

unread,
Mar 30, 2017, 11:18:18 AM3/30/17
to vert.x
You do realise you're using several instances of vertx there? Each time you call Vertx.vertx() you create a new instance (with new eventbus). So asyncTest() isn't publishing to the eventbus the others are listening on. They do need to share the same vertx instance!

sandeepsahoo2k2

unread,
Mar 30, 2017, 12:04:47 PM3/30/17
to vert.x
Also do they need to run with -cluster option ? without that it may not be possible
Reply all
Reply to author
Forward
0 new messages