--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize. Vertigo does something similar, but like the request-reply method it still maintains order by tagging messages with a monotonically increasing number.
--
Hey Jordan,
do you mean your own in-memory counter / semaphore? the only limitation there is it wouldn't work across a cluster i'd assume?
that said, the nice thing about semaphores is you can tune the 'window size' so you can have several messages active through a pipeline at once rather then just running in lock-step.
the regulator in the example i gave could allow for that also, it basically is running a semaphore on the local consumer that only controls when the reply() requesting the next batch is sent (rather then effecting a semaphore running in the remote producer).
it's a tricky problem since a complex pipeline, esp. one that amplifies data / latency (by running additional async operations) is asymmetric i.e. one message from the producer could generate a large amount of buffers to the eventual WriteStream so you can't just have a one-to-one mapping.
-Peter
On Friday, 23 May 2014 05:28:10 UTC+1, Jordan Halterman (kuujo) wrote:Reactive Streams is really exciting. I was curious about whether it would be a goal for Vert.x 3. That's great!BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize. Vertigo does something similar, but like the request-reply method it still maintains order by tagging messages with a monotonically increasing number.
--
Hey Jordan,do you mean your own in-memory counter / semaphore? the only limitation there is it wouldn't work across a cluster i'd assume?
that said, the nice thing about semaphores is you can tune the 'window size' so you can have several messages active through a pipeline at once rather then just running in lock-step.
the regulator in the example i gave could allow for that also, it basically is running a semaphore on the local consumer that only controls when the reply() requesting the next batch is sent (rather then effecting a semaphore running in the remote producer).it's a tricky problem since a complex pipeline, esp. one that amplifies data / latency (by running additional async operations) is asymmetric i.e. one message from the producer could generate a large amount of buffers to the eventual WriteStream so you can't just have a one-to-one mapping.
-Peter
On Friday, 23 May 2014 05:28:10 UTC+1, Jordan Halterman (kuujo) wrote:Reactive Streams is really exciting. I was curious about whether it would be a goal for Vert.x 3. That's great!BTW another option for event bus flow control - as opposed to waiting for a consumer to reply to the producer to request more data - is to increment a counter on send and decrement the counter on reply. The counter can then serve as the maxQueueSize. Vertigo does something similar, but like the request-reply method it still maintains order by tagging messages with a monotonically increasing number.
--
> It's basically what TCP does: http://en.wikipedia.org/wiki/Sliding_Window_ProtocolYep - thats where I got the term from :-) Also means you could end up dealing with the same problems (slow-start etc) if you did try to adaptively size the window.I do wonder when it comes to streaming large amounts of data (eg from database or real-time event streams etc) whether the EventBus can be a suitable transport or if Vert.x 3 should have native stream support for remote services? (via an async driver api)