Vertx message bus streaming


Christopher Probst

unread,
May 22, 2014, 6:53:54 AM5/22/14
to ve...@googlegroups.com
Hi,

what can I do to control the flow of the event bus? Sockets, files, etc. can be piped using a Pump, which is nice, but what happens if the event bus is not able to handle a high volume of messages (for instance, streaming a file over the EB)? And especially when using Hazelcast... In my understanding, the only way to recognize overload is an OOM exception, right?

Mike Milinkovich

unread,
May 22, 2014, 9:29:12 AM5/22/14
to ve...@googlegroups.com, IoT Working Group mailing list

Arno Schulz

unread,
May 22, 2014, 9:56:47 AM5/22/14
to ve...@googlegroups.com
Pretty cool, and they seem to be hiring people (Ottawa, Canada) for this as well.


petermd

unread,
May 22, 2014, 12:17:56 PM5/22/14
to ve...@googlegroups.com
Hi Chris,

TBH I'm not sure sending a stream of data over the EB (esp. in a cluster) would necessarily be an efficient design, but you can implement some flow control to avoid overload.

Essentially, the Producer should provide a callback mechanism (e.g. a reply handler on the EventBus) through which the Consumer can request more data after the initial response has been processed. You then need to manage the feedback from the WriteStream for the socket to limit the release of data. It's not fine-grained control, but it does ensure the operation will run inside an upper bound of memory usage.
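This lock-step scheme can be sketched in plain Java. To be clear, this is only an illustration: the `Reply` callback below is a stand-in for an EventBus reply handler, and `LockStepStream` / `Consumer` are made-up names, not Vert.x API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class LockStepStream {
    // Stand-in for an EventBus reply: the consumer invokes it to ask
    // the producer for the next piece of data.
    interface Reply { void requestNext(); }
    interface Consumer { void handle(String chunk, Reply reply); }

    private final Queue<String> source;
    private final Consumer consumer;

    LockStepStream(Queue<String> source, Consumer consumer) {
        this.source = source;
        this.consumer = consumer;
    }

    // Producer side: release exactly one chunk, handing the consumer a
    // callback it must invoke to get the next one. At most one chunk is
    // ever in flight, so memory usage stays bounded.
    void sendNext() {
        String chunk = source.poll();
        if (chunk != null) {
            consumer.handle(chunk, this::sendNext);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Queue<String> data = new ArrayDeque<>(List.of("chunk-1", "chunk-2", "chunk-3"));

        // Consumer side: process the chunk (in real code, wait for the
        // WriteStream to drain), then request the next via the reply.
        LockStepStream stream = new LockStepStream(data, (chunk, reply) -> {
            received.add(chunk);
            reply.requestNext();
        });
        stream.sendNext();
        System.out.println(received); // [chunk-1, chunk-2, chunk-3]
    }
}
```

The trade-off is one round trip per chunk; widening the window (several chunks in flight) reduces that cost, as discussed later in the thread.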

The latest mod-rxvertx has an example that streams data from an EventBus handler to a rate-limited WriteStream using such a feedback mechanism (health warning: I've not used it in anger - it's just a demonstration).


There is a project underway (http://www.reactive-streams.org/) that will potentially provide a more comprehensive solution, and hopefully it will be part of Vert.x 3.

-Peter

Mark Little

unread,
May 22, 2014, 12:19:47 PM5/22/14
to ve...@googlegroups.com, IoT Working Group mailing list
Very nice.

Mark.

Tim Fox

unread,
May 22, 2014, 1:47:35 PM5/22/14
to ve...@googlegroups.com
Great!

Christopher Probst

unread,
May 22, 2014, 3:24:14 PM5/22/14
to ve...@googlegroups.com
Hi!

Thanks for your reply =). I was just wondering if there is some technique (for instance, dropping messages) to prevent overloading the event bus. Imagine many, many verticles pushing a lot of messages onto the event bus. Basically, the only way you would notice the overload would be an OOM.

Jordan Halterman (kuujo)

unread,
May 23, 2014, 12:28:10 AM5/23/14
to ve...@googlegroups.com
Reactive Streams is really exciting. I was curious about whether it would be a goal for Vert.x 3. That's great!

BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize. Vertigo does something similar, but like the request-reply method it still maintains order by tagging messages with a monotonically increasing number.
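A minimal sketch of this counter scheme in plain Java (illustrative only - the `wire` list stands in for `eventBus.send()`, and none of these names are Vertigo or Vert.x API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Increment a counter on send, decrement it when the consumer replies,
// and hold messages locally while the in-flight count is at maxQueueSize.
public class WindowedSender {
    private final int maxQueueSize;
    private int inFlight = 0;
    private final Queue<String> pending = new ArrayDeque<>();
    final List<String> wire = new ArrayList<>(); // stand-in for eventBus.send()

    WindowedSender(int maxQueueSize) { this.maxQueueSize = maxQueueSize; }

    void send(String msg) {
        pending.add(msg);
        drain();
    }

    // Invoked when the consumer replies/acks one message.
    void onReply() {
        inFlight--;
        drain();
    }

    private void drain() {
        while (inFlight < maxQueueSize && !pending.isEmpty()) {
            inFlight++;
            // Real code: eventBus.send(addr, msg, reply -> onReply())
            wire.add(pending.poll());
        }
    }

    public static void main(String[] args) {
        WindowedSender s = new WindowedSender(2);
        for (int i = 1; i <= 5; i++) s.send("m" + i);
        System.out.println(s.wire.size()); // 2 - window full, rest queued locally
        s.onReply();
        s.onReply();
        System.out.println(s.wire.size()); // 4 - two replies freed two slots
    }
}
```

With maxQueueSize = 1 this degenerates to the lock-step request-reply scheme; larger windows keep several messages in transit at once.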

Tim Fox

unread,
May 23, 2014, 1:49:39 AM5/23/14
to ve...@googlegroups.com
You can do your own flow control over the event bus; for an example, see the eb_perf demo in the examples repo.

Tim Fox

unread,
May 23, 2014, 1:56:12 AM5/23/14
to ve...@googlegroups.com
On 23/05/14 05:28, Jordan Halterman (kuujo) wrote:
> Reactive Streams is really exciting. I was curious about whether it would be a goal for Vert.x 3. That's great!
>
> BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize.

+1. This is basically rolling your own flow control :)

petermd

unread,
May 23, 2014, 5:26:06 AM5/23/14
to ve...@googlegroups.com
Hey Jordan,

Do you mean your own in-memory counter / semaphore? The only limitation there is that it wouldn't work across a cluster, I'd assume?

That said, the nice thing about semaphores is that you can tune the 'window size', so you can have several messages active through a pipeline at once rather than just running in lock-step. The regulator in the example I gave could allow for that too; it is basically running a semaphore on the local consumer that only controls when the reply() requesting the next batch is sent (rather than affecting a semaphore running in the remote producer).

It's a tricky problem, since a complex pipeline, esp. one that amplifies data / latency (by running additional async operations), is asymmetric - i.e. one message from the producer could generate a large number of buffers to the eventual WriteStream, so you can't just have a one-to-one mapping.

-Peter

Tim Fox

unread,
May 23, 2014, 6:18:00 AM5/23/14
to ve...@googlegroups.com

petermd

unread,
May 23, 2014, 7:09:29 AM5/23/14
to ve...@googlegroups.com
Yep - that's where I got the term from :-) It also means you could end up dealing with the same problems (slow start, etc.) if you did try to adaptively size the window.

I do wonder, when it comes to streaming large amounts of data (e.g. from a database, or real-time event streams), whether the EventBus can be a suitable transport, or whether Vert.x 3 should have native stream support for remote services (via an async driver API)?

-Peter

Jordan Halterman

unread,
May 23, 2014, 1:14:48 PM5/23/14
to ve...@googlegroups.com


On May 23, 2014, at 2:26 AM, petermd <pet...@gmail.com> wrote:

> Hey Jordan,
>
> do you mean your own in-memory counter / semaphore? the only limitation there is it wouldn't work across a cluster i'd assume?
Why not? Ah - I think I see what you're saying. Vertigo does this on a per-"connection" basis. That is, it's not an upper limit on the event bus as a whole (Vertigo only has control over the messaging within Vertigo components anyway); it's an upper limit on a unique channel between two component instances. So it definitely doesn't preclude OOM if you have 100 verticles pumping messages in every direction. This method is more about placing at least some sort of limit on the event bus - though only between a single sender/receiver - and maintaining order while still allowing multiple messages to be in transit between verticles.

> that said, the nice thing about semaphores is you can tune the 'window size' so you can have several messages active through a pipeline at once rather than just running in lock-step.
This is mostly what I was looking for - no need for a round trip between messages. I actually did quite a bit of performance testing, and batch-acking messages (rather than doing a request-reply for each message) was a huge benefit. Of course, this type of thing is, I think, uniquely easier for Vertigo to handle, since it requires more bi-directional coordination than just request-reply, but it can be simplified.
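Batch acking can be sketched in plain Java like so (names are illustrative, not the Vertigo API, and the out-of-order/timer handling described below is omitted):

```java
import java.util.ArrayList;
import java.util.List;

// Instead of replying to every message, the receiver acks once per batch
// of N, so the sender pays one round trip per N messages instead of one
// per message.
public class BatchAckReceiver {
    private final int batchSize;
    private int unacked = 0;
    final List<Integer> acks = new ArrayList<>(); // each entry: highest id acked

    BatchAckReceiver(int batchSize) { this.batchSize = batchSize; }

    // Messages carry a monotonically increasing id so order can be checked.
    void onMessage(int id) {
        unacked++;
        if (unacked == batchSize) {
            acks.add(id); // real code: message.reply(id)
            unacked = 0;
        }
    }

    public static void main(String[] args) {
        BatchAckReceiver r = new BatchAckReceiver(3);
        for (int id = 1; id <= 9; id++) r.onMessage(id);
        System.out.println(r.acks); // one cumulative ack per 3 messages
    }
}
```

On the sender side, each ack of id N would decrement the in-flight counter by the whole batch, reopening the window in one step.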

I actually took some of the thinking from the way Raft followers resolve log entry order with the leader, if you think of the "sender" as the leader and the "receiver" as a follower. That is, respond immediately if a message is out of order, respond on a timer if messages are in order - though it's obviously not really *that* similar.

> the regulator in the example i gave could allow for that also, it basically is running a semaphore on the local consumer that only controls when the reply() requesting the next batch is sent (rather than affecting a semaphore running in the remote producer).
>
> it's a tricky problem since a complex pipeline, esp. one that amplifies data / latency (by running additional async operations) is asymmetric i.e. one message from the producer could generate a large amount of buffers to the eventual WriteStream so you can't just have a one-to-one mapping.

Yep, this is one issue. It's definitely an imperfect tool!

> -Peter

> On Friday, 23 May 2014 05:28:10 UTC+1, Jordan Halterman (kuujo) wrote:
>> Reactive Streams is really exciting. I was curious about whether it would be a goal for Vert.x 3. That's great!
>>
>> BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize. Vertigo does something similar, but like the request-reply method it still maintains order by tagging messages with a monotonically increasing number.


Jordan Halterman

unread,
May 23, 2014, 1:15:56 PM5/23/14
to ve...@googlegroups.com


On May 23, 2014, at 4:09 AM, petermd <pet...@gmail.com> wrote:

> Yep - thats where I got the term from :-) Also means you could end up dealing with the same problems (slow-start etc) if you did try to adaptively size the window.
>
> I do wonder when it comes to streaming large amounts of data (eg from database or real-time event streams etc) whether the EventBus can be a suitable transport or if Vert.x 3 should have native stream support for remote services? (via an async driver api)
Something like this may be more ideal!