So, I'm trying to get proof of life for a simple get operation using vertx-web, together with RxJava observables and the core HttpServer. My problem is that, after setting up the route handler using RxHelper's ObservableHandler, the chain is only good for exactly one GET request, and further GET requests just block forever without ever invoking the handler again.
Obviously, I've jacked up something in how I'm setting up observables and subscriptions, but I've moved them around in different combinations and haven't yet got a setup that will process any more than one get request.
Where am I going wrong?
Route route = _router.route( "/items" ).method( HttpMethod.GET ).produces( "application/json" );
ObservableHandler<RoutingContext> getHandler = io.vertx.rx.java.RxHelper.observableHandler();
Observable<RoutingContext> observable = getHandler.doOnNext( context -> {
_logger.trace("Got a new request, enabling chunking");
context.response().setChunked( true );
} );
observable.subscribe(
context -> {
HttpServerResponse response = context.response();
List<Boolean> first = new ArrayList<Boolean>();
first.add( true );
_repository.findAll()
.doOnNext( jsonItem -> {
if ( first.get( 0 ) ) {
response.write( "{\"items\": [" );
}
else {
response.write( "," );
}
})
.subscribe(
jsonItem -> {
response.write( jsonItem.toString() );
},
error -> {
_logger.trace("Error during item stream", error);
response.close();
},
() -> {
response.write( "]}" );
response.end();
}
);
}
);
route.handler( observable.toHandler() );
_server = vertx.createHttpServer( new HttpServerOptions().setCompressionSupported( true ) );
_server.requestHandler( _router::accept ).listen(8080);