I have a problem when a get a timeout and perform a retry.
The subscription is not unsubscribed until all of the graph is done.
How can i unsubscribe to the items upon the first timeout ?
best regards, Magnus Persson
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import java.util.concurrent.TimeUnit;
import static rx.Observable.OnSubscribeFunc;
import static rx.Observable.create;
public class test2 {
long t0 = System.currentTimeMillis();
public long getTime() {
return System.currentTimeMillis() - t0;
}
public Observable<Long> getObs(final String name) {
Observable<Long> g2 = create(new OnSubscribeFunc<Long>() {
@Override
public Subscription onSubscribe(final Observer<? super Long> observer) {
final Thread thread = new Thread() {
@Override
public void run() {
long nr = 0;
System.out.println("thread started:" + Thread.currentThread()+" "+name);
try {
while (true) {
Thread.sleep(500);
System.out.println("thread " + Thread.currentThread() + name+ " sending:" + nr + " at:" + getTime());
observer.onNext(nr++);
}
} catch (Exception e) {
System.out.println("produce-thread got exception:" + e.getMessage());
observer.onCompleted();
}
}
};
thread.start();
return Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("unsubscribe called at:" + getTime());
thread.interrupt();
}
});
}
});
return g2;
}
Subscription ret;
public void test99() {
System.out.println("start");
Observable<Long> o = getObs("a").timeout(400, TimeUnit.MILLISECONDS).retry(5);
ret = o.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
System.out.println("onComplete:");
}
@Override
public void onError(Throwable e) {
System.out.println("onError3:" + e);
}
@Override
public void onNext(Long s) {
System.out.println("onNext:" + s);
}
});
while (!ret.isUnsubscribed()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("stop");
}
public static void main(String[] arg) {
test2 t = new test2();
t.test99();
}
}
execution:
start
thread started:Thread[Thread-0,5,main] a
thread started:Thread[Thread-1,5,main] a
thread Thread[Thread-0,5,main]a sending:0 at:554
thread started:Thread[Thread-2,5,main] a
thread Thread[Thread-1,5,main]a sending:0 at:955
thread Thread[Thread-0,5,main]a sending:1 at:1055
thread started:Thread[Thread-3,5,main] a
thread Thread[Thread-2,5,main]a sending:0 at:1356
thread Thread[Thread-1,5,main]a sending:1 at:1456
thread Thread[Thread-0,5,main]a sending:2 at:1556
thread started:Thread[Thread-4,5,main] a
thread Thread[Thread-3,5,main]a sending:0 at:1757
thread Thread[Thread-2,5,main]a sending:1 at:1857
thread Thread[Thread-1,5,main]a sending:2 at:1957
thread Thread[Thread-0,5,main]a sending:3 at:2056
thread started:Thread[Thread-5,5,main] a
thread Thread[Thread-4,5,main]a sending:0 at:2158
thread Thread[Thread-3,5,main]a sending:1 at:2257
thread Thread[Thread-2,5,main]a sending:2 at:2358
thread Thread[Thread-1,5,main]a sending:3 at:2459
onError3:java.util.concurrent.TimeoutException
unsubscribe called at:2460
unsubscribe called at:2460
unsubscribe called at:2460
unsubscribe called at:2460
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
unsubscribe called at:2460
produce-thread got exception:sleep interrupted
unsubscribe called at:2461
produce-thread got exception:sleep interrupted