RxJava newbie question: avoiding nested subscribe() calls in messaging systems

chefho...@gmail.com

Oct 24, 2015, 7:07:57 PM
to RxJava
So, I went through the "grokking rxjava" tutorials online and have since been digging through the RxJava wiki as needed to pick up how to work with the library. I think I have a very basic entry-level understanding of how the framework can be used.

But, I think I'm still pretty clueless on how to avoid nesting calls to subscribe starting from a root observable except in what seem, to me, to be somewhat simpler cases.

I'm using RxJava in the context of Vert.x 3.1.0, which does async messaging between deployable components called verticles and has async libraries for network communications, database stuff and several other things.

So, a typical sequence to carry out goes like so:

1) I have an initial Observable which emits a request message when one is received from another component in the system.
2) On receipt of this message, I need to verify that the received message is valid and, if so, I need to...
3) ...establish a network connection to a remote component, which requires the production of a new Observable that will emit a socket when the connection is successfully established
4) When I receive the socket, I use it to send a request and this requires yet another observable so I can receive the response from the remote system. This can be a very long wait, as this remote request is to a work queue and there might be nothing to work on for some time.
5) When I receive the response, I package it up and send it to the original requestor, which requires yet another Observable which will emit the confirmation from the requestor that they received the item
6) Once I've received the response from the original requestor, then this sequence is done

So, my first question is: should I be expecting to be able to implement this series of exchanges without having nested subscriptions? Certainly, I'd like to be able to do that.

But if it's possible, it hasn't become clear to me how this can be done cleanly, due to the fact that for each step, the types produced by the observables and actions needed when the result is success versus failure are different at each juncture. Some steps on failure should attempt a retry of some kind, rather than giving up and responding with an error, which seems like it would require yet another tree of Observables and emitted types with different execution paths for the different possible results.

Here's an example of what I'm currently doing - it basically amounts to hiding the nested subscribe calls (in most cases) in the bodies of separate methods on the class where the root observable is used. Any suggestions for improving are welcome. I'm hoping one day to be able to look at this, slap my forehead, and chide myself for missing the obvious.

// STEP #1
_localRequests.subscribe(
    message -> {
        onNewRequest( message );
    },
    error -> {
        _logger.warn( "Error receiving message at address: {}", _localRequestsAddress );
    }
);

// STEP #2
private void onNewRequest( Message<JsonObject> theMessage ) {
    // validates the message, then forwards on to another method (see STEP #3 below), which sets up the next Observable,
    // or returns an error to the requestor if the received request was malformed
}

// STEP #3
_endpointUris.first()
    .subscribe(
        endpoint -> {
            NetClient client = vertx.createNetClient();
            client.connectObservable( endpoint.getPort(), endpoint.getHost() )
                .subscribe(
                    socket -> {
                        onJobRequestConnectionEstablished( endpoint, socket, parameters );
                    },
                    error -> {
                        // TODO: connection failures should be invisible to the requestor,
                        //       so work needed to continue retrying
                        _logger.info( "Unable to connect to queue at [{}], awaiting job requests", endpoint.toString() );
                    }
                );
        },
        error -> {
            // TODO: not sure this will occur, but if so proper handling needed
            _logger.error( "Discovery returned empty list of endpoints - nothing to connect to", error );
            vertx.eventBus().publish( _localEventsAddress, WaitingForEndpointsEvent );
        }
    );

// STEP #4: from the "onJobRequestConnectionEstablished()" method referenced in STEP #3 above
socket.toObservable()
    .subscribe(
        buffer -> {
            String deliverTo = theParameters.getString( RequestParameterDeliverTo );

            io.vertx.core.buffer.Buffer byteArrayBuffer = (io.vertx.core.buffer.Buffer) buffer.getDelegate();

            // decode the raw bytes of the reply as UTF-8 text
            String jobResponse = new String( byteArrayBuffer.getBytes(), StandardCharsets.UTF_8 );
            _logger.trace( "Got a job back: {}", jobResponse );

            _logger.trace( "Forwarding job to requestor at [{}]", deliverTo );

            // STEP #5
            vertx.eventBus().<JsonObject>sendObservable( deliverTo, new JsonObject().put( "result", jobResponse ) )
                .subscribe(
                    reply -> {
                        _logger.trace( "Requestor at [{}] responded with {}",
                                       deliverTo, reply.body().toString() );
                    },
                    error -> {
                        // STEP #6
                        // TODO: handle retries on failure
                        _logger.error( "Error receiving response from requestor at [{}]", deliverTo );
                    }
                );
        },
        error -> {
            _logger.error( "Error receiving response", error );
        }
    );

Dávid Karnok

Oct 26, 2015, 3:38:05 AM
to chefho...@gmail.com, RxJava
Hi,

Whenever you feel you have to subscribe to an Observable which itself emits Observables, you should consider using flatMap or concatMap in your operator chain.
--
Best regards,
David Karnok
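
A minimal sketch of the flattening idea, written against the JDK's CompletableFuture so that it runs without RxJava or Vert.x on the classpath. This is a stand-in, not the RxJava API: `thenCompose` plays roughly the role that `flatMap` plays on an `Observable`, in that each step returns a new asynchronous value and the chain stays flat. The `connect`, `requestJob`, and `deliver` methods are hypothetical placeholders for the Vert.x calls in the original post.

```java
import java.util.concurrent.CompletableFuture;

class FlatChainSketch {
    // Hypothetical stand-in for client.connectObservable(port, host): yields a "socket" name.
    static CompletableFuture<String> connect(String endpoint) {
        return CompletableFuture.completedFuture("socket@" + endpoint);
    }

    // Hypothetical stand-in for sending the job request over the socket.
    static CompletableFuture<String> requestJob(String socket) {
        return CompletableFuture.completedFuture("job-from-" + socket);
    }

    // Hypothetical stand-in for eventBus().sendObservable(deliverTo, ...).
    static CompletableFuture<String> deliver(String deliverTo, String job) {
        return CompletableFuture.completedFuture(deliverTo + " acked " + job);
    }

    // Steps 3 to 6 as one flat chain: no callback is nested inside another callback.
    static CompletableFuture<String> handle(String endpoint, String deliverTo) {
        return connect(endpoint)
                .thenCompose(socket -> requestJob(socket))
                .thenCompose(job -> deliver(deliverTo, job));
    }

    public static void main(String[] args) {
        // A single terminal handler sees either the final value or the first failure.
        handle("queue-1", "requestor-7").whenComplete((ack, error) -> {
            if (error != null) {
                System.out.println("failed: " + error);
            } else {
                System.out.println(ack); // prints "requestor-7 acked job-from-socket@queue-1"
            }
        });
    }
}
```

With RxJava itself, the same shape would come from chaining `flatMap` calls on the root Observable and subscribing once at the end; `concatMap` is the variant to reach for when the inner results must stay in the order of the outer emissions.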

chefho...@gmail.com

Oct 26, 2015, 9:23:14 AM
to RxJava, chefho...@gmail.com
Hi David,

Thanks for the suggestion. It isn't so much the observables emitting observables I'm having a hard time with - it's needing to take different retry looping actions in response to errors at different points in the sequence. For example, if I were to boil down the sequence I'd like to have, it would look something like this:

1) on receipt of message from the root observable, inspect for validity. If invalid, return an error response immediately and end
2) If valid, attempt to establish a connection to remote service (produce a new observable that either emits a socket or an exception). There may be multiple addresses to try. This step should retry indefinitely until a connection is established, waiting a little longer each time before retrying again. Failed connections here produce exceptions, but rather than terminating, I need to bump up the wait time (or move to a different endpoint address) and try the connection again until I get a socket.
3) Once I have a socket, submit a request to the remote service (a new observable that emits either a response or an exception)
4) Based on what happens with the remote service, there are 3 possible responses:
    a) If the connection was dropped (produces an exception), go back to step 2 and loop until I have a socket
    b) If the service responds that there is no work yet, wait a little (a little longer each time) and retry, starting at Step #3
    c) If the service responds with a work item, then forward the item back to the original requestor
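
The steps above can be sketched as a small loop. This is a blocking, plain-JDK illustration of the control flow only, not an RxJava solution: the `JobQueue` interface, the `scripted` fake, and the 100 ms base and 5 s cap are all assumptions made for the example, and the waits are recorded in a list instead of actually sleeping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class RetryLoopSketch {
    enum Outcome { DROPPED, NO_WORK, WORK }

    // Hypothetical abstraction of the remote service.
    interface JobQueue {
        boolean connect(); // true when a socket was obtained
        Outcome poll();    // result of one job request on the current socket
    }

    // The wait doubles with each attempt and is capped at maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        return Math.min(baseMillis << Math.min(attempt, 20), maxMillis);
    }

    // Runs steps 2 to 4 until a work item arrives; returns the waits it would have performed.
    static List<Long> run(JobQueue queue) {
        List<Long> waits = new ArrayList<>();
        int connectAttempt = 0;
        int pollAttempt = 0;
        boolean connected = false;
        while (true) {
            if (!connected) {                       // step 2: loop until a socket exists
                connected = queue.connect();
                if (connected) {
                    connectAttempt = 0;
                } else {
                    waits.add(backoffMillis(connectAttempt++, 100, 5_000));
                }
                continue;
            }
            switch (queue.poll()) {                 // step 3, then one of the step 4 branches
                case DROPPED:                       // 4a: back to step 2
                    connected = false;
                    break;
                case NO_WORK:                       // 4b: wait, then repeat step 3
                    waits.add(backoffMillis(pollAttempt++, 100, 5_000));
                    break;
                case WORK:                          // 4c: done, forward the item
                    return waits;
            }
        }
    }

    // Scripted fake for the demo: replays fixed connect and poll results.
    static JobQueue scripted(List<Boolean> connects, List<Outcome> polls) {
        Iterator<Boolean> c = connects.iterator();
        Iterator<Outcome> p = polls.iterator();
        return new JobQueue() {
            public boolean connect() { return c.next(); }
            public Outcome poll() { return p.next(); }
        };
    }

    public static void main(String[] args) {
        JobQueue queue = scripted(
                Arrays.asList(false, false, true, true),
                Arrays.asList(Outcome.NO_WORK, Outcome.DROPPED, Outcome.WORK));
        System.out.println(run(queue)); // prints [100, 200, 100]
    }
}
```

Writing the loop out this way makes the two separate retry scopes visible: one around the connection alone, and one around the connection plus the request.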

What I am currently struggling with from the above sequence is:

i) The looping behavior at step 4a: step 2 seems like I could probably achieve it with the retry() operator, but it's not clear to me how to achieve step 4a, where I effectively need to jump back to step 2 and continue from there. I'm currently looking into retryWhen() for this, but it's not clear to me yet how to achieve the effect of 'starting over' at an earlier point in the chain. I've been thinking I need an observable factory that creates a chain covering steps 2 - 4. When I hit 4a, use retryWhen() and get a new instance from this factory. Does that sound right? Is there a better way?

ii) How two or more different requests can be in process at the same time, with the processing interleaved, rather than running one message to completion before starting on the next message. This is necessary because two different requests could be asking for work items from different sources and one source could have work items while the other doesn't (and won't for some time). The requestor interested in sources that have work should not be held up by requestors asking for work items from idle sources.

I'm not exactly sure how the requirement in (ii) above translates to the construction of this processing chain - all the marble diagrams and examples I've encountered so far show the final emission sequence correlating to the original sequence in time. It seems like maybe having subscriptions earlier in the chain might make this possible - is that correct? Is that a situation where having some nested subscriptions would make sense?
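
A rough illustration of the interleaving asked about in (ii), again using the JDK's CompletableFuture as a stand-in rather than RxJava. Each request gets its own independent chain, so a request whose source is idle does not block one whose source has work. The requestor names and job ids are made up for the demo, and the futures are completed by hand to make the ordering visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class InterleavingSketch {
    // Starts one independent chain per request; none of them waits for another to finish.
    static List<String> process(Map<String, CompletableFuture<String>> jobsByRequestor) {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        jobsByRequestor.forEach((requestor, job) ->
                job.thenAccept(item -> delivered.add(requestor + " <- " + item)));
        return delivered;
    }

    public static void main(String[] args) {
        CompletableFuture<String> idleSource = new CompletableFuture<>();
        CompletableFuture<String> busySource = new CompletableFuture<>();

        Map<String, CompletableFuture<String>> requests = new LinkedHashMap<>();
        requests.put("requestor-A", idleSource); // arrived first, its source has no work yet
        requests.put("requestor-B", busySource); // arrived second, its source has work

        List<String> delivered = process(requests);
        busySource.complete("job-42");           // B is served while A is still waiting
        idleSource.complete("job-7");

        System.out.println(delivered); // prints [requestor-B <- job-42, requestor-A <- job-7]
    }
}
```

In RxJava, `flatMap` behaves along these lines: it subscribes to each inner Observable as the outer item arrives and merges results in completion order, whereas `concatMap` finishes one inner Observable before starting the next. That suggests interleaving does not by itself require nested subscriptions.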

chefho...@gmail.com

Oct 26, 2015, 8:53:15 PM
to RxJava, chefho...@gmail.com
I am starting to come to the conclusion that this particular control flow cannot be achieved through RxJava observable and operator composition.

I suppose it's possible this could be due to a limitation in my understanding of RxJava, but at the moment I'm not seeing how to achieve it.

The first problem appears to be related to the fact that requesting the establishment of a remote connection (this is a raw TCP connection) in Vert.x involves the use of an Observable factory method returning an Observable that will only provide one emission: a socket (if the connection was established) or an error (if the connection couldn't be established). The only way to try again is to produce a new Observable by calling that factory method again.

As far as I can tell, there is no way in RxJava to set up an infinite retry loop that requires the production of a new observable on each retry attempt. The closest fit appears to be retryWhen(), but this operator will only tolerate a single retry attempt on the newly created observable. If the new observable returns an error, retryWhen() will terminate the subscription. So, I can't use that operator, and I can't find any other operator that would provide something like this.

I have an open question for this kind of thing on StackOverflow (see here), in case anybody has any ideas for getting around it.

Maybe it's possible to write my own operator to do what I'm looking to do, but at this point I don't know that I'd want to step into that space given my limited experience so far, and I'm not seeing an answer in the other available operators.

If anybody knows of a technique to achieve this, please post it!

Thanks...

Ben Christensen

Oct 26, 2015, 9:32:20 PM
to chefho...@gmail.com, RxJava
Use defer() to make a factory method lazy, so that the function runs again each time the resulting Observable is subscribed to.

Ben Christensen
@benjchristensen
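
A small sketch of why a lazy factory solves the retry problem, using a plain `Supplier` of CompletableFuture as a JDK-only stand-in for a deferred Observable. The `retrying` helper and the fake `connect` supplier are hypothetical names for this example. The point is only that each attempt calls the factory again and gets a fresh one-shot result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferRetrySketch {
    // Re-invokes the factory after every failure, up to maxAttempts; a fresh future per attempt.
    static <T> CompletableFuture<T> retrying(Supplier<CompletableFuture<T>> factory, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(factory, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> factory, int remaining,
                                    CompletableFuture<T> result) {
        factory.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (remaining > 1) {
                attempt(factory, remaining - 1, result); // the factory runs again here
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical connect call: fails twice, then yields a socket.
        Supplier<CompletableFuture<String>> connect = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new RuntimeException("connection refused"));
            } else {
                f.complete("socket");
            }
            return f;
        };
        System.out.println(retrying(connect, 10).join() + " after " + calls.get() + " attempts");
        // prints "socket after 3 attempts"
    }
}
```

In RxJava terms, this corresponds roughly to wrapping the `connectObservable(...)` call in `Observable.defer(...)` and applying `retry()` or `retryWhen(...)` downstream, so that every resubscription runs the factory again instead of reusing an already failed Observable.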

chefho...@gmail.com

Oct 27, 2015, 8:36:14 AM
to RxJava, chefho...@gmail.com
Ben - thanks! That's elegant. I was just experimenting with Observable.create() to see if I could achieve something similar, but defer() seems preferable.