Reactor application architecture


Jon Poulton

Mar 3, 2015, 11:27:35 AM
to reactor-...@googlegroups.com
Hi all,

I have a question regarding higher-level application design. Some context might be helpful. I'm working on a proxy server that takes a large chunk of XML from an HTTP/REST POST, breaks it up into 'n' smaller chunks of XML, and sends them on to downstream services. It then waits until all 'n' downstream services return some XML of their own, assembles an XML document from the responses, and writes a response back out to the original client. 

So, at the moment, my proxy server uses reactor from end to end, and I'm facing some design decisions that I can't seem to work around without things getting ugly. I have a feeling that I'm not looking in the right places or not taking the right approach, probably due to my level of understanding of the framework and the pattern of thinking required. 

In the following, Reactor classes/methods are highlighted in bold, my own in italics. 

At the moment I have one EventBus (which, as I understand it, used to be called Reactor). I have a NetServer<FullHttpRequest,FullHttpResponse> with an associated Consumer<NetChannel<FullHttpRequest,FullHttpResponse>> that consumes the POST by reading the XML into memory. I then create a "RequestContext" object containing the request and response, plus the XML, which I push onto the bus with a Selector called "process". A DownstreamRequestsFunction associated with that selector takes the RequestContext, extracts the XML, chops it up into 'n' smaller XML docs, makes 'n' HTTP requests, waits for each response, assembles the response document, and returns the result. Because the Function is invoked with sendAndReceive(), which supplies a reply Consumer, the result is then routed to a CompletedResponseConsumer, which writes the XML out to the FullHttpResponse.

There is a lot wrong with this. I'll go over my issues one at a time:

1) Poor separation of concerns. Some of the Functions or Consumers have a direct reference to the EventBus, push to it directly and set up something to receive a reply; others interact with it indirectly by receiving an Event and returning a result. I've noticed that you can set the replyTo property on an Event to route your Function's response somewhere else, but the API only allows you to route an Event from one Function to one Consumer. There does not seem to be a way to route an Event through a chain of 'n' Functions, each making its own changes, which is how I expected an EventBus to be used. If I wanted to link 'n' components together using the EventBus, it looks like I'd have to create 'n' pairs of Functions and Consumers, set each Consumer up as the "replyTo" for the corresponding Function, and then have the Consumer push the contents of the Event back onto the bus as a new Event. This seems really clunky to me, as the Consumers exist only to re-route Events and must themselves hold handles to the EventBus.
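For comparison, outside any event bus a chain of 'n' transformations can be composed directly, so each stage's output feeds the next with no relay Consumers. The sketch below uses only the JDK's `java.util.function.Function`, not Reactor's API; `StageChain` and `chain` are hypothetical names. Successive `map()` calls on a Reactor `Stream` express the same idea, but this sketch does not depend on that API.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (plain JDK, not Reactor): composes n stages into one function.
final class StageChain {
    private StageChain() {}

    /** Each stage receives the previous stage's output; no bus or relay Consumer is involved. */
    static <T> Function<T, T> chain(List<Function<T, T>> stages) {
        // identity() covers the empty list; andThen appends each stage in list order.
        return stages.stream().reduce(Function.identity(), Function::andThen);
    }
}
```

For example, chaining `s -> s + "1"`, `s -> s + "2"` and `s -> s.toUpperCase()` turns `"a"` into `"A12"`.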

2) Wrapper objects vs. Event headers. It looks like you can add arbitrary headers to an Event in a map-like fashion. Putting the NetChannel<FullHttpRequest,FullHttpResponse> (so you can write the response at the end) in the Event headers might be a better approach than passing it around as part of a wrapper object, in turn wrapped by the Event, but I don't know if this is the sort of thing the Event headers were intended to hold.
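The trade-off between the two options can be sketched in plain Java (not Reactor's `Event` API): a typed wrapper gets compile-time checking of field names and types, while a string-keyed header map only fails at runtime. `ContextStyles`, this `RequestContext` class, `withXml` and the `"xml"` key are hypothetical illustrations.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical comparison of the two styles; plain JDK, not Reactor's Event headers.
final class ContextStyles {
    private ContextStyles() {}

    /** Hypothetical typed wrapper: a misspelled field or a wrong type is a compile error. */
    static final class RequestContext {
        final String requestId;
        final String xml;

        RequestContext(String requestId, String xml) {
            this.requestId = Objects.requireNonNull(requestId);
            this.xml = Objects.requireNonNull(xml);
        }

        /** Returns a copy with new XML, so a stage can "change" the context without shared mutation. */
        RequestContext withXml(String newXml) {
            return new RequestContext(requestId, newXml);
        }
    }

    /** Header-map style: keys are strings and values are Object, so mistakes surface only at runtime. */
    static String xmlFromHeaders(Map<String, Object> headers) {
        // A misspelled key returns null; a value of the wrong type throws ClassCastException here.
        return (String) headers.get("xml");
    }
}
```

The sketch only shows the type-safety difference; which one fits depends on what the framework intends headers to carry.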

3) Synchronous vs. Asynchronous downstream requests. The proxy generates many synchronous downstream requests, which needs to change as it hurts performance. Any asynchronous approach I come up with seems inadequate. I did think of having some kind of "global" "Collector" class, backed by a "ConcurrentMap", which retains a unique key associated with a wrapper object containing the NetChannel<FullHttpRequest,FullHttpResponse>, and can figure out when all downstream responses have completed. But again, this seems like a bad code smell to me. If you have to have a global Map to manage what your program is doing, you've probably messed up somewhere. 
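One common alternative to a global map is to keep the per-request state inside a future that belongs to that request. The sketch below uses the JDK's `CompletableFuture` rather than anything in Reactor: it fans the chunks out concurrently and completes once every reply has arrived. `ScatterGather`, the `downstream` function and the `<results>` wrapper element are hypothetical; a real implementation would more likely use a non-blocking HTTP client that returns a future directly, instead of running a blocking call on an executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical scatter-gather sketch using plain JDK futures (not Reactor).
final class ScatterGather {
    private ScatterGather() {}

    /**
     * Sends every chunk downstream concurrently and completes when all replies are in.
     * The pending state belongs to this request's future, so no shared ConcurrentMap is needed.
     */
    static CompletableFuture<String> scatterGather(List<String> chunks,
                                                   Function<String, String> downstream,
                                                   Executor executor) {
        // Fan out: one future per chunk (downstream is a hypothetical blocking call run on the executor).
        List<CompletableFuture<String>> calls = chunks.stream()
                .map(chunk -> CompletableFuture.supplyAsync(() -> downstream.apply(chunk), executor))
                .collect(Collectors.toList());

        // Fan in: allOf completes once every call has completed (exceptionally if any call failed).
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> calls.stream()
                        .map(CompletableFuture::join) // every call is done here, so join() does not block
                        .collect(Collectors.joining("", "<results>", "</results>")));
    }
}
```

Replies may complete in any order; they are assembled in the order of the original chunks.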

Does anyone more familiar with Reactive-based software architecture have any thoughts on where I am going wrong with the above? Is there any part of the API that I've missed that could help with some of the issues I've outlined?

Thanks in advance for any responses. 

Jon

Jon Brisbin

Mar 3, 2015, 11:34:57 AM
to Jon Poulton, Reactor Framework
First question which has huge implications for later answers: is there a reason to prefer EventBus over a Stream? In general, EventBus is for a very specific use case (when you don’t have any way to directly access your event handlers). It’s more like a sink for topic-based assignment and is not meant to be used as a general-purpose reactive pipelining utility. That’s what Stream is for.
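To make the distinction concrete, here is a toy topic registry in plain Java. It is not Reactor's `EventBus`; `TopicBus`, `on` and `publish` are hypothetical names. A publisher only knows a topic string, not the handlers, and gets nothing back to chain further work onto, which is why this shape suits fire-and-forget notification better than step-by-step processing.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical toy registry (not Reactor): handlers subscribe by topic; publishers only know the topic name. */
final class TopicBus {
    private final Map<String, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    void on(String topic, Consumer<Object> handler) {
        handlers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Delivers the event to every handler for the topic and reports how many there were. */
    int publish(String topic, Object event) {
        List<Consumer<Object>> targets = handlers.getOrDefault(topic, List.of());
        targets.forEach(handler -> handler.accept(event));
        return targets.size(); // nothing comes back from the handlers to compose further steps with
    }
}
```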

FWIW We’re working with Netflix right now on a common networking kernel that uses Reactive Streams and various kinds of composition libraries (Rx, Reactor, whatever) to make this kind of thing easier. That doesn’t help you right now, I know, but it’s something to keep in mind.


Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jon Poulton

Mar 3, 2015, 11:54:38 AM
to reactor-...@googlegroups.com, jsp...@gmail.com
Hi Jon,
From what I understand, if I attempted to use Streams, the processing logic, request generation, response aggregation, etc. would all take place on the same thread. That thread, which also handled the request, would be too busy running the application logic to respond to other concurrent requests. One goal for the proxy is to handle 10K concurrent requests. So, from what I could see, it's better to push the result of the parsed request onto the bus, return immediately, and let background threads deal with the rest of the application logic before asynchronously writing the response. 

Also, from a code maintenance and debugging perspective, having your entire application as a bunch of chained requests on the Stream API didn't seem like a good choice. I'm new to reactive programming, as well as reactor, and admit I could be wrong, but this stood out to me as a potential issue. 

Coincidentally, on our morning stand-up just now I was talking about what I was working on and saying that Netflix must have encountered this problem before, and presumably had some kind of general solution for it. We just need to figure out what that approach is, and hopefully it will be applicable. 

Cheers

Jon

Jon Brisbin

Mar 3, 2015, 3:24:47 PM
to Jon Poulton, Reactor Framework
On Mar 3, 2015, at 10:54 AM, Jon Poulton <jsp...@gmail.com> wrote:

> Hi Jon,
> From what I understand, if I attempted to use Streams, that would mean that the processing logic, request generation, response aggregation, etc. would all take place in the same thread. This would mean that the thread that handled the request would be too busy processing the application logic to respond to other concurrent requests. One goal for the proxy is to be able to handle 10K concurrent requests. So, from what I could see, it's better to push the result of the parsed request onto the bus, return immediately, and let other background threads deal with the rest of the application logic before asynchronously writing the response. 


Here’s an example of using a Stream to succinctly parse incoming data on the request-handling thread (here using Ratpack, which uses Netty underneath), transition to a worker thread for the remainder of the processing, and then render the result of a query run on that other thread by sending a Stream back to Ratpack. I’ve written a special renderer for Streams which calls the necessary Ratpack code to ensure that the work done in another thread is wired back to the internal Netty context properly.
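A rough JDK-only analogue of that pattern (not the Ratpack/Reactor code Brisbin refers to) does the cheap parsing on the calling thread, hands the heavy work to a worker pool, and registers a callback that writes the response, so the request thread returns immediately. `ThreadHopSketch`, `process` and `writeResponse` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical thread-hop sketch using JDK futures (not Ratpack or Reactor).
final class ThreadHopSketch {
    private ThreadHopSketch() {}

    /** Parses on the caller's thread, processes on the worker executor, then hands the result to writeResponse. */
    static CompletableFuture<Void> handle(String rawBody, Executor workers, Consumer<String> writeResponse) {
        String parsed = rawBody.trim(); // cheap step: stays on the request-handling thread
        return CompletableFuture
                .supplyAsync(() -> process(parsed), workers) // heavy step: runs on a worker thread
                .thenAccept(writeResponse); // runs once processing finishes; the caller is not blocked
    }

    /** Hypothetical stand-in for the split/downstream/assemble work. */
    private static String process(String xml) {
        return "<response>" + xml + "</response>";
    }
}
```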




> Also, from a code maintenance and debugging perspective, having your entire application as a bunch of chained requests on the Stream API didn't seem like a good choice. I'm new to reactive programming, as well as reactor, and admit I could be wrong, but this stood out to me as a potential issue. 


This seems pretty maintainable to me! ;)



> Coincidentally, on our morning stand-up just now I was talking about what I was working on and saying that Netflix must have encountered this problem before, and presumably had some kind of general solution for it. We just need to figure out what that approach is, and hopefully it will be applicable. 


The Netflix solution varies at the moment, but currently it centers around the Observable, which is very similar to Reactor’s Stream. An example of the above using Observable would look nearly identical as far as the API goes. They currently use Tomcat but are in the process of rewriting things to use Netty. Since we’re working on this with them, their approach and ours will merge in the near future.

Over the next couple months, Reactor and Reactive Streams contributors will be putting together the guts of a reusable networking kernel which hides ALL of these details and gives you a much, much easier way to write network apps using the composition library of choice. You could choose RxJava or, as the community grows, Groovy, Clojure or others.

Jon Poulton

Mar 11, 2015, 11:54:55 AM
to reactor-...@googlegroups.com
Not sure if this will submit, but just an FYI, all new topics submitted to this list are failing to appear (at least from my end). I'm not sure if this is a wider Google Groups issue. 

Jon

Jon Poulton

Mar 11, 2015, 11:58:49 AM
to reactor-...@googlegroups.com
As my last post did actually submit to the list, I hope no one minds if I pull this thread off on a tangent to do with a different issue I'm having with the application described above. I asked this on StackOverflow, and so far have had no response. See below:

I'm having difficulty using the Spring Reactor Stream API (similar to RxJava) to construct a response object in my service that wraps the responses provided by two downstream services.
 
The following is the `accept()` method on my Channel Consumer. Some names have been changed to protect the innocent.
 
    @Override
    public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
        log.info("Consuming NetChannel of FullHttpRequest");
        try {
            // Our initial Stream is a single HTTP request containing our XML document.
            Stream<FullHttpRequest> requestStream = channel.in();

            requestStream
                    .filter((x) -> {return true;})
                    .dispatchOn(dispatcher)
                    .map(new FooRequestFunction())                    // 1)
                    .flatMap(new BarRequestStreamFunction())          // 2)
                    .flatMap(new DownstreamRequestZipFunction())      // 3)
                    .toList()                                         // 4)
                    .onComplete(new ResponsesConsumer(channel));      // 5)
        } catch (Exception e) {
            log.error("Exception thrown during Channel processing", e);
        }
    }

 
So, a `FooRequest` wraps many `BarRequests`, each of which has one associated Classify request and one associated Validate request. We want to 1) convert to a `FooRequest`, 2) convert the `FooRequest` to a series of `BarRequests`, 3) run two downstream requests for each `BarRequest`, 4) aggregate all of our `BarResponse` objects into an overall response, and 5) send a response back out to the client.
 
The point at which I encounter problems is the `toList()` method, which never seems to execute. Every time I've attempted something that involves a `Promise` it always seems to break, and this has been no exception.
 
`FooRequestFunction`, `BarRequestStreamFunction` are fairly simple, and seem to run fine. Their method signatures are:
 
    // FooRequestFunction
    public FooRequest apply(final FullHttpRequest request);

And:

    // BarRequestStreamFunction
    public Stream<BarRequest> apply(FooRequest dsoRequests);

`DownstreamRequestZipFunction` looks like this:

    @Override
    public Stream<BarResponse> apply(BarRequest t) {
        Stream<ClassifyResponse> classifyRes = Streams
                .just(t)
                .flatMap(new ClassifyDownstreamRequestFunction());

        Stream<ValidateResponse> validateRes = Streams
                .just(t)
                .flatMap(new ValidateDownstreamRequestFunction());

        return Streams.zip(classifyRes, validateRes, (tuple) -> {
            BarResponse response = new BarResponse();
            response.setClassifyRes(tuple.getT1());
            response.setValidateRes(tuple.getT2());
            return response;
        });
    }

      
This seems to work fine, as long as both of the downstream request functions return a result.
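For single-valued results, the JDK's `CompletableFuture.thenCombine` behaves much like the zip above: the combiner runs only once both inputs have produced a value, so if either side never completes, neither does the combined result. This is a plain-JDK sketch, not Reactor; `ZipSketch` and its `BarResponse` stand-in (holding strings instead of the real response types) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK analogue of zipping two single-valued downstream results (not Reactor's Streams.zip).
final class ZipSketch {
    private ZipSketch() {}

    /** Hypothetical stand-in for BarResponse, holding plain strings instead of the real response types. */
    static final class BarResponse {
        final String classifyRes;
        final String validateRes;

        BarResponse(String classifyRes, String validateRes) {
            this.classifyRes = classifyRes;
            this.validateRes = validateRes;
        }
    }

    /** Pairs the two downstream results; the combiner runs only after BOTH futures complete. */
    static CompletableFuture<BarResponse> combine(CompletableFuture<String> classify,
                                                  CompletableFuture<String> validate) {
        return classify.thenCombine(validate, BarResponse::new);
    }
}
```

If the validate future were never completed, the combined future would stay pending forever, which mirrors the observation that the zip only works when both downstream calls return.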
 
Finally, the Consumer at the end of the chained calls has this signature:

    // ResponsesConsumer
    public void accept(Promise<List<BarResponse>> responses)

What this does is await() the responses promise, and then aggregates all of those responses into a single XML document written back to the channel. I can tell the execution never gets as far as this method, because none of the logging fires. It all seems to stop at .toList().
 
Does anyone know why this setup never seems to execute `toList()` or anything after it?

EDIT: Okay, I have a bit more information. After giving each thread in the application a naming convention to make debugging easier, I can see that "shared-1", the thread that runs the accept() method, enters a WAITING state and then stays there. This could have something to do with the fact that the underlying Dispatcher is a RingBuffer dispatcher, which is single-threaded. 
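One general mechanism that produces this symptom is shown below with a plain JDK executor rather than Reactor's dispatcher, so it is an illustration, not a diagnosis: a task running on a single-threaded executor blocks waiting for a second task queued on the same executor (much as awaiting a `Promise` from inside a dispatcher thread might). The second task can never start, so the first waits forever. `SelfDeadlock` and `outerTaskFinishes` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demonstration (plain JDK): blocking on work queued to your own single thread never returns.
final class SelfDeadlock {
    private SelfDeadlock() {}

    /** Returns true if the outer task finishes within the timeout on a pool of the given size. */
    static boolean outerTaskFinishes(int threads, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner result"); // queued on the same pool
                return inner.get(); // with one thread, inner can never start while we block here
            });
            outer.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the outer task is stuck in WAITING, like "shared-1"
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the blocked thread so the demo can exit
        }
    }
}
```

With one thread the call returns `false`; with two threads the inner task gets its own thread and the call returns `true`.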

I modified the code so that the approach was slightly different, and used a multithreaded dispatcher, and avoided using a `Promise`, but I still have a state in which the tail of the chained set of calls will not execute. See below:

    @Override
    public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
        log.info("Consuming NetChannel of FullHttpRequest");
        try {
            // Our initial Stream is a single HTTP request containing our XML document.
            Stream<FullHttpRequest> requestStream = channel.in();

            requestStream
                    .filter((x) -> {return true;})
                    .dispatchOn(dispatcher)
                    .map(new FooRequestFunction())                    // 1)
                    .flatMap(new BarRequestStreamFunction())          // 2)
                    .flatMap(new DownstreamRequestZipFunction())      // 3)
                    .reduce(new ArrayList<BarResponse>(), (list, resp) -> {
                        log.info("Reducing");
                        list.add(resp);
                        return list;
                    })                                                // 4)
                    .consumeOn(
                            (x) -> { log.info("Consume"); },
                            (x) -> { log.error("error"); },
                            (x) -> { log.info("Complete"); },
                            dispatcher);                              // 5)
        } catch (Exception e) {
            log.error("Exception thrown during Channel processing", e);
        }
    }


In the above, I've replaced toList() with a call to reduce() that collapses everything into a single `List<BarResponse>`. I can see this executing and logging just fine. However, no matter what I do with the last call (I've tried consume(), consumeOn(), etc.), it never executes and never logs anything from the final calls you see above. 

Looking in VisualVM I can see that the dispatcher threads are all waiting on the same object monitor associated with a blocking queue - in other words, they're all waiting for work to arrive. It's like the tail consumeOn() call is completely ignored. 
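The behaviour described here (the accumulator logs "Reducing" per item, yet nothing downstream ever fires) matches how a reduce or toList operator generally works in Rx-style libraries: it can emit its single result only when the upstream signals completion. Whether `channel.in()` ever completes in this application is an assumption worth checking, not a confirmed diagnosis. The sketch uses the JDK's `java.util.concurrent.Flow` and `SubmissionPublisher` (Java 9+), not Reactor; `ReducingSubscriber` and `ReduceDemo` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator;

/** Hypothetical subscriber: folds every item as it arrives, but publishes the total only on onComplete. */
final class ReducingSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<T> result = new CompletableFuture<>();
    private final BinaryOperator<T> op;
    private T acc;

    ReducingSubscriber(T seed, BinaryOperator<T> op) {
        this.acc = seed;
        this.op = op;
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { acc = op.apply(acc, item); } // runs per item, like the "Reducing" log
    @Override public void onError(Throwable t) { result.completeExceptionally(t); }
    @Override public void onComplete() { result.complete(acc); } // the only place a value is emitted
}

/** Hypothetical driver for the demo. */
final class ReduceDemo {
    private ReduceDemo() {}

    /** Publishes the items and, only if signalComplete is true, closes (completes) the publisher. */
    static CompletableFuture<Integer> sum(List<Integer> items, boolean signalComplete) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        ReducingSubscriber<Integer> subscriber = new ReducingSubscriber<>(0, Integer::sum);
        publisher.subscribe(subscriber);
        items.forEach(publisher::submit);
        if (signalComplete) {
            publisher.close(); // analogous to the upstream input stream completing
        }
        return subscriber.result;
    }

    /** True if the future completes within the given time. */
    static boolean completesWithin(CompletableFuture<?> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without `close()`, every item is still folded into the accumulator, but the final value is never emitted, so anything chained after the reduce never runs.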

What am I doing wrong here? What am I not understanding?

Jon Poulton

Mar 11, 2015, 12:08:02 PM
to reactor-...@googlegroups.com
Strange. My one-line reply above seemed to submit just fine, but my subsequent post about the new issue I'm having (about 107 lines of text and code) never appeared. In case this one does appear: I've described the issue on Stack Overflow here: http://stackoverflow.com/questions/28975139/terminal-calls-to-stream-never-execute

Jon Brisbin

Mar 11, 2015, 2:27:31 PM
to Jon Poulton, Reactor Framework
It must be something about the content of your email it didn’t like (the code maybe?).

TL;DR of my response is: what happens when you comment out the .dispatchOn() call?


Thanks!

Jon Brisbin

Jon Poulton

Mar 11, 2015, 3:10:02 PM
to Jon Brisbin, Reactor Framework
My email was plain text. The code was then highlighted using the syntax highlighter embedded in the Google Groups UI. The pop-up message told me that my message had been submitted, and after a while the window said "Your message is taking a long time to process...", or words to that effect. Not really sure what's happening there.

I've replied on the Stack Overflow thread to avoid confusion for future readers. If I reach a resolution, maybe I can post the problem and its resolution to the main mailing list. 

Cheers

Jon