Hello,
Using the ReactiveLab project, I created my own simple API to test some simple scenarios using RxJava and Hystrix. My intent was to simulate the ability to subscribe to multiple observables in a concurrent manner. To do this, I created a simple request handler that works with three different Hystrix commands (that each sleep for some defined period to simulate latency). For Test1, while I am calling observe() on each of my commands, I am not extending HystrixObservableCommand in my command classes (see Test1 code below). That all being said when I run this, everything works as I would hope. As you can see below (Test1 Results), all 3 commands were executed on a different thread and called at or about time 0, and each of my subscriptions fired at the expected time (based on their sleep delay).
For the next test, the only change I made was to have each of my command classes extend HystrixObservableCommand and modified each run() method accordingly to return Observable<String>. Please see Test2 Code below for details. For brevity, I only included Test1Command, but same changes were applied to other command classes as well. When I then ran Test2, I got much different results (see Test2 results below). As you can see, everything ran on one thread and sequentially. I tried various things like using subscribeOn(..) and observeOn(..) on my observables to get them to run on different threads, but always it ran commands on main thread seen below. I am still trying to grasp the whole Rxjava thing, so I am certain I am missing something obvious here, so any help you folks can provide to help me better understand what is happening under the hood below with Test2 will be greatly appreciated!
The other question/observation I have is that when I extend HystrixObservableCommand in my command classes, the "timeout" I have defined for each command is ignored and no matter how long I set sleep to, the command will run for that amount of time and finish successfully.
Thanks!
Carey
Test1 Results:
Running Test3Command on Thread: hystrix-test3-2 at time: 0
Running Test2Command on Thread: hystrix-test2-2 at time: 0
Running Test1Command on Thread: hystrix-test1-2 at time: 2
onNext3: Hola Carey at time: 217
onNext2: Ciao Carey at time: 529
onNext1: Hello Carey at time: 935
Test2 Results:
Running Test1Command on Thread: rx-netty-nio-eventloop-3-2 at time: 0
Running Test2Command on Thread: rx-netty-nio-eventloop-3-2 at time: 902
Running Test3Command on Thread: rx-netty-nio-eventloop-3-2 at time: 1404
onNext1: Hello Carey at time: 1604
onNext2: Ciao Carey at time: 1604
onNext3: Hola Carey at time: 1604
Test1 Code:
Request Handler:
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
long startTime = System.currentTimeMillis();
String name = (String)request.getQueryParameters().get("name").get(0);
Observable<String> o1 = new Test1Command(name,startTime).observe();
Observable<String> o2 = new Test2Command(name,startTime).observe();
Observable<String> o3 = new Test3Command(name,startTime).observe();
o1.subscribe(
(incomingValue) -> System.out.println("onNext1: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);
o2.subscribe(
(incomingValue) -> System.out.println("onNext2: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);
o3.subscribe(
(incomingValue) -> System.out.println("onNext3: " + incomingValue + " at time: " + (System.currentTimeMillis()-startTime))
);
return response.writeStringAndFlush("Finished Test...");
}
Commands:
public class Test1Command extends HystrixCommand {
private final String name;
private final long startTime;
public Test1Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test1"));
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test1Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(900);}catch(Exception e){} //delay this command for 900ms
return "Hello " + name;
}
}
public class Test2Command extends HystrixCommand {
private final String name;
private final long startTime;
public Test2Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test2"));
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test2Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(500);}catch(Exception e){} //delay this command for 500ms
return "Ciao " + name;
}
}
public class Test3Command extends HystrixCommand {
private final String name;
private final long startTime;
public Test3Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test3"));
this.startTime = startTime;
}
@Override
protected String run() {
System.out.println("Running Test3Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(200);}catch(Exception e){} //delay this command for 200ms
return "Hola " + name;
}
}
Test2 Code:
public class Test1Command extends HystrixObservableCommand<String> {
private final String name;
private final long startTime;
public Test1Command(String name, long startTime) {
super(HystrixCommandGroupKey.Factory.asKey("test1"));
this.startTime = startTime;
}
@Override
protected Observable<String> run() {
System.out.println("Running Test1Command on Thread: " + Thread.currentThread().getName() + " at time: " + (System.currentTimeMillis()-startTime));
try{Thread.sleep(900);}catch(Exception e){} //delay this command for 900ms
return Observable.just("Hello " + name);
}
}