Schedulers.io creates too many threads


kim young ill

Jul 17, 2018, 4:48:47 PM
to RxJava
Hi Rx gurus,
what I want to achieve is the following:

- collect a batch of rows from the DB
- transform each row into an IO job
- execute these jobs in parallel (copying files over the network)
- write the result of each job back to the DB

When all jobs of a batch have finished, pause and start again from the beginning.

--

This is what the code looks like:

---
//tick
    void schedule(){
        Completable job = createCompletable();
        Executors.newSingleThreadScheduledExecutor().schedule(() -> {
            job.subscribeOn(Schedulers.io()).subscribe();
        }, 1l, TimeUnit.SECONDS);
    }

    private Completable createCompletable(){
        return Completable.defer(() -> Completable.fromAction(() -> {
                exec().subscribeWith(new ResourceSubscriber<F>() {
                    @Override
                    public void onNext(F f) {
                        updateDB(f);
                    }

                    @Override
                    public void onError(Throwable t) {
                        onComplete();
                    }

                    @Override
                    public void onComplete() {
                        dispose();
                        schedule();
                    }
                });
        }));
    }

    //execution block itself.
    private Flowable<F> exec(){
        int batchSize = 64;
        int par = 8;
        Flowable<R> rows = selectRows(batchSize);
        return rows.<R>onBackpressureDrop()
                .parallel(par)
                .runOn(Schedulers.io())
                .flatMap(this::toIOJob)
                .map(this::copyContent)
                .sequential();
    }

    /**
     * select data from mysql to create IO jobs.
     */
    abstract Flowable<R> selectRows(int batchSize);
    /**
     * convert mysql row data to IO job.
     */
    abstract Flowable<F> toIOJob(R row);
    /**
     * copy content/data.
     */
    abstract F copyContent(F ioJob);
    /**
     * update mysql after copy finished.
     */
    abstract boolean updateDB(F result);

---

Local tests with a fake Thread.sleep() to simulate the IO operations (copying data and updating the DB) do not expose any problem, but when running on a system with real data, after some hours the number of threads created by Schedulers.io() grows until an OutOfMemoryError occurs. Am I doing something wrong here? What is the best practice to monitor and control the IO threads? Thanks for any advice.


Currently, as a temporary fix, I just throw the jobs into a bounded Executor with a CountDownLatch and reschedule the batch again.
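
A minimal sketch of what such a workaround could look like, using only the JDK. `BoundedBatchRunner` and its methods are hypothetical names for illustration, and the jobs are plain `Runnable`s standing in for the copy-and-update work; this is not the code actually deployed.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the original code.
final class BoundedBatchRunner {
    private final ExecutorService pool;

    BoundedBatchRunner(int threads) {
        // Fixed-size pool: never more than `threads` jobs in flight.
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Runs every job on the pool, blocks until all are done, returns the failure count. */
    int runBatch(List<Runnable> jobs) {
        CountDownLatch done = new CountDownLatch(jobs.size());
        AtomicInteger failures = new AtomicInteger();
        for (Runnable job : jobs) {
            pool.execute(() -> {
                try {
                    job.run();
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                } finally {
                    // Count down even on failure so the batch cannot hang.
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batch", e);
        }
        return failures.get();
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Once `runBatch` returns, the caller can pause and submit the next batch to the same runner, so no new threads are created per cycle.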

cheers





Dávid Karnok

Jul 17, 2018, 5:20:01 PM
to khi...@googlemail.com, RxJava
Hi.

Schedulers.io() is unbounded but can reuse threads. There is no way to control how many threads it uses. We recommend using Schedulers.from() with a pool size of your choosing. It looks like you are leaking them somehow, for example with never-terminating sequences. Also, you are creating a new thread with Executors.newSingleThreadScheduledExecutor() over and over, which could also lead to an OOME. A thread dump or monitoring tools should be able to tell whether you are leaking io() threads. Also, please check out `Flowable.interval` to create a periodic action, and use flatMap instead of subscribing inside actions.
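
As a rough sketch of the Schedulers.from() suggestion, assuming RxJava 2.x (`io.reactivex`) on the classpath: the class name `BoundedIoExample`, the method `copyAll`, and the string mapping standing in for the real copy step are all made up for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class; the map() step is a placeholder for the real copy.
final class BoundedIoExample {
    static List<String> copyAll(List<String> rows, int par) {
        // The pool size is the hard upper bound on threads used by this pipeline.
        ExecutorService pool = Executors.newFixedThreadPool(par);
        Scheduler bounded = Schedulers.from(pool);
        try {
            return Flowable.fromIterable(rows)
                    .parallel(par)
                    .runOn(bounded)               // instead of Schedulers.io()
                    .map(row -> "copied:" + row)  // placeholder for the copy step
                    .sequential()
                    .toList()
                    .blockingGet();
        } finally {
            // In a long-running service the pool would be created once and reused.
            pool.shutdown();
        }
    }
}
```

The order of the returned items is not guaranteed, since the rails run concurrently.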
--
Best regards,
David Karnok

kim young ill

Jul 17, 2018, 6:03:27 PM
to RxJava
Hi Karnok,

Thanks, it's definitely an IO-thread issue.

There were over 11K threads like this in the jstack output:

"RxCachedThreadScheduler-333" #326388 daemon prio=5 os_prio=0 tid=0x0000000002109000 nid=0x820 waiting on condition [0x00007fb6a600b000]
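
Threads with this name prefix can also be counted from inside the JVM, which may be handy for periodic logging or an alert. This is only a sketch using the JDK's `Thread.getAllStackTraces()`; the class and method names are hypothetical, and the prefix `RxCachedThreadScheduler-` is taken from the jstack line above.

```java
// Hypothetical monitoring helper; not part of RxJava.
final class IoThreadCounter {
    /** Counts live threads whose name starts with the given prefix. */
    static long countThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }
}
```

Calling `IoThreadCounter.countThreadsWithPrefix("RxCachedThreadScheduler-")` at the end of each batch and logging the value would show whether the count keeps climbing.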

I don't have any clue about an unbounded sequence (the Flowables are generated from a bounded list). I thought calling dispose() explicitly would release the threads, but it does not seem to solve the problem.

About Flowable.interval: what would be a good way to start an action only when the previously fired action has really completed (to avoid flooding the system when the interval is too small)?

Thanks

Dávid Karnok

Jul 18, 2018, 3:40:12 AM
to kim young ill, RxJava
Does selectRows terminate? How is it implemented?

Interval usage:

Flowable.interval(1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMap( time -> asyncActivityThatShouldComplete(), 1)
.subscribe(...)
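
A self-contained variant of the snippet above, as a sketch assuming RxJava 2.x (`io.reactivex`): the class `NonOverlappingTicks`, the short timings, and the sleeping callable that stands in for the real batch are all illustrative. It records how many batches were ever running at the same moment; with a maxConcurrency of 1, ticks that arrive while a batch is still running are dropped rather than queued.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the sleeping callable stands in for one batch run.
final class NonOverlappingTicks {
    /** Runs `batches` batches and returns the highest number seen running at once. */
    static int maxConcurrentBatches(int batches) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();

        Flowable.interval(10, TimeUnit.MILLISECONDS)
                .onBackpressureDrop() // ticks with no downstream demand are discarded
                .flatMap(tick -> Flowable.fromCallable(() -> {
                            int now = running.incrementAndGet();
                            maxRunning.accumulateAndGet(now, Math::max);
                            Thread.sleep(35); // batch takes longer than the tick period
                            running.decrementAndGet();
                            return tick;
                        }).subscribeOn(Schedulers.io()),
                        1) // maxConcurrency = 1: next tick is requested only after completion
                .take(batches)
                .blockingSubscribe();
        return maxRunning.get();
    }
}
```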


George Campbell

Jul 19, 2018, 11:41:00 AM
to RxJava
Try Scheduler.when(); it was designed for putting bounds on the default schedulers.

        int maxWorkers = 8;
        Schedulers.io().when((actions) -> {
            return Completable.merge(Flowable.merge(actions, maxWorkers));
        });

Brice Dutheil

Jul 24, 2018, 10:31:16 AM
to aber...@gmail.com, RxJava

Wow, the documentation of when() is really hard to understand. @kim young ill, you should really work out from the doc what exactly you want to limit. The scheduler that George is suggesting can deadlock if your Flowable uses the zip operator, but it might be perfect if it is just used for simple IO actions.

       // Variant 1: at most two workers are active at a time.
       Scheduler limitScheduler = Schedulers.computation().when(workers -> {
        // use merge's max concurrency to limit the number of concurrent
        // Flowables (i.e. workers) to two at a time
        return Completable.merge(Flowable.merge(workers, 2));
       });
       // Variant 2: at most two actions run at a time, across all workers.
       Scheduler limitActionsScheduler = Schedulers.computation().when(workers -> {
        // use merge's max concurrency to limit the number of concurrent
        // callbacks (i.e. actions) to two at a time
        return Completable.merge(Flowable.merge(workers), 2);
       });

— Brice
