Stream tail calls never executed

Jon Poulton

Mar 11, 2015, 11:34:44 AM
to reactor-...@googlegroups.com
Hi, 
I tried asking the question below yesterday, but for some reason Google Groups just was not playing ball. I asked it on StackOverflow, and so far have had no response.

I'm having difficulty using the Spring Reactor Stream API (similar to RxJava) to construct a response object in my service that wraps the responses provided by two downstream services.
 
The following is the `accept()` method on my Channel Consumer. Some names have been changed to protect the innocent.
 
@Override
public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
    log.info("Consuming NetChannel of FullHttpRequest");
    try {
        // Our initial Stream is a single HTTP request containing our XML document.
        Stream<FullHttpRequest> requestStream = channel.in();

        requestStream
                .filter((x) -> {return true;})
                .dispatchOn(dispatcher)
                .map(new FooRequestFunction())                    // 1)
                .flatMap(new BarRequestStreamFunction())          // 2)
                .flatMap(new DownstreamRequestZipFunction())      // 3)
                .toList()                                         // 4)
                .onComplete(new ResponsesConsumer(channel));      // 5)
    } catch (Exception e) {
        log.error("Exception thrown during Channel processing", e);
    }
}

 
So, a `FooRequest` wraps many `BarRequests`, each of which has one associated Classify request and one associated Validate request. We want to 1) convert the HTTP request to a `FooRequest`, 2) convert the `FooRequest` to a series of `BarRequests`, 3) run two downstream requests for each `BarRequest`, 4) aggregate all of our `BarResponse` objects into an overall response, and 5) send that response back out to the client.
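
To make the intended data flow concrete, here is a minimal synchronous sketch of the five steps using plain `java.util.stream` rather than Reactor. Every type and method in it (`PipelineShapeSketch`, the simplified `FooRequest`, `BarRequest`, and `BarResponse`, the comma-separated request body) is a hypothetical stand-in for illustration, not the real service code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PipelineShapeSketch {

    // Hypothetical, simplified stand-ins for the real request/response classes.
    static final class BarRequest {
        final String id;
        BarRequest(String id) { this.id = id; }
    }

    static final class FooRequest {
        final List<BarRequest> bars;
        FooRequest(List<BarRequest> bars) { this.bars = bars; }
    }

    static final class BarResponse {
        final String classifyRes;
        final String validateRes;
        BarResponse(String classifyRes, String validateRes) {
            this.classifyRes = classifyRes;
            this.validateRes = validateRes;
        }
    }

    // 1) Parse the raw body into a FooRequest (assumed here: comma-separated ids).
    static FooRequest toFooRequest(String body) {
        List<BarRequest> bars = Arrays.stream(body.split(","))
                .map(String::trim)
                .map(BarRequest::new)
                .collect(Collectors.toList());
        return new FooRequest(bars);
    }

    // 3) Two downstream results per BarRequest, combined into one BarResponse.
    static BarResponse callDownstream(BarRequest bar) {
        return new BarResponse("classified:" + bar.id, "validated:" + bar.id);
    }

    // 2) and 4) Fan out to the BarRequests, then aggregate every response.
    static List<BarResponse> process(String body) {
        return toFooRequest(body).bars.stream()
                .map(PipelineShapeSketch::callDownstream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 5) Stand-in for writing the aggregated response back to the client.
        for (BarResponse r : process("a, b")) {
            System.out.println(r.classifyRes + " / " + r.validateRes);
        }
    }
}
```

The synchronous version always reaches step 5 because `collect` knows when its input is exhausted; the asynchronous pipeline needs an equivalent signal before steps 4 and 5 can run.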
 
The point at which I encounter problems is the `toList()` method, which never seems to execute. Every time I've attempted something that involves a `Promise` it always seems to break, and this has been no exception.
 
`FooRequestFunction` and `BarRequestStreamFunction` are fairly simple, and seem to run fine. Their method signatures are:
 
    // FooRequestFunction
    public FooRequest apply(final FullHttpRequest request);


And:

    // BarRequestStreamFunction
    public Stream<BarRequest> apply(FooRequest dsoRequests);


`DownstreamRequestZipFunction` looks like this:

@Override
public Stream<BarResponse> apply(BarRequest t) {
    Stream<ClassifyResponse> classifyRes = Streams
            .just(t)
            .flatMap(new ClassifyDownstreamRequestFunction());

    Stream<ValidateResponse> validateRes = Streams
            .just(t)
            .flatMap(new ValidateDownstreamRequestFunction());

    return Streams.zip(classifyRes, validateRes, (tuple) -> {
        BarResponse response = new BarResponse();
        response.setClassifyRes(tuple.getT1());
        response.setValidateRes(tuple.getT2());
        return response;
    });
}

      
This seems to work fine, as long as both of the downstream request functions return a result.
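
The "both sides must return a result" condition can be illustrated with `CompletableFuture.thenCombine` from the JDK, used here purely as an analogy for the zip step and not as a description of how Reactor's `Streams.zip` is implemented. The class `ZipBothSidesSketch` and its `combine` helper are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ZipBothSidesSketch {

    /** Returns the combined value, or null if it is not available within waitMillis. */
    static String combine(CompletableFuture<String> classify,
                          CompletableFuture<String> validate,
                          long waitMillis) {
        // The combining function only runs once BOTH inputs have completed.
        CompletableFuture<String> zipped =
                classify.thenCombine(validate, (c, v) -> c + "+" + v);
        try {
            return zipped.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // at least one side never produced a value
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> classified = CompletableFuture.completedFuture("classified");
        CompletableFuture<String> validated = CompletableFuture.completedFuture("validated");
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();

        System.out.println(combine(classified, validated, 100));      // classified+validated
        System.out.println(combine(classified, neverCompletes, 100)); // null
    }
}
```

If either downstream call produces nothing, the combined result never appears, so anything chained after it has nothing to act on.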
 
Finally, the Consumer at the end of the chained calls has this signature:

    // ResponsesConsumer
    public void accept(Promise<List<BarResponse>> responses)


This calls await() on the responses promise, and then aggregates all of those responses into a single XML document that is written back to the channel. I can tell that execution never gets as far as this method, because none of its logging fires. It all seems to stop at .toList().
 
Does anyone know why this setup never seems to execute `toList()` or anything after it?

EDIT: Okay, I have a bit more information. After supplying a naming convention to each thread in the application to make debugging easier, I can see that "shared-1", the thread that runs the accept() method, enters a WAITING state and then stays there. This could be related to the fact that the underlying Dispatcher is a ringbuffer dispatcher, which is single threaded.
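
The suspicion about the single-threaded dispatcher can be reproduced in isolation with a plain JDK executor. This is an analogy only, assuming that a blocking wait on the dispatcher's sole thread behaves like a blocking `Future.get()` on a one-thread pool; `SingleThreadBlockingSketch` and `innerCompletes` are hypothetical names, and no Reactor classes are involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SingleThreadBlockingSketch {

    /**
     * Runs an outer task that blocks on an inner task queued on the SAME pool.
     * Returns true if the inner task finished within waitMillis, false if the
     * wait timed out because no thread was free to run it.
     */
    static boolean innerCompletes(int threads, long waitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // Work that can only run on another thread of this pool.
                Future<String> inner = pool.submit(() -> "response");
                try {
                    inner.get(waitMillis, TimeUnit.MILLISECONDS); // blocking wait
                    return true;
                } catch (TimeoutException e) {
                    return false;
                }
            });
            return outer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + innerCompletes(1, 200)); // false
        System.out.println("2 threads: " + innerCompletes(2, 200)); // true
    }
}
```

With one thread, the blocked outer task occupies the only worker, so the queued inner task cannot start; without the timeout used here, the thread would simply sit in a waiting state indefinitely.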

I modified the code so that the approach was slightly different, and used a multithreaded dispatcher, and avoided using a `Promise`, but I still have a state in which the tail of the chained set of calls will not execute. See below:

@Override
public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
    log.info("Consuming NetChannel of FullHttpRequest");
    try {
        // Our initial Stream is a single HTTP request containing our XML document.
        Stream<FullHttpRequest> requestStream = channel.in();

        requestStream
                .filter((x) -> {return true;})
                .dispatchOn(dispatcher)
                .map(new FooRequestFunction())                    // 1)
                .flatMap(new BarRequestStreamFunction())          // 2)
                .flatMap(new DownstreamRequestZipFunction())      // 3)
                .reduce(new ArrayList<BarResponse>(), (list, resp) -> {
                    log.info("Reducing");
                    list.add(resp);
                    return list;
                })                                                // 4)
                .consumeOn(
                        (x) -> {log.info("Consume");},
                        (x) -> {log.error("error");},
                        (x) -> {log.info("Complete");},
                        dispatcher);                              // 5)
    } catch (Exception e) {
        log.error("Exception thrown during Channel processing", e);
    }
}


In the above, I've replaced the toList() with a call to reduce() and collapsed everything into a single `List<BarResponse>`. I can see this executing and logging just fine. However, no matter what I do with the last call (I have tried consume(), consumeOn(), etc.), it never executes, and never logs the final messages you see above.

Looking in VisualVM I can see that the dispatcher threads are all waiting on the same object monitor associated with a blocking queue - in other words, they're all waiting for work to arrive. It's like the tail consumeOn() call is completely ignored. 
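
One possibility that fits these symptoms, offered as an unconfirmed assumption rather than a diagnosis: if `reduce()` and `toList()` only pass their result downstream once the upstream signals completion, then a source that never completes would show exactly this pattern, with "Reducing" logged for each element and the tail never firing. The hand-rolled `CollectingStage` below is a hypothetical stand-in that demonstrates that behaviour; it is not a Reactor class and makes no claim about Reactor's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class EmitOnCompleteSketch {

    /** Hypothetical accumulating stage; a stand-in, not a Reactor class. */
    static final class CollectingStage<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        CollectingStage(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        // Each element is folded into the buffer; nothing is passed on yet.
        void onNext(T value) {
            buffer.add(value);
        }

        // Only the completion signal releases the accumulated list downstream.
        void onComplete() {
            downstream.accept(new ArrayList<>(buffer));
        }
    }

    /** Feeds the values through the stage and returns what downstream received. */
    static List<List<String>> run(List<String> values, boolean sourceCompletes) {
        List<List<String>> received = new ArrayList<>();
        CollectingStage<String> stage = new CollectingStage<String>(received::add);
        for (String v : values) {
            stage.onNext(v);
        }
        if (sourceCompletes) {
            stage.onComplete();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");
        System.out.println(run(values, false)); // []
        System.out.println(run(values, true));  // [[a, b]]
    }
}
```

Under that assumption, the thing to check would be whether the stream returned by `channel.in()` ever signals completion while the connection remains open.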

What am I doing wrong here? What am I not understanding?