How to use ObservableHandler with web router

76 views
Skip to first unread message

Hoobajoob

unread,
Nov 12, 2015, 9:55:30 AM11/12/15
to vert.x
So, I'm trying to get proof of life for a simple get operation using vertx-web, together with RxJava observables and the core HttpServer. My problem is that, after setting up the route handler using RxHelper's ObservableHandler, the chain is only good for exactly one GET request, and further GET requests just block forever without ever invoking the handler again.

Obviously, I've jacked up something in how I'm setting up observables and subscriptions, but I've moved them around in different combinations and haven't yet got a setup that will process any more than one get request.

Where am I going wrong? 

        Route route = _router.route( "/items" ).method( HttpMethod.GET ).produces( "application/json" );

       
ObservableHandler<RoutingContext> getHandler = io.vertx.rx.java.RxHelper.observableHandler();

       
Observable<RoutingContext> observable = getHandler.doOnNext( context -> {
            _logger
.trace("Got a new request, enabling chunking");
            context
.response().setChunked( true );
       
} );

        observable
.subscribe(
                context
-> {
                   
HttpServerResponse response = context.response();
                   
List<Boolean> first = new ArrayList<Boolean>();
                    first
.add( true );

                    _repository
.findAll()
                   
.doOnNext( jsonItem -> {
                       
if ( first.get( 0 ) ) {
                            response
.write( "{\"items\": [" );
                       
}
                       
else {
                            response
.write( "," );
                       
}
                   
})
                   
.subscribe(
                            jsonItem
-> {
                                response
.write( jsonItem.toString() );
                           
},
                            error
-> {
                                _logger
.trace("Error during item stream", error);
                                response
.close();
                           
},
                           
() -> {
                                response
.write( "]}" );
                                response
.end();
                           
}
                   
);
               
}
       
);


        route
.handler( observable.toHandler() );
        _server
= vertx.createHttpServer( new HttpServerOptions().setCompressionSupported( true ) );
        _server
.requestHandler( _router::accept ).listen(8080);


        

Hoobajoob

unread,
Nov 12, 2015, 10:05:09 AM11/12/15
to vert.x
Correction to the .doOnNext() operator in the repository chain:

                    .doOnNext( jsonItem -> {
                       
if ( first.get( 0 ) ) {
                            response
.write( "{\"items\": [" );

                            first
.set(0, false); // CORRECTION

Hoobajoob

unread,
Nov 12, 2015, 11:07:47 PM11/12/15
to vert.x
Anyone? Anyone?

Bueller?


Bueller?
Reply all
Reply to author
Forward
0 new messages