Rxjava and HystrixObservableCommand

564 views
Skip to first unread message

Carey Boldenow

unread,
Dec 31, 2014, 12:31:58 PM12/31/14
to rxj...@googlegroups.com
Hello,

Using the ReactiveLab project, I created my own simple API to test some simple scenarios using RxJava and Hystrix. My intent was to simulate the ability to subscribe to multiple observables in a concurrent manner. To do this, I created a simple request handler that works with three different Hystrix commands (that each sleep for some defined period to simulate latency).  For Test1, while I am calling observe() on each of my commands, I am not extending HystrixObservableCommand in my command classes (see Test1 code below). That all being said when I run this, everything works as I would hope. As you can see below (Test1 Results), all 3 commands were executed on a different thread and called at or about time 0, and each of my subscriptions fired at the expected time (based on their sleep delay). 

For the next test, the only change I made was to have each of my command classes extend HystrixObservableCommand and modified each run() method accordingly to return Observable<String>. Please see Test2 Code below for details. For brevity, I only included Test1Command, but same changes were applied to other command classes as well. When I then ran Test2, I got much different results (see Test2 results below). As you can see, everything ran on one thread and sequentially. I tried various things like using subscribeOn(..) and observeOn(..) on my observables to get them to run on different threads, but always it ran commands on main thread seen below.  I am still trying to grasp the whole Rxjava thing, so I am certain I am missing something obvious here, so any help you folks can provide to help me better understand what is happening under the hood below with Test2 will be greatly appreciated!  

The other question/observation I have is that when I extend HystrixObservableCommand in my command classes, the "timeout" I have defined for each command is ignored and no matter how long I set sleep to, the command will run for that amount of time and finish successfully.

Thanks!
Carey


Test1 Results:
Running Test3Command on Thread: hystrix-test3-2 at time: 0
Running Test2Command on Thread: hystrix-test2-2 at time: 0
Running Test1Command on Thread: hystrix-test1-2 at time: 2
onNext3: Hola Carey at time: 217
onNext2: Ciao Carey at time: 529
onNext1: Hello Carey at time: 935


Test2 Results:
Running Test1Command on Thread: rx-netty-nio-eventloop-3-2 at time: 0
Running Test2Command on Thread: rx-netty-nio-eventloop-3-2 at time: 902
Running Test3Command on Thread: rx-netty-nio-eventloop-3-2 at time: 1404
onNext1: Hello Carey at time: 1604
onNext2: Ciao Carey at time: 1604
onNext3: Hola Carey at time: 1604


Test1 Code:

Request Handler:

public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
   
long startTime = System.currentTimeMillis();
String name = (String)request.getQueryParameters().get("name").get(0);

Observable<String> o1 = new Test1Command(name,startTime).observe();
Observable<String> o2 = new Test2Command(name,startTime).observe();
Observable<String> o3 = new Test3Command(name,startTime).observe();

o1.subscribe(
(incomingValue) -> System.out.println("onNext1: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);
o2.subscribe(
(incomingValue) -> System.out.println("onNext2: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);

o3.subscribe(
(incomingValue) -> System.out.println("onNext3: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);
   
return response.writeStringAndFlush("Finished Test...");
}

Commands:

public class Test1Command extends HystrixCommand  {
private final String name;
private final long startTime;
public Test1Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test1"));
this.name = name;
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test1Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(900);}catch(Exception e){} //delay this command for 900ms
return "Hello " + name;
}
}

public class Test2Command extends HystrixCommand {
private final String name;
private final long startTime;
public Test2Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test2"));
this.name = name;
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test2Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(500);}catch(Exception e){} //delay this command for 500ms
return "Ciao " + name;
}
}

public class Test3Command extends HystrixCommand {
private final String name;
private final long startTime;
public Test3Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test3"));
this.name = name;
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test3Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(200);}catch(Exception e){} //delay this command for 200ms
return "Hola " + name;
}
}

Test2 Code:

public class Test1Command extends HystrixObservableCommand<String>  {
private final String name;
private final long startTime;
public Test1Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test1"));
this.name = name;
this.startTime = startTime;
}
@Override
protected Observable<String> run() {
System.out.println("Running Test1Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(900);}catch(Exception e){} //delay this command for 900ms
return Observable.just("Hello " + name);
}
}


Carey Boldenow

unread,
Jan 6, 2015, 7:37:17 PM1/6/15
to rxj...@googlegroups.com
Let me try this again by basically asking the same question with a more straightforward example :-)  Below is the standard example that Ben uses in his Rxjava presentations, and when I execute this and debug all the events, what I see is that the commands associated to the 2 nested observables highlighted below are executed concurrently. However, the commands associated to the 3 nested observables (e.g. bookmark, rating, metadata) execute sequentially. I know that by default, things should run on the same thread that the "subscribe" was called on, so I can understand why the 3 inner nested observables run sequentially, so I guess I am more interested to better understand why the 2 outer nested observables do run concurrently?  Ideally, what I really want to know is how can I get the 3 inner nested observables to also run concurrently? I tried various Schedulers with both subscribeOn and observeOn, but always the corresponding Hystrix commands run on the main netty nio eventloop thread. I am still trying to get my head around how the reactive stuff works under the hood, so thanks for any help anyone can offer!






Ben Christensen

unread,
Jan 7, 2015, 1:27:34 AM1/7/15
to rxj...@googlegroups.com, Carey Boldenow
Hi Carey, 

This is definitely the more nuanced side of Hystrix and RxJava working together, but it is quite powerful once grokked. I’ll try and explain what’s going on. 

In the ReactiveLab project everything is async using Netty and therefore everything is using HystrixObservableCommand. I’ll restrict all my comments to that context. It is very different if anything is synchronous and/or using HystrixCommand for synchronous IO. 

In the case where all IO is async, then Netty is going to always run the IO on its own event loops. Due to this the IO will always return and perform the callback from the Netty event loops. 

Thus, if you use subscribeOn(Schedulers.io()), you will schedule the work to start on the RxJava IO scheduler, which it will do, but it will then invoke the work on Netty which will schedule it on to the Netty event loops, since Netty always performs the IO on its own event loops. Thus, using subscribeOn for something that is already async is effectively a no-op and just wasted scheduling work. 

Using observeOn() on the other hand will move the data from the Netty event loop to the Scheduler you give to observeOn. This does NOT change where the callback originates from (always Netty event loops) but will move where you handle it. 

For example:

PersonalizedCatalogCommand(user).observe().subscribe(data -> System.out.println(Thread.currentThread()))

This will always print the Netty event loop group since “PersonalizedCatalogCommand” is executing on Netty: https://github.com/Netflix/ReactiveLab/blob/master/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/PersonalizedCatalogCommand.java#L43


PersonalizedCatalogCommand(user).observe()
.subscribeOn(Schedulers.io())
.subscribe(data -> System.out.println(Thread.currentThread()))

Adding subscribeOn here will do something, but it is useless, since it will schedule the subscribe onto Schedulers.io() which will then run the Hystrix command which runs asynchronously on Netty event loops. Thus, the only thing that happens on Schedulers.io() is the execution of the Netty request which immediately returns since it is async. 


PersonalizedCatalogCommand(user).observe()
.doOnNext(data -> System.out.println(Thread.currentThread()))
.observeOn(Schedulers.newThread())
.subscribe(data -> System.out.println(Thread.currentThread()))

This will execute on the Netty event loop, callback on the Netty event loop (which you’ll see with the doOnNext) and then move to a new thread (via Schedulers.newThread()) and you’ll see it output in subscribe on the new thread. 

Hope this helps.

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Carey Boldenow

unread,
Jan 7, 2015, 8:57:09 AM1/7/15
to rxj...@googlegroups.com
Thanks Ben, your response helped a lot! However, I do still have a bit of grokking yet to do in this area, but very much looking forward to it as this style of programming is very cool and powerful :-)


On Wednesday, December 31, 2014 11:31:58 AM UTC-6, Carey Boldenow wrote:
Message has been deleted

Ben Christensen

unread,
Jan 13, 2015, 1:24:51 PM1/13/15
to Carey Boldenow, rxj...@googlegroups.com
The async commands are rare limited (rejected) via semaphore (concurrency limits) instead of the thread pool size. 

Ben Christensen
@benjchristensen

On Jan 13, 2015, at 10:09 AM, Carey Boldenow <carey.b...@gmail.com> wrote:

Hi Ben,

Things are much clearer now as to how this is all working under the hood with RxNetty and my HystrixObservableCommands. However, is it true that we now have lost to some extent the ability to do rate limiting using Hystrix thread pools since those pools seem to be irrelevant given that most, if not all commands are doing I/O and therefore, Netty is running all that on its event loops? Thanks again for your insights!

Regards,
Carey

Carey Boldenow

unread,
Jan 13, 2015, 1:25:32 PM1/13/15
to rxj...@googlegroups.com
Reply all
Reply to author
Forward
Message has been deleted
0 new messages