So, I went through the "grokking rxjava" tutorials online and then have been mulling around through the RxJava wiki as needed to try and pick up how to work with the library. I think I have a very basic entry-level understanding of how the framework can be used.
But, I think I'm still pretty clueless on how to avoid nesting calls to subscribe starting from a root observable except in what seem, to me, to be somewhat simpler cases.
I'm using RxJava in the context of
Vert.x 3.1.0, which does async messaging between deployable components called verticles and has async libraries for network communications, database stuff and several other things.
So, a typical sequence to carry out goes like so:
1) I have an initial Observable which emits a request message when one is received from another component in the system.
2) On receipt of this message, I need to verify that the received message is valid and, if so, I need to...
3) ...establish a network connection to a remote component, which requires the production of a new Observable that will emit a socket when the connection is successfully established
4) When I receive the socket, I use it to send a request and this requires yet another observable so I can receive the response from the remote system. This can be a very long wait, as this remote request is to a work queue and there might be nothing to work on for some time.
5) When I receive the response, I package it up and send it to the original requestor, which requires yet another Observable which will emit the confirmation from the requestor that they received the item
6) Once I've received the response from the original requestor, then this sequence is done
So, my first question is: should I be expecting to be able to implement this series of exchanges without having nested subscriptions? Certainly, I'd like to be able to do that.
But if it's possible, it hasn't become clear to me how this can be done cleanly, due to the fact that for each step, the types produced by the observables and actions needed when the result is success versus failure are different at each juncture. Some steps on failure should attempt a retry of some kind, rather than giving up and responding with an error, which seems like it would require yet another tree of Observables and emitted types with different execution paths for the different possible results.
Here's an example of what I'm currently doing - it basically amounts to hiding the nested subscribe calls (in most cases) in the bodies of separate methods on the class where the root observable is used. Any suggestions for improving are welcome. I'm hoping one day to be able to look at this, slap my forehead, and chide myself for missing the obvious.
// STEP #1
_localRequests.subscribe(
message -> {
onNewRequest( message );
},
error -> {
_logger.warn("Error receiving message at address: {}", _localRequestsAddress );
}
);
// STEP #2
private void onNewRequest( Message<JsonObject> theMessage ) {
// validates the message, then forwards on to another method (see STEP #3 below) which sets up the next Observable
// or returns an error to the requestor if the received request was malformed
}
// STEP #3
_endpointUris.first()
.subscribe(
endpoint -> {
NetClient client = vertx.createNetClient();
client.connectObservable( endpoint.getPort(), endpoint.getHost() )
.subscribe(
socket -> {
onJobRequestConnectionEstablished( endpoint, socket, parameters );
},
error -> {
// TODO: connection failures should be invisible to the requestor,
// so work needed to continue retrying
_
logger.info("Unable to connect to queue at [{}], awaiting job requests", endpoint.toString() );
}
);
},
error -> {
// TODO: not sure this will occur, but if so proper handling needed
_logger.error( "Discovery returned empty list of endpoints - nothing to connect to", error );
vertx.eventBus().publish( _localEventsAddress, WaitingForEndpointsEvent );
}
);
// STEP #4: from the "onJobRequestConnectionEstablished()" method referenced in STEP #3 above
socket.toObservable()
.subscribe(
buffer -> {
String deliverTo = theParameters.getString(RequestParameterDeliverTo);
io.vertx.core.buffer.Buffer byteArrayBuffer = (io.vertx.core.buffer.Buffer) buffer.getDelegate();
String response = new String( byteArrayBuffer.getBytes(), StandardCharsets.UTF_8 );
_logger.trace("Got a job back: {}", response);
_logger.trace( "Forwarding job to requestor at [{}]", deliverTo );
// STEP #5
vertx.eventBus().<JsonObject>sendObservable( deliverTo, new JsonObject().put("result", jobResponse) )
.subscribe(
response -> {
_logger.trace( "Requestor at [{}] responded with {}",
deliverTo, response.body().toString() );
},
error -> {
// STEP #6
// TODO: handle retries on failure
_logger.error("Error receiving response from requestor at [{}]", deliverTo);
}
);
},
error -> {
_logger.error("Error receiving response", error);
}
);