Applying RxJava on REST Clients


Rajkumar Ramakrishnan

Apr 6, 2016, 10:04:28 PM
to RxJava
I have written Jersey clients using JAX-RS 2.0 to call REST APIs. I would like to rewrite them with RxJava and also execute the calls in parallel.

I am struggling to see how to apply the RxJava APIs to my client functions.

For example, I have two functions, getCustomer and getAccount. Each one obtains the REST client, makes a call, and reads the JSON response.

How do I convert these functions to use RxJava?

Example client functions:

public AccountResponse getAccount(String accountID, Headers headers) throws IllegalArgumentException {
    Client client = getClient();
    Response response = client.target(environment.getProperty("account.target"))
            .path("v1")
            .path("accounts")
            .path(accountID)
            .request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
            .header("Authorization", createAuthorizationHeader(headers))
            .get();

    try {
        // Assumption: the success body is also an AccountResponse
        // (the posted code read it into an undeclared 'grants' variable).
        AccountResponse accountResponse = response.readEntity(AccountResponse.class);
        if (response.getStatus() != 200) {
            accountResponse.setOk(false);
        }
        return accountResponse;
    } finally {
        response.close();
    }
}


public CustomerResponse getCustomer(String customerID, Headers headers) throws IllegalArgumentException {
    Client client = getClient();
    Response response = client.target(environment.getProperty("customer.target"))
            .path("v1")
            .path("customer")
            .path(customerID)
            .request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
            .header("Authorization", createAuthorizationHeader(headers))
            .get();

    try {
        // Assumption: the success body is also a CustomerResponse
        // (the posted code read it into an undeclared 'grants' variable).
        CustomerResponse customerResponse = response.readEntity(CustomerResponse.class);
        if (response.getStatus() != 200) {
            customerResponse.setOk(false);
        }
        return customerResponse;
    } finally {
        response.close();
    }
}




I would appreciate any guidance on how to convert these functions to RxJava Observables and how to execute them in parallel.

Colin Vipurs

Apr 7, 2016, 4:50:22 AM
to Rajkumar Ramakrishnan, RxJava
Is moving to RxNetty an option? You'll get a lot of what you want "for free" then.
--
Maybe she awoke to see the roommate's boyfriend swinging from the chandelier wearing a boar's head.

Something which you, I, and everyone else would call "Tuesday", of course.

Rajkumar Ramakrishnan

Apr 7, 2016, 7:19:02 AM
to RxJava
Hi Colin,

I appreciate your response. All our REST APIs are currently deployed in a Tomcat container, so moving to Netty would be a big jump for now, although I would consider it later because of its non-blocking capabilities and better performance.

For now I would prefer to convert these functions to Rx Observables and still rely on the underlying Jersey stack.

Please let me know if you can offer some guidance on this.


Thanks

Raj.

david

Apr 8, 2016, 4:27:29 AM
to RxJava

You can wrap your JAX-RS call in an Observable with Observable.using, for example:


public Observable<AccountResponse> getAccount(String accountID, Headers headers) throws IllegalArgumentException {

    return Observable.using(

        // Resource factory: runs on subscription and performs the HTTP call
        () -> {
            Client client = getClient();
            return client.target(environment.getProperty("account.target"))
                .path("v1")
                .path("accounts")
                .path(accountID)
                .request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
                .header("Authorization", createAuthorizationHeader(headers))
                .get();
        },

        // Observable factory: turns the response into the emitted value
        response -> {
            try {
                // Assumption: success and error bodies are both AccountResponse
                AccountResponse accountResponse = response.readEntity(AccountResponse.class);
                if (response.getStatus() != 200) {
                    accountResponse.setOk(false);
                }
                return Observable.just(accountResponse);
            } catch (Exception ex) {
                return Observable.error(ex);
            }
        },

        // Dispose action: closes the response when the Observable terminates
        response -> response.close());
}

Consider using Single instead of Observable, since your web call will either return one value or fail.

Colin Vipurs

Apr 8, 2016, 5:20:02 AM
to Rajkumar Ramakrishnan, RxJava
I don't mean moving your entire stack; you can just use the RxNetty HTTP client. There are examples here: https://github.com/allenxwang/RxNetty-1/tree/master/rx-netty-examples.

For your code it could be something as simple as:

public Observable<Account> getAccount(String accountId, Headers headers) {
    HttpClientRequest request = HttpClientRequest.createGet(path).withHeader(yourHeaders);

    // Returns the Rx chain itself, so the caller decides when to subscribe
    return rxNettyClient.submit(request)
        .map(yourBusinessLogic());
}


Rajkumar Ramakrishnan

Apr 8, 2016, 3:04:32 PM
to RxJava
Hi,


I have a couple of other questions.

Since I am trying to execute calls such as getCustomer and getAccount in parallel, when does the actual call to the REST API happen?
I can't see any subscribe call here, so I am lost as to when the execution happens.

I am trying to understand the call chain and the call sequence.

Thanks

Raj.

Rajkumar Ramakrishnan

Apr 8, 2016, 11:01:42 PM
to RxJava

These are the two functions that I was able to write using Observables and Rx Netty Clients.

public Observable<String> getServiceA()
{
    Observable<String> calories = Observable
        .just(RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
            .flatMap(response -> response.getContent())
            .map((data) -> {
                String str = data.toString(Charset.defaultCharset());

                System.out.println("Got Service A:" + str);
                return str;

            }).toBlocking().single());

    return calories;
}


public Observable<String> getServiceB()
{
    Observable<String> calories = Observable
        .just(RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
            .flatMap(response -> response.getContent())
            .map((data) -> {
                String str = data.toString(Charset.defaultCharset());

                System.out.println("Got Service B:" + str);
                return str;

            }).toBlocking().single());

    return calories;
}


Clarifications

1. When does the call to the remote API get executed? It seems that createHttpGet returns an Observable, but I am not sure when the call actually runs.
  • There is no subscribe call in getServiceA, yet the remote API is still called. I am not sure how this happens.

2. Do I need to create another Observable and subscribe to it to execute the calls to services A and B in parallel?
  • Are the APIs called only when I subscribe, or are they called when createHttpGet is invoked inside getServiceA?

3. Why do I need toBlocking().single() on the call?
4. How do I throw an exception with the right status code and error message if the call to service A fails?

Appreciate your guidance on this. I want to understand some fundamental behaviors of how this works.

Thanks

Rajk




Colin Vipurs

Apr 11, 2016, 5:23:32 AM
to Rajkumar Ramakrishnan, RxJava
The API call is made when you subscribe to the Rx chain created by createHttpGet. In the code you have now, toBlocking().single() subscribes immediately, so the API call is made as soon as you call getServiceA() or getServiceB(), and it will _not_ happen asynchronously.

What you'll want to do is something like this:


return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
    .flatMap(response -> response.getContent())
    .map((data) -> {
        String str = data.toString(Charset.defaultCharset());

        System.out.println("Got Service A:" + str);
        return str;

    });
This will now plug into your higher-level Rx chain and be called at the appropriate time. If you call this directly and subscribe, the request happens when you subscribe. If it is part of a flatMap, the request happens when that flatMap subscribes to it.
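
The difference between running a call eagerly and returning a lazy description of it can be shown without any Rx library. The following is a JDK-only analogy, not RxJava code: a `Supplier` stands in for a cold Observable and `get()` stands in for `subscribe()`. The class `LazyCallDemo`, the `remoteCall` stand-in, and the call counter are all hypothetical names made up for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyCallDemo {
    // Counts how many times the pretend remote call actually ran.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for the remote request; a real client would do I/O here.
    static String remoteCall() {
        CALLS.incrementAndGet();
        return "response";
    }

    // Eager: the call runs while this method executes, much like wrapping
    // the result of toBlocking().single() in Observable.just(...).
    static Supplier<String> eager() {
        String value = remoteCall();
        return () -> value;
    }

    // Lazy: only a description of the call is returned; nothing runs yet.
    static Supplier<String> lazy() {
        return LazyCallDemo::remoteCall;
    }

    public static void main(String[] args) {
        eager();
        System.out.println("after eager(): calls = " + CALLS.get());
        Supplier<String> deferred = lazy();
        System.out.println("after lazy():  calls = " + CALLS.get());
        deferred.get();
        System.out.println("after get():   calls = " + CALLS.get());
    }
}
```

In this sketch the counter goes up inside `eager()` but not inside `lazy()`; it only goes up again once `get()` is called, which mirrors a request that fires on subscription.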

Regarding the exceptions, you can either raise an exception and let it bubble up through the Rx chain or (my preferred method) not let any exceptions leak from here. Instead of returning Observable<String>, return an object that wraps the result of the execution, which could be as simple as a 'success' flag plus the body. The exception is then handled inside your API call rather than by its callers.
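
A wrapper of the kind described above might look roughly like this. It is a plain-Java sketch with no Rx dependency; `ServiceResult`, its fields, and the `capture` helper are hypothetical names, not part of RxJava or RxNetty. In an actual Rx chain the same conversion would typically be done with an error-handling operator such as `onErrorReturn`.

```java
import java.util.function.Supplier;

// Hypothetical result wrapper: a success flag plus either a body or an error text.
final class ServiceResult {
    private final boolean success;
    private final String body;   // response body on success, null otherwise
    private final String error;  // error description on failure, null otherwise

    private ServiceResult(boolean success, String body, String error) {
        this.success = success;
        this.body = body;
        this.error = error;
    }

    static ServiceResult ok(String body) {
        return new ServiceResult(true, body, null);
    }

    static ServiceResult failed(String error) {
        return new ServiceResult(false, null, error);
    }

    // Runs the call and converts any runtime exception into a failed result,
    // so nothing leaks to the caller as an exception.
    static ServiceResult capture(Supplier<String> call) {
        try {
            return ok(call.get());
        } catch (RuntimeException ex) {
            return failed(String.valueOf(ex.getMessage()));
        }
    }

    boolean isSuccess() { return success; }
    String getBody() { return body; }
    String getError() { return error; }
}
```

A caller would then check `isSuccess()` and read either `getBody()` or `getError()`, instead of wrapping the call in its own try/catch.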

Hope that clears things up for you.

Rajkumar Ramakrishnan

Apr 13, 2016, 3:12:57 AM
to RxJava
Hi Colin,

Are you saying that if I change my functions to return Observables as you suggested, then zip those Observables and subscribe, the calls to service A and service B will be made?

Can you also shed some light on your statement "If this is part of a flatMap it will happen then"?

I changed my code as you suggested, but now I don't see the calls to services A and B being fired when I call subscribe on the higher-order Observable.

public class RxNettyClientTest2
{

    public Observable<String> getServiceA()
    {
        return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
            .flatMap(response -> {
                System.out.println("FlatMapA");
                return response.getContent();
            })
            .map((data) -> {
                String str = data.toString(Charset.defaultCharset());

                System.out.println("Got Service A:" + str);
                return str;
            });
    }


    public Observable<String> getServiceB()
    {
        return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
            .flatMap(response -> response.getContent())
            .map((data) -> {
                String str = data.toString(Charset.defaultCharset());

                System.out.println("Got Service B:" + str);
                return str;
            });
    }

    public static void main(String args[])
    {
        RxNettyClientTest2 obj = new RxNettyClientTest2();

        Observable.zip(obj.getServiceA(), obj.getServiceB(), new Func2<String, String, String>() {
            @Override
            public String call(String a, String b)
            {
                return a + b;
            }
        }).subscribe();
    }
}




Colin Vipurs

Apr 13, 2016, 4:51:03 AM
to Rajkumar Ramakrishnan, RxJava
Your code is fine. The problem is that the calls to service A and service B are now async, but you don't wait for the results anywhere. Because this runs in a main method, the Rx chain is kicked off but the main method exits straight away.

One way to solve this is to make your top-level Observable blocking, like this:

Observable.zip(obj.getServiceA(), obj.getServiceB(), new Func2<String, String, String>() {
    @Override
    public String call(String a, String b)
    {
        return a + b;
    }
})
    .toBlocking()
    .subscribe();
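
The same pattern (start two calls concurrently, combine one result from each, and block the main thread until the combined value is ready) can be sketched with the JDK alone. This is an analogy using `CompletableFuture`, not RxJava: `thenCombine` plays roughly the role of `Observable.zip`, and `join()` plays the role of the blocking step. `ParallelCallsDemo` and `slowCall` are hypothetical stand-ins for the two service calls.

```java
import java.util.concurrent.CompletableFuture;

final class ParallelCallsDemo {
    // Stand-in for a remote call: sleeps briefly, then returns its name.
    static String slowCall(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    static CompletableFuture<String> combined() {
        // Both calls are started here and run on background threads.
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowCall("A"));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowCall("B"));
        // Combine one result from each source once both have completed.
        return a.thenCombine(b, (x, y) -> x + y);
    }

    public static void main(String[] args) {
        // join() blocks until both calls have finished. Without it, main could
        // return first, because the default async pool uses daemon threads.
        String result = combined().join();
        System.out.println(result); // prints "AB"
    }
}
```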

Rajkumar Ramakrishnan

Apr 19, 2016, 3:19:33 AM
to RxJava
Hi Colin,

The code below does not compile when I call .subscribe(), but it works when I call .single() instead.

Observable.zip(obj.getServiceA(), obj.getServiceB(), new Func2<String, String, String>() {
    @Override
    public String call(String a, String b)
    {
        return a + b;
    }
})
    .toBlocking()
    .subscribe();
