Problems using timeout with retry.

Magnus Persson

Apr 4, 2014, 8:26:02 AM
to rxj...@googlegroups.com
I have a problem when I get a timeout and perform a retry.
The subscription is not unsubscribed until the whole graph is done.
How can I unsubscribe from the items upon the first timeout?

best regards, Magnus Persson


import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

import java.util.concurrent.TimeUnit;

import static rx.Observable.OnSubscribeFunc;
import static rx.Observable.create;

public class test2 {
    long t0 = System.currentTimeMillis();
    public long getTime() {
        return System.currentTimeMillis() - t0;
    }

    public Observable<Long> getObs(final String name) {
        Observable<Long> g2 = create(new OnSubscribeFunc<Long>() {
            @Override
            public Subscription onSubscribe(final Observer<? super Long> observer) {
                final Thread thread = new Thread() {
                    @Override
                    public void run() {
                        long nr = 0;
                        System.out.println("thread started:" + Thread.currentThread()+" "+name);
                        try {
                            while (true) {
                                Thread.sleep(500);
                                System.out.println("thread " + Thread.currentThread() + name+ " sending:" + nr + " at:" + getTime());
                                observer.onNext(nr++);
                            }
                        } catch (Exception e) {
                            System.out.println("produce-thread got exception:" + e.getMessage());
                            observer.onCompleted();
                        }
                    }
                };
                thread.start();
                return Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        System.out.println("unsubscribe called at:" + getTime());
                        thread.interrupt();
                    }
                });
            }
        });
        return g2;
    }

    Subscription ret;

    public void test99() {
        System.out.println("start");

        Observable<Long> o = getObs("a").timeout(400, TimeUnit.MILLISECONDS).retry(5);
        ret = o.subscribe(new Observer<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onComplete:");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError3:" + e);
            }

            @Override
            public void onNext(Long s) {
                System.out.println("onNext:" + s);
            }
        });
        while (!ret.isUnsubscribed()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("stop");

    }

    public static void main(String[] arg) {
        test2 t = new test2();
        t.test99();
    }
}

execution:


start
thread started:Thread[Thread-0,5,main] a
thread started:Thread[Thread-1,5,main] a
thread Thread[Thread-0,5,main]a sending:0 at:554
thread started:Thread[Thread-2,5,main] a
thread Thread[Thread-1,5,main]a sending:0 at:955
thread Thread[Thread-0,5,main]a sending:1 at:1055
thread started:Thread[Thread-3,5,main] a
thread Thread[Thread-2,5,main]a sending:0 at:1356
thread Thread[Thread-1,5,main]a sending:1 at:1456
thread Thread[Thread-0,5,main]a sending:2 at:1556
thread started:Thread[Thread-4,5,main] a
thread Thread[Thread-3,5,main]a sending:0 at:1757
thread Thread[Thread-2,5,main]a sending:1 at:1857
thread Thread[Thread-1,5,main]a sending:2 at:1957
thread Thread[Thread-0,5,main]a sending:3 at:2056
thread started:Thread[Thread-5,5,main] a
thread Thread[Thread-4,5,main]a sending:0 at:2158
thread Thread[Thread-3,5,main]a sending:1 at:2257
thread Thread[Thread-2,5,main]a sending:2 at:2358
thread Thread[Thread-1,5,main]a sending:3 at:2459
onError3:java.util.concurrent.TimeoutException
unsubscribe called at:2460
unsubscribe called at:2460
unsubscribe called at:2460
unsubscribe called at:2460
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
produce-thread got exception:sleep interrupted
unsubscribe called at:2460
produce-thread got exception:sleep interrupted
unsubscribe called at:2461
produce-thread got exception:sleep interrupted

Peter McDonnell

Apr 4, 2014, 11:52:25 AM
to rxj...@googlegroups.com
hey magnus,

i noticed this also - seems like the retry() operator doesn't unsubscribe from previous attempts unless it completes. i've created an issue here https://github.com/Netflix/RxJava/issues/1018
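
For contrast, the behaviour one would expect from a timeout followed by a retry is that each timed-out attempt is cancelled before the next one starts. The following is a minimal sketch of that ordering in plain java.util.concurrent. It is not RxJava and not the change discussed in the issue; the class name RetryWithTimeoutSketch and its methods are hypothetical and exist only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: per-attempt timeout with retry, cancelling each
// timed-out attempt before the next one is submitted.
final class RetryWithTimeoutSketch {

    static <T> T callWithRetry(Callable<T> task, long timeoutMillis, int maxAttempts)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            TimeoutException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<T> future = pool.submit(task);
                try {
                    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    last = e; // remember the failure and try again
                } finally {
                    // Key step: interrupt this attempt so it does not keep
                    // running next to its successors. No-op if it already finished.
                    future.cancel(true);
                }
            }
            throw last;
        } finally {
            pool.shutdownNow();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    // Runs three attempts against a source that is always too slow and
    // returns how many attempts are still executing afterwards.
    static int stillRunningAfterRetries() {
        AtomicInteger started = new AtomicInteger();
        AtomicInteger finished = new AtomicInteger();
        Callable<Long> slowSource = () -> {
            started.incrementAndGet();
            try {
                Thread.sleep(5_000); // far longer than the timeout below
                return 0L;
            } finally {
                finished.incrementAndGet(); // also runs when interrupted
            }
        };
        try {
            callWithRetry(slowSource, 50, 3);
        } catch (TimeoutException expected) {
            // every attempt timed out, which is what this demo sets up
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return started.get() - finished.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts still running: " + stillRunningAfterRetries());
    }
}
```

In the printout from the original post the opposite happens: Thread-0 through Thread-5 keep sending side by side, and the "unsubscribe called" lines only appear once the final TimeoutException has been delivered.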

-peter

Ben Christensen

Apr 4, 2014, 1:37:04 PM
to Peter McDonnell, rxj...@googlegroups.com

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Magnus Persson

Apr 7, 2014, 2:11:22 AM
to rxj...@googlegroups.com
Thanks for notifying me.
I tried the new code, but it did not fully work.
This was the execution printout:


start
thread started:Thread[Thread-0,5,main] a
unsubscribe called at:455

produce-thread got exception:sleep interrupted
unsubscribe called at:455

thread started:Thread[Thread-1,5,main] a
produce-thread got exception:sleep interrupted
onComplete:

I would have liked the producer thread to be recreated 5 times.
Is there some other way to recreate the whole graph in case of a timeout?

best regards, Magnus Persson

Peter McDonnell

Apr 8, 2014, 4:00:55 PM
to rxj...@googlegroups.com
hey magnus,

i think there is still an issue. i recreated your problem with a slightly different test:

- you call observer.onCompleted() when the thread is interrupted; since the subscribing timeout() has already failed at that point, this seems unnecessary (or could create a race)
- i switched from using Thread.interrupt to a flag to make it a bit safer
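
The two changes listed above can be sketched roughly as follows. This is a plain-Java illustration under stated assumptions, not the test referred to in this thread: the class FlagCancelledProducer is hypothetical, a counter stands in for observer.onNext(), and cancel() stands in for whatever the returned Subscription would call on unsubscribe.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a producer thread stopped by a flag instead of
// Thread.interrupt(), which emits nothing further once it sees the flag.
final class FlagCancelledProducer {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicLong emitted = new AtomicLong(); // stand-in for onNext calls
    private final Thread worker;

    FlagCancelledProducer(long periodMillis) {
        worker = new Thread(() -> {
            while (!cancelled.get()) {
                try {
                    Thread.sleep(periodMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // interruption is not the cancellation path here
                }
                if (!cancelled.get()) {
                    emitted.incrementAndGet(); // would be observer.onNext(nr++)
                }
            }
            // No onCompleted() on cancellation: the downstream has already
            // failed or unsubscribed, so there is nobody left to notify.
        });
    }

    void start() { worker.start(); }

    // What the unsubscribe action would call.
    void cancel() { cancelled.set(true); }

    long emittedCount() { return emitted.get(); }

    // Waits up to the given time for the worker to exit; true if it has stopped.
    boolean awaitStopped(long millis) {
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a producer, cancels it, and checks that it stops and stays quiet.
    static boolean stopsAfterCancel() {
        FlagCancelledProducer producer = new FlagCancelledProducer(10);
        producer.start();
        pause(200);
        producer.cancel();
        boolean stopped = producer.awaitStopped(1_000);
        long countAtStop = producer.emittedCount();
        pause(50);
        return stopped && countAtStop > 0 && producer.emittedCount() == countAtStop;
    }

    public static void main(String[] args) {
        System.out.println("producer stopped cleanly: " + stopsAfterCancel());
    }
}
```

One trade-off of the flag: the worker only notices it after the current sleep returns, so cancellation can lag by up to one period, whereas an interrupt wakes the thread immediately.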

test is here (testTimeoutWithRetry)


there was still an issue with OperatorRetry which caused the retry to fail. i've opened an issue at https://github.com/Netflix/RxJava/issues/1024

-peter

Magnus Persson

Apr 9, 2014, 2:53:29 AM
to rxj...@googlegroups.com
Hi, thanks for looking into my issue.
I have tried using your fixed code. It seems to solve my problem! Thanks!
However, it does not pass the new tests you added (it hangs in the call to await()).

Best regards, Magnus

Peter McDonnell

Apr 9, 2014, 11:58:43 AM
to rxj...@googlegroups.com
Thanks Magnus,

I'm not able to recreate the test failure. Do both tests (testUnsubscribeAfterError and testTimeoutWithRetry) always fail, and is it 100% consistent?

-Peter