Question about RxJava threading behavior

62 views
Skip to first unread message

avi maslati

unread,
Sep 15, 2019, 8:49:27 AM9/15/19
to RxJava
Hi,

I am new to RxJava and I am trying to understand the following threading behavior:

When I run the below program (using RxJava 2.2.12)

public static void main(String[] args) throws Exception {
List<String> items = new ArrayList<>();
items.add("a");
items.add("b");
items.add("c");
items.add("d");

Observable<String> myObservable = Observable.fromIterable(items);

myObservable.
flatMap(s->Observable.just(s+1).subscribeOn(Schedulers.newThread())).
subscribe(s->{System.out.println(Thread.currentThread().getId() + " " + s);});

Thread.sleep(5000);

}

This output is printed:

16 d1
15 c1
14 b1
13 a1

Which implies that the Consumer is being called on a seperate thread for each item and the job is done concurrently.

When Thread.sleep is added to the end/start of the Consumer method e.g. 

s->{System.out.println(Thread.currentThread().getId() + " " + s); Thread.sleep(1000);}

The output is:

15 c1
15 a1
15 b1
15 d1

The order of the items can be changed but the thread id is always the same, and it looks like the job is done sequentially.

I would expect it to behave the same way the first example

Can anyone exaplain this behaviour ?

Thanks a lot
Avi









Dávid Karnok

unread,
Sep 15, 2019, 9:08:47 AM9/15/19
to avi maslati, RxJava
Hi. 

flatMap runs those inner just() calls in parallel and flatMap uses one of the threads to emit the flattened values, but those are always serialized for the consumer. So you may see different thread ids on each call but the calls do not run concurrently. If you add sleep to the consumer, flatMap will buffer up items and the same thread will be servicing the other items as well. 

Use observeOn after flatMap to ensure the values are delivered on a desired thread without hopping.

--
You received this message because you are subscribed to the Google Groups "RxJava" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rxjava+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/rxjava/ac58dc93-3490-481d-ac31-cb6e9fb02173%40googlegroups.com.


--
Best regards,
David Karnok

avi maslati

unread,
Sep 15, 2019, 9:26:10 AM9/15/19
to RxJava

Thanks for making it clear
To unsubscribe from this group and stop receiving emails from it, send an email to rxj...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages