Issue with Stream.toList()

7 views
Skip to first unread message

Jon Poulton

unread,
Mar 10, 2015, 4:16:12 PM3/10/15
to reactor-...@googlegroups.com

Hi there,
I'm having difficulty getting using the Stream API to construct a response object in my service wrapping responses provided by two downstream services.

Some code might be useful at this point. The following is the accept() method on my Channel Consumer. Some names have been changed to protect the innocent..

    @Override
   
public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
        log
.info("Consuming NetChannel of FullHttpRequest");
       
try {
           
Stream<FullHttpRequest> requestStream = channel.in();

            requestStream
.filter((x) -> {return true;})
                   
.dispatchOn(dispatcher)
                   
.map(new FooRequestFunction())                    // 1)
                   
.flatMap(new BarRequestStreamFunction())          // 2)
                   
.flatMap(new DownstreamRequestZipFunction())      // 3)
                   
.toList()                                         // 4)
                   
.onComplete(new ResponsesConsumer(channel));      // 5)
     
       
} catch (Exception e) {
            log
.error("Exception thrown during Channel processing", e);
       
}
   
}


So, a FooRequest wraps many BarRequests, each of which has one associated Classify request, and one
associated Validate request. We want to 1) Convert to a FooRequest, 2) Convert the FooRequest to
a series of BarRequests, 3) Run two downstream requests for each BarRequest, 4) Aggregate all of
our BarResponse objects into an overall response, 5) send a response back out to the client.

The point at which I encounter problems is the toList() method, which never seems to execute. Every time I've attempted something that involves a Promise is always seems to break, and this has been no exception.

FooRequestFunction, BarRequestStreamFunction are fairly simple, and seem to run fine. Their method signatures are:

// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);

// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest requests);

DownstreamRequestZipFunction looks like this:

    @Override
   
public Stream<BarResponse> apply(BarRequest t) {  
       
Stream<ClassifyResponse> classifyRes = Streams
               
.just(t)
               
.flatMap(new ClassifyDownstreamRequestFunction());
       
       
Stream<ValidateResponse> validateRes = Streams
               
.just(t)
               
.flatMap(new ValidateDownstreamRequestFunction());
       
       
return Streams.zip(classifyRes, validateRes, (tuple) -> {
           
BarResponse response = new BarResponse();
            response
.setClassifyRes(tuple.getT1());
            response
.setValidateRes(tuple.getT2());
           
return response;
       
});
   
}

This seems to work fine, as long as both of the downstream request functions return a result.

Finally, the Consumer at the end of the chained calls has this signature:

// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses);

What this does is await() the responses promise, and then aggregates all of those responses into a single XML document written back to the channel. I can tell the execution never gets as far as this method, because none of the logging fires. It all seems to stop at .toList().

Does anyone know why this set up ever seems to execute toList() or anything afterward?

Reply all
Reply to author
Forward
0 new messages