
Streaming a reply through the eventbus?


Stephan Wissel

Apr 3, 2016, 8:35:13 AM
to vert.x
I have the following scenario:
A request comes in through an HTTP server and is passed to a worker verticle through an event bus send. The reply is potentially very large, so I'd like to stream it using a pump. I checked the documentation and some of the posts here, but I couldn't quite figure out the moving parts.
To get started I created a classic example that simply sends a buffer response. How would that example look translated to a stream approach (using the same verticle for the bus reply and HTTP, which is obviously not what you would do in a real app)?

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;

public class BusSample extends AbstractVerticle {

  @Override
  public void start(final Future<Void> startFuture) throws Exception {
    // The server to show the output
    final HttpServer httpServer = this.vertx.createHttpServer();
    httpServer.requestHandler(this::httpRequestHandler).listen(8805);
    // The consumer that produces the (potentially large) reply
    final EventBus eb = this.vertx.eventBus();
    eb.consumer("bustest", this::busIncoming);
    startFuture.complete();
  }

  // Simple action to write something back: one line per requested turn
  private void busIncoming(final Message<String> incoming) {
    final int turns = Integer.parseInt(incoming.body());
    final Buffer result = Buffer.buffer(turns * 10);
    for (int i = 0; i < turns; i++) {
      result.appendString("Line ");
      result.appendString(Integer.toString(i));
      result.appendString("\n");
    }
    // The whole result goes back as one single reply
    incoming.reply(result);
  }

  // Simple example, just write 42 on the bus and write out the reply
  private void httpRequestHandler(final HttpServerRequest request) {
    final HttpServerResponse response = request.response();
    response.setChunked(true);
    final EventBus eb = this.vertx.eventBus();
    eb.send("bustest", "42", reply -> {
      if (reply.failed()) {
        // No reply (timeout, no handler): result() would be null here
        response.setStatusCode(500).end();
        return;
      }
      response.putHeader("Content-type", "text/plain");
      final Buffer result = (Buffer) reply.result().body();
      response.end(result);
    });
  }
}


Advice is highly appreciated

Julien Viet

Apr 4, 2016, 2:21:20 AM
to ve...@googlegroups.com
Hi,

You can only make a single reply to a specific message.

If you want multiple replies, you should instead register your own consumer under a UUID address in the HttpServer request handler, send that UUID to the worker along with the request, have the worker send its messages to that address, and unregister the consumer when it is done.

This is typically what Message replies do under the hood. However, we don't really support flow control over the event bus at the moment. You can try using MessageProducer for sending events instead of send. It should work as you won’t love messages in this case.
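
As a rough illustration of the temporary-address pattern described above, here is a minimal sketch in plain Java. It deliberately does not use Vert.x: `TinyBus`, `Chunk`, `registerWorker` and `request` are hypothetical names made up for this example, and delivery is synchronous, which the real event bus is not. In Vert.x the corresponding steps would presumably map to `EventBus.consumer(address, handler)`, `EventBus.send(address, body)` and `MessageConsumer.unregister()`. The end-of-stream marker is an assumption of this sketch, not something the event bus provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

class TempAddressDemo {

  /** Hypothetical message type: a text chunk plus an "end of stream" flag. */
  static final class Chunk {
    final String data;
    final boolean last;
    Chunk(String data, boolean last) { this.data = data; this.last = last; }
  }

  /** Hypothetical stand-in for the event bus: address -> handler, synchronous delivery. */
  static final class TinyBus {
    private final Map<String, Consumer<Object>> handlers = new HashMap<>();
    void consumer(String address, Consumer<Object> handler) { handlers.put(address, handler); }
    void unregister(String address) { handlers.remove(address); }
    boolean isRegistered(String address) { return handlers.containsKey(address); }
    void send(String address, Object body) {
      Consumer<Object> handler = handlers.get(address);
      if (handler != null) handler.accept(body); // messages to unknown addresses are dropped
    }
  }

  /** Worker side: reads the reply address from the request and sends one message per line. */
  static void registerWorker(TinyBus bus) {
    bus.consumer("bustest", body -> {
      String[] request = (String[]) body; // {replyAddress, turns}
      String replyAddress = request[0];
      int turns = Integer.parseInt(request[1]);
      for (int i = 0; i < turns; i++) {
        bus.send(replyAddress, new Chunk("Line " + i + "\n", false));
      }
      bus.send(replyAddress, new Chunk("", true)); // end-of-stream marker
    });
  }

  /** Requester side: temporary consumer under a fresh UUID, unregistered on the last chunk. */
  static String request(TinyBus bus, int turns, List<String> sink) {
    String replyAddress = UUID.randomUUID().toString();
    bus.consumer(replyAddress, body -> {
      Chunk chunk = (Chunk) body;
      if (chunk.last) {
        bus.unregister(replyAddress);
      } else {
        sink.add(chunk.data);
      }
    });
    bus.send("bustest", new String[] {replyAddress, Integer.toString(turns)});
    return replyAddress;
  }

  public static void main(String[] args) {
    TinyBus bus = new TinyBus();
    registerWorker(bus);
    List<String> received = new ArrayList<>();
    String address = request(bus, 3, received);
    System.out.println(received.size() + " chunks, still registered: " + bus.isRegistered(address));
  }
}
```

Running `main` prints `3 chunks, still registered: false`. Nothing in this sketch limits how fast the worker sends, so the flow-control gap mentioned above remains.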

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/030d22e2-0c76-47da-b648-847e970afcf7%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Stephan Wissel

Apr 4, 2016, 11:35:10 AM
to vert.x
Hi Julien,

I appreciate your reply. I suspected something like that. Could you elaborate on "won’t love messages"?

Let me sum up the options as I understand them:

  • When sending the message I provide a UUID to my worker, and instead of sending a reply it sends (multiple) "new" messages to that address. Advantage: works with any "flavour" of the event bus (invaluable in a clustered environment). Challenge: no flow control (backpressure?). Is there any sample for that?
  • When receiving a message I reply (immediately) with a port and I use a socket connection on the http side to listen to that port ... there I have full flow control and can use a pump. Challenge: packet loss (?) and clustering?
  • Writing a buffer into the shared map?
  • Something else?
What would you recommend?

Julien Viet

Apr 4, 2016, 11:43:43 AM
to ve...@googlegroups.com
won’t lose :-)


Hoobajoob

Apr 25, 2017, 10:01:54 AM
to vert.x
Stephan,

We are also interested in a working byte pump over the (local) event bus, preferably with a custom codec that avoids introducing copies. Did you find an approach that worked for you?

Thanks!
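
To make the "no copies" goal in the question above concrete, here is a small sketch in plain Java of splitting a blob into chunk views that share the original memory. It is not a Vert.x codec and uses no Vert.x API; `SliceChunks` and `chunk` are hypothetical names for this example, and `java.nio.ByteBuffer` stands in for a Vert.x `Buffer` (whose `slice` method appears to offer a comparable shared view). Sharing only makes sense for local delivery, and it assumes the producer does not modify the blob while chunks are in flight.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class SliceChunks {

  /** Splits a blob into read-only views of at most chunkSize bytes; no bytes are copied. */
  static List<ByteBuffer> chunk(byte[] blob, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<ByteBuffer> chunks = new ArrayList<>();
    for (int offset = 0; offset < blob.length; offset += chunkSize) {
      int length = Math.min(chunkSize, blob.length - offset);
      // wrap() creates a view over the same backing array rather than a copy;
      // slice() re-bases it so that index 0 is the first byte of this chunk
      chunks.add(ByteBuffer.wrap(blob, offset, length).slice().asReadOnlyBuffer());
    }
    return chunks;
  }

  public static void main(String[] args) {
    byte[] blob = new byte[10];
    List<ByteBuffer> chunks = chunk(blob, 4);
    blob[4] = 42; // write through the original array after chunking
    System.out.println(chunks.size() + " chunks, second chunk starts with " + chunks.get(1).get(0));
  }
}
```

Running `main` prints `3 chunks, second chunk starts with 42`, which shows that the chunk is a view of the original array and not a copy.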

Julien Viet

Apr 25, 2017, 10:40:47 AM
to ve...@googlegroups.com
Hi,

I totally forgot this thread :-)

I started a proof of concept for setting up a stream based on the event bus: https://github.com/vert-x3/vertx-service-proxy/tree/stream-protocol

I used the vertx-service-proxy project because I see this as similar to what already exists in service proxy with @ProxyClose. So this could be based on (or compatible with) the service-proxy “protocol” (i.e. how it uses the event bus) and also provide streaming support for vertx-service-proxy. In short, an @ServiceProxy can return or consume a ReadStream, à la gRPC.

Julien


Hoobajoob

Apr 25, 2017, 11:57:28 AM
to vert.x
Cool - thank you for the reference! I'll take a look and see how we might approach this.

We're interested in this for supporting blob retrieval where any given blob might be many megabytes in size. We do our REST front ends in javascript, though, and the pattern in other areas is to support pluggable providers written as verticles that service a well-known event bus address.

For blobs, we're not sure exactly what to do because we think we'd introduce a custom codec to avoid copies on every chunked buffer message, but my understanding is that custom codecs aren't supported in javascript (is that right)?

If we try to implement the REST interface for blob retrieval in Java so we can use the custom codec, we'd need to use a separate verticle than the javascript verticle that currently sets up all the routes at that endpoint. I don't see a way to share an HTTP router instance between languages, and that seems to lead us to requiring a separate HTTP service instance (at least) serving on a different port, if not a separate jvm/vertx instance. This is not a choice we would make based on a separation-of-concerns consideration though, so it seems unfortunate.

Is there a way to have a javascript REST front end making streaming requests across the event bus to a java streaming provider with a non-copying codec (say for vertx Buffers)? Or, are we correct in assuming we are going to have to introduce a new endpoint?

Thanks for any advice you might have!

Julien Viet

Apr 25, 2017, 12:09:18 PM
to ve...@googlegroups.com
On Apr 25, 2017, at 5:57 PM, Hoobajoob <chefho...@gmail.com> wrote:

> Cool - thank you for the reference! I'll take a look and see how we might approach this.
>
> We're interested in this for supporting blob retrieval where any given blob might be many megabytes in size. We do our REST front ends in javascript, though, and the pattern in other areas is to support pluggable providers written as verticles that service a well-known event bus address.
>
> For blobs, we're not sure exactly what to do because we think we'd introduce a custom codec to avoid copies on every chunked buffer message, but my understanding is that custom codecs aren't supported in javascript (is that right)?

They are supported if you register them from Java, i.e. a main verticle could register the codecs in Java and deploy the JavaScript verticle.

It also depends on what you send on the event bus from JS; most of the time it will be unwrapped by the JS shim.

> If we try to implement the REST interface for blob retrieval in Java so we can use the custom codec, we'd need to use a separate verticle than the javascript verticle that currently sets up all the routes at that endpoint. I don't see a way to share an HTTP router instance between languages, and that seems to lead us to requiring a separate HTTP service instance (at least) serving on a different port, if not a separate jvm/vertx instance. This is not a choice we would make based on a separation-of-concerns consideration though, so it seems unfortunate.

An HTTP router cannot be shared between verticles, even without polyglot (that being said, it’s a frequent request from community members who would like to make more modular HTTP routers).

> Is there a way to have a javascript REST front end making streaming requests across the event bus to a java streaming provider with a non-copying codec (say for vertx Buffers)? Or, are we correct in assuming we are going to have to introduce a new endpoint?

If you don’t copy, does it mean you are holding the stream data in memory to serve it?

By new endpoint, do you mean HTTP servers on different ports?

> Thanks for any advice you might have!



Hoobajoob

Apr 25, 2017, 12:25:39 PM
to vert.x


On Tuesday, April 25, 2017 at 9:09:18 AM UTC-7, Julien Viet wrote:

> [custom codecs] are supported if you register them from Java, i.e. a main verticle could register the codecs in Java and deploy the JavaScript verticle.
>
> it also depends on what you send on the event bus from JS, most of the times it will be unwrapped by the JS shim.

Oh - this sounds good! I think we might use JSON for streaming control messages, and Buffers for the actual blob content. But we haven't even started prototyping yet, so there's still some uncertainty here.

>> If we try to implement the REST interface for blob retrieval in Java so we can use the custom codec, we'd need to use a separate verticle than the javascript verticle that currently sets up all the routes at that endpoint. I don't see a way to share an HTTP router instance between languages, and that seems to lead us to requiring a separate HTTP service instance (at least) serving on a different port, if not a separate jvm/vertx instance. This is not a choice we would make based on a separation-of-concerns consideration though, so it seems unfortunate.
>
> HTTP router cannot be shared between Verticles even without polyglot (that being said, it’s a request that is often asked by community that would like to make more modular HTTP routers)

I can sympathize with this request. We have been getting along so far without this support, but it makes it harder to support both an all-in-one deployment, where all routes are served from one endpoint (host/port), and a distributed deployment, where different URL spaces get different port (and/or host) assignments.

>> Is there a way to have a javascript REST front end making streaming requests across the event bus to a java streaming provider with a non-copying codec (say for vertx Buffers)? Or, are we correct in assuming we are going to have to introduce a new endpoint?
>
> if you don’t copy, does it mean you are holding the stream data in memory to serve them ?

I think we'd have to - perhaps a chunk at a time until the next chunk is requested, or something along those lines.

> by new endpoint you mean http servers on different ports ?

Yes - something that changes the authority segment of the URL.
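
The "chunk at a time until the next chunk is requested" idea above can be sketched in plain Java as a pull protocol: the requester asks for chunk N and only asks for chunk N+1 once it has handled the reply. This fits the one-reply-per-message rule, because every chunk is its own request and reply. The sketch does not use Vert.x; `PullChunks`, `BlobProvider`, `ChunkReply` and `fetchAll` are hypothetical names, the loop is synchronous where the event bus would use reply handlers, and each chunk is copied for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PullChunks {

  /** Hypothetical reply to one chunk request: the bytes plus a flag for the final chunk. */
  static final class ChunkReply {
    final byte[] data;
    final boolean last;
    ChunkReply(byte[] data, boolean last) { this.data = data; this.last = last; }
  }

  /** Hypothetical provider: holds the blob and serves exactly one chunk per request. */
  static final class BlobProvider {
    private final byte[] blob;
    private final int chunkSize;

    BlobProvider(byte[] blob, int chunkSize) {
      if (chunkSize <= 0) {
        throw new IllegalArgumentException("chunkSize must be positive");
      }
      this.blob = blob;
      this.chunkSize = chunkSize;
    }

    /** Returns the chunk with the given index; the reply is flagged when it reaches the end. */
    ChunkReply handleRequest(int index) {
      int from = Math.min(index * chunkSize, blob.length);
      int to = Math.min(from + chunkSize, blob.length);
      return new ChunkReply(Arrays.copyOfRange(blob, from, to), to == blob.length);
    }
  }

  /** Requester: asks for the next chunk only after the previous one reached the sink. */
  static int fetchAll(BlobProvider provider, List<byte[]> sink) {
    int requests = 0;
    ChunkReply reply;
    do {
      reply = provider.handleRequest(requests++);
      sink.add(reply.data);
    } while (!reply.last);
    return requests;
  }

  public static void main(String[] args) {
    BlobProvider provider = new BlobProvider(new byte[10], 4);
    List<byte[]> sink = new ArrayList<>();
    int requests = fetchAll(provider, sink);
    System.out.println(requests + " requests, last chunk has " + sink.get(sink.size() - 1).length + " bytes");
  }
}
```

Running `main` prints `3 requests, last chunk has 2 bytes`. The design trades throughput for bounded memory: at most one chunk is in flight, at the cost of one round trip per chunk.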

Hoobajoob

Apr 25, 2017, 12:37:15 PM
to vert.x
> a main verticle could register the codecs in Java and deploy the JavaScript verticle.

...is the timing of this registration important? That is, can the registration be performed either before or after the JS verticle is deployed, as long as it is registered before the JS verticle sets up an event bus message consumer that would make use of it?

ry...@tagomi.com

Aug 8, 2018, 9:54:00 PM
to vert.x
Hi Julien,

Just wondering what the status of this change is and if there are any plans to resume work on the streaming feature?

Thanks,
Ryan

Julien Viet

Aug 9, 2018, 5:52:14 PM
to ve...@googlegroups.com
Hi,

This is part of the roadmap (https://github.com/vert-x3/wiki/wiki/Vert.x-Roadmap#streaming-over-eventbus) as an item that can be contributed by the community.

I.e. we have other, more important items for the coming months, but we would be glad to have a contribution that we would help implement and review.

Julien

Dai MIKURUBE

Oct 16, 2024, 8:53:46 AM
to vert.x
Hi,

I was looking for a way of "Streaming a reply through the eventbus", and found this old mailing thread here.

Julien said that this was part of the roadmap as of 2018, but the linked page no longer seems to contain "Streaming over eventbus".

I'd like to know whether the current Vert.x 4 already has a feature for streaming over the event bus.
If so, is there any documentation, reference, or example for it?


Thanks.


Thomas SEGISMONT

Oct 17, 2024, 5:04:04 AM
to ve...@googlegroups.com
Hi,

No, there is no feature in Vert.x 4 for this. Julien has made some experiments for Vert.x 5, but it was decided not to pursue this.

Regards,
Thomas

Dai MIKURUBE

Oct 17, 2024, 9:22:14 AM
to ve...@googlegroups.com
Hi Thomas,

Sad news, but thanks! Got it.


On the other hand, is the other approach still available in Vert.x: generating a temporary stream address in the event bus consumer and returning the generated address to the event bus requester?

I found some information about such an approach, but only starting points. I wonder whether an end-to-end example is available.





--
Dai MIKURUBE
   dmik...@acm.org

Thomas SEGISMONT

Oct 18, 2024, 4:42:05 AM
to ve...@googlegroups.com
I'm not familiar with these solutions; I would recommend talking to those users directly.
