Project Reactive Evaluation - newbie questions

75 views
Skip to first unread message

Jocelyn Lepage

unread,
Mar 16, 2015, 1:39:59 PM3/16/15
to reactor-...@googlegroups.com
Hi,

We are currently evaluating asynchronous frameworks to implement a high-throughput REST web service interacting with a couple of backend servers, and considering project reactive as our internal server backbone - I have a couple of questions about various subjects. I'm evaluating 2.0.0.M2.


1) Basic broadcaster operation and dispatcher thread(s):
I was toying with Broadcaster, using the default RingBufferDispatcher. I was assuming that any terminal Consumer (or intermediate processing steps) would be invoked from the dispatcher thread. 
I noticed that, when the Broadcaster only contains a terminal Consumer, the consumer is invoked synchronously within the thread that called onNext on the Broadcaster:

    Broadcaster<String> sink = Streams.broadcast(Environment.get());

    sink

      .consume(s -> {

        // Gets invoked from the thread calling sink.onNext() if this is the only element in the stream

        System.out.printf("%s: message is [%s]\n", Thread.currentThread(), s); 

      });


    for (int i=0; i<100000; i++)

      sink.onNext("Msg number " + i + "!");

However, as soon as there's an intermediary processing step (such as map or observe), then both the intermediary step(s) and terminal action gets called from the Dispatcher thread.

Is there a reason for this behavior or I do miss an important point?


2) Graceful Backpressure handling at the Web Service level
We plan to use Jersey 2 async resources backed by an embedded Jetty as our REST stack. Received REST requests will get asynchronously handled through a Broadcaster (or perhaps more likely an EventBus, TBD). However, in case or overload (Dispatcher buffer full), we'd like to be able to immediately answer with "503 Service Unavailable" and not block the Jetty thread used to publish on the Broadcaster or EventBus.

In other words, at a high-level, I'd like my REST handler to logically do this:
try {
  broadcaster.tryOnNext(request)
  // or eventBus.tryNotify(requestEvent)
} catch (NoCapacityExceptionOnDispatcher e) {
  return 503 right away
}

I noticed that the Dispatcher interface as a "tryDispatch" method that'd throw if no capacity, but this method doesn't seem to be used anywhere by other project reactor classes - any plan to leverage this feature? 

Any thought on this? Did I missed an important Reactor design point?


3) Instrumentation for monitoring
Our servers normally implements metrics (using dropwizards metrics) - and we'd like to expose data that would give a sense of what's going on, for example identify slow consumers or saturated dispatchers. 

Looks like to me that exposing Dispatcher's backlogSize and remainingSlots seems a starting point. I guess also actions doing some buffering might expose their buffer usage. Any other points you might consider here?


4) Thread affinity for EventBus
As mentioned, we're contemplating using the EventBus to have decoupled asynchronous event between different internal subsystems in our application.
The events doesn't need to be strictly processed in order, but events related to a given user need to (if we want to avoid some form of locking within consumers). 

To my understanding, streams do offer this using Stream.groupBy and having each substream assigned a different dispatcher (such as a pooled one form Environment.cachedDispatcher()), e.g.:
    Broadcaster<String> sink = Streams.broadcast(Environment.get());

    sink

      .groupBy(s -> { return keyFor(s); })

      .consume(stream -> {

        stream

          .dispatchOn(Environment.get().getCachedDispatcher())

          // I want actual consumption to be done on the dispatcher's thread - so need a non-terminal action in between

          .observe(s -> {  })

          .consume(s -> {

            System.out.printf("%s: Consumed final data [%s] (key=[%s])\n", currentThread().getName(), s, keyFor(s));

          });          

      });


However, I don't see an equivalent for EventBus - I think that would be equally useful. Is there any way to implement a similar scheme with Event Bus?


Thanks in advance for your time and collaboration, it is very appreciated!
Don't hesitate to put references to existing docs - I read most of them but experimented only a little - I likely might have missed some important points...

Jocelyn
Reply all
Reply to author
Forward
0 new messages