public AccountResponse getAccount(String accountID, Headers headers) throws IllegalArgumentException {
Client client = getClient();
Response response = client.target(environment.getProperty("account.target"))
.path("v1")
.path("accounts")
.path(accountID)
.request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
.header("Authorization", createAuthorizationHeader(headers))
.get();
try {
if (response.getStatus() == 200) {
grants = response.readEntity(Grants.class);
}
else {
AccountResponse accountResponse = response.readEntity(AccountResponse.class);
accountResponse.setOk(false);
return accountResponse;
}
} finally {
response.close();
}
return accountResponse;
}
public CustomerResponse getCustomer(String accountID, Headers headers) throws IllegalArgumentException {
Client client = getClient();
Response response = client.target(environment.getProperty("customer.target"))
.path("v1")
.path("customer")
.path(customerID)
.request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
.header("Authorization", createAuthorizationHeader(headers))
.get();
try {
if (response.getStatus() == 200) {
grants = response.readEntity(Grants.class);
}
else {
CustomerResponse customerResponse = response.readEntity(CustomerResponse.class);
customerResponse.setOk(false);
return customerResponse;
}
} finally {
response.close();
}
return accountResponse;
}
You can wrap your JaxRS call into an Observable using using for example :
public Observable<AccountResponse> getAccount(String accountID, Headers headers) throws IllegalArgumentException {
return Observable.using(
() -> {
Client client = getClient();
Response response = client.target(environment.getProperty("account.target"))
.path("v1")
.path("accounts")
.path(accountID)
.request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON)
.header("Authorization", createAuthorizationHeader(headers))
.get();
return response;
},
response -> {
try {
if (response.getStatus() == 200) {
grants = response.readEntity(Grants.class);
return Observable.just(grants);
}
else {
AccountResponse accountResponse = response.readEntity(AccountResponse.class);
accountResponse.setOk(false);
return Observable.just(accountResponse);
}
} catch(Exception ex) {
return Observable.error(ex);
}
}, response -> response.close());
}
Considere to use Single instead of Observable (as you web call will return something, or will fail)
public Observable<String> getServiceA()
{
Observable<String> calories = Observable
.just(RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
.flatMap(response -> response.getContent())
.map((data) -> {
String str = data.toString(Charset.defaultCharset());
System.out.println("Got Service A:" + str);
return str;
}).toBlocking().single());
return calories;
}
public Observable<String> getServiceB()
{
Observable<String> calories = Observable
.just(RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
.flatMap(response -> response.getContent())
.map((data) -> {
String str = data.toString(Charset.defaultCharset());
System.out.println("Got Service B:" + str);
return str;
}).toBlocking().single());
return calories;
}
- There is no subscribe function in the call to get Service A, however it still executes the call to the remote API,am not sure how this happens.
return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
.flatMap(response -> response.getContent())
.map((data) -> {
String str = data.toString(Charset.defaultCharset());
System.out.println("Got Service A:" + str);
return str;
});
This will now plug into your higher level Rx chain and be called at the appropriate time. So if you call this directly and subscribe it will happen when you subscribe. If this is part of a flapMap it will happen then.
Regarding the exceptions you can either raise an exception and let it bubble up through the Rx chain or (my preferred method), don't let any exceptions leak from here and instead of returning Observable<String>, have a return object that wraps the result of the execution which could be something as simple as setting a 'success' flag and providing the body. The exception is then handled in your API call.
Hope that clears things up for you.
public class RxNettyClientTest2
{
public Observable<String> getServiceA()
{
return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
.flatMap(response -> {
System.out.println("FlatMapA");
return response.getContent();})
.map((data) -> {
String str = data.toString(Charset.defaultCharset());
System.out.println("Got Service A:" + str);
return str;
});
}
public Observable<String> getServiceB()
{
return RxNetty.createHttpGet("http://webs-internal-qal-qyc.api.intuit.net:80/webs-catalog-offers-rest-service-3/v3/offers/20000798")
.flatMap(response -> response.getContent())
.map((data) -> {
String str = data.toString(Charset.defaultCharset());
System.out.println("Got Service B:" + str);
return str;
});
}
public static void main(String args[])
{
RxNettyClientTest2 obj= new RxNettyClientTest2();
Observable.zip(obj.getServiceA(),obj.getServiceB(),new Func2<String,String,String>(){
@Override
public String call(String a, String b)
{
return a+b;
}
}).subscribe();
}
Observable.zip(obj.getServiceA(),obj.getServiceB(),new Func2<String,String,String>(){
@Override
public String call(String a, String b)
{
return a+b;
}
})
.toBlocking()
.subscribe();
Observable.zip(obj.getServiceA(),obj.getServiceB(),new Func2<String,String,String>(){
@Override
public String call(String a, String b)
{
return a+b;
}
})
.toBlocking()
.subscribe();