Stream.toList() issue

Jon Poulton

Mar 10, 2015, 5:50:08 PM
to reactor-...@googlegroups.com
Hi there,
I'm having difficulty using the Stream API to construct a response object in my service,
which wraps the responses provided by two downstream services.
 
Some code might be useful at this point. The following is the accept() method on my Channel Consumer.
Some names have been changed to protect the innocent.
 
    @Override
    public void accept(final NetChannel<FullHttpRequest, FullHttpResponse> channel) {
        log.info("Consuming NetChannel of FullHttpRequest");
        try {
            // Our initial Stream is a single HTTP request containing our XML document.
            Stream<FullHttpRequest> requestStream = channel.in();
 
            requestStream.filter((x) -> {return true;})
                    .dispatchOn(dispatcher)
                    .map(new FooRequestFunction())                    // 1)
                    .flatMap(new BarRequestStreamFunction())          // 2)
                    .flatMap(new DownstreamRequestZipFunction())      // 3)
                    .toList()                                         // 4)
                    .onComplete(new ResponsesConsumer(channel));      // 5)
                                                                               
        } catch (Exception e) {
            log.error("Exception thrown during Channel processing", e);
        }
    }
 
So, a FooRequest wraps many BarRequests, each of which has one associated Classify request, and one
associated Validate request. We want to 1) Convert to a FooRequest, 2) Convert the FooRequest to
a series of BarRequests, 3) Run two downstream requests for each BarRequest, 4) Aggregate all of
our BarResponse objects into an overall response, 5) send a response back out to the client.
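
For readers who don't have the real classes, the five steps have roughly the following shape. This is only a sketch using plain java.util.stream rather than Reactor, and every name in it (PipelineShapeSketch, toBarRequests, classify, validate) is a hypothetical stand-in with strings in place of the real request and response types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineShapeSketch {

    // Hypothetical stand-in: a "FooRequest" wraps many "BarRequests" (comma-separated here).
    static List<String> toBarRequests(String fooRequest) {
        return Arrays.asList(fooRequest.split(","));
    }

    // Hypothetical stand-ins for the two downstream calls made per BarRequest.
    static String classify(String barRequest) {
        return "classified:" + barRequest;
    }

    static String validate(String barRequest) {
        return "validated:" + barRequest;
    }

    static List<String> handle(String rawRequest) {
        return Stream.of(rawRequest)                              // the single incoming request
                .map(String::trim)                                // 1) convert to a "FooRequest"
                .flatMap(foo -> toBarRequests(foo).stream())      // 2) split into "BarRequests"
                .map(bar -> classify(bar) + "|" + validate(bar))  // 3) two downstream results per BarRequest
                .collect(Collectors.toList());                    // 4) aggregate into one list
        // 5) the caller would then write the aggregated list back to the client.
    }

    public static void main(String[] args) {
        System.out.println(handle(" a,b "));
    }
}
```

The java.util.stream source here is finite and synchronous, so the collect step always runs; the sketch shows only the shape of the data flow, not the asynchronous behaviour of the real code.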
 
The point at which I encounter problems is the toList() method, which never seems to execute.
Every time I've attempted something that involves a Promise it always seems to break, and this
has been no exception.
 
FooRequestFunction and BarRequestStreamFunction are fairly simple, and seem to run fine. Their
method signatures are:
 
// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);
 
// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);
 
DownstreamRequestZipFunction looks like this:
 
    @Override
    public Stream<BarResponse> apply(BarRequest t) { 
        Stream<ClassifyResponse> classifyRes = Streams
                .just(t)
                .flatMap(new ClassifyDownstreamRequestFunction());
       
        Stream<ValidateResponse> validateRes = Streams
                .just(t)
                .flatMap(new ValidateDownstreamRequestFunction());
       
        return Streams.zip(classifyRes, validateRes, (tuple) -> {
            BarResponse response = new BarResponse();
            response.setClassifyRes(tuple.getT1());
            response.setValidateRes(tuple.getT2());
            return response;
        });
    }
               
This seems to work fine, as long as both of the downstream request functions return a result.
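
The "as long as both return a result" condition can be illustrated without Reactor. The sketch below uses CompletableFuture.thenCombine from the JDK as an analogue of a two-way zip; it is not the Reactor Streams.zip implementation, and ZipSketch is a hypothetical name. The combined value only appears once both sides have produced one.

```java
import java.util.concurrent.CompletableFuture;

public class ZipSketch {

    // Combine two pending results into one; nothing is emitted until both are present.
    static CompletableFuture<String> zip(CompletableFuture<String> classifyRes,
                                         CompletableFuture<String> validateRes) {
        return classifyRes.thenCombine(validateRes, (c, v) -> c + "+" + v);
    }

    public static void main(String[] args) {
        CompletableFuture<String> classifyRes = new CompletableFuture<>();
        CompletableFuture<String> validateRes = new CompletableFuture<>();
        CompletableFuture<String> zipped = zip(classifyRes, validateRes);

        classifyRes.complete("classify");
        System.out.println("after one side: done=" + zipped.isDone());

        validateRes.complete("validate");
        System.out.println("after both sides: " + zipped.join());
    }
}
```

If one of the downstream functions never produces a value, the combined result stays pending indefinitely, which is the failure mode the sentence above alludes to.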
 
Finally, the Consumer at the end of the chained calls has this signature:
 
// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)
 
What this does is await() the responses promise and then aggregate all of those responses into
a single XML document written back to the channel. I can tell the execution never gets as far as
this method, because none of the logging fires. It all seems to stop at .toList().
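
To make the symptom concrete, here is a minimal plain-Java sketch of how an aggregate-to-list stage is generally expected to behave. It assumes, without confirmation from the Reactor source, that such a stage can only fulfil its promise once the upstream signals completion. ToListSketch and its methods are hypothetical names, and CompletableFuture stands in for Promise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ToListSketch<T> {

    private final List<T> buffer = new ArrayList<>();
    private final CompletableFuture<List<T>> promise = new CompletableFuture<>();

    // Each upstream element is buffered; the promise is not touched yet.
    public void onNext(T item) {
        buffer.add(item);
    }

    // Only the completion signal fulfils the promise with the buffered elements.
    public void onComplete() {
        promise.complete(new ArrayList<>(buffer));
    }

    public CompletableFuture<List<T>> promise() {
        return promise;
    }

    public static void main(String[] args) {
        ToListSketch<String> stage = new ToListSketch<>();
        stage.onNext("a");
        stage.onNext("b");
        System.out.println("before complete: done=" + stage.promise().isDone());
        stage.onComplete();
        System.out.println("after complete: " + stage.promise().join());
    }
}
```

Under that assumption, a consumer attached to the promise would never be called if the stream feeding the stage does not complete, which would match the missing log output.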
 
Does anyone know why this setup never seems to execute toList() or anything after it?