Trigger only one subscriber per event?


Vachagan Balayan

Feb 24, 2015, 3:09:37 AM
to rxj...@googlegroups.com
I'm trying to implement a simple producer-consumer example with Rx, just to get a practical taste of what it's like...

So I think I need to declare a subject "produced" to which all consumers subscribe;
then, when a producer produces something, it emits the result as a subject event and onNext(value) is executed...

But I need each onNext to be handled by only one subscriber at a time... (Also, do I have to unsubscribe so that the same consumer doesn't get a new job while it's still doing another?)

Thanks,
I appreciate any help with this.
I'm new to Rx and want to understand what its best uses are...

Dávid Karnok

Feb 24, 2015, 3:36:44 AM
to rxj...@googlegroups.com
Hi. Subjects emit their values to subscribers one after another if those subscribers are sequential (no observeOn):

// RxJava 1.x
import rx.Subscriber;
import rx.subjects.PublishSubject;

PublishSubject<Integer> ps = PublishSubject.create();

Subscriber<Integer> s1 = ...
Subscriber<Integer> s2 = ...

ps.subscribe(s1);
ps.subscribe(s2);

ps.onNext(1);

// s1 receives 1 first
// s2 receives 1 only after s1 has finished processing it

Since RxJava event delivery is sequential, a synchronous PublishSubject won't deliver several values in a row to one subscriber while the others wait. A ReplaySubject, however, may send out multiple events to a subscriber that needs to catch up.

Vachagan Balayan

Feb 27, 2015, 3:38:56 AM
to rxj...@googlegroups.com
Thanks for the answer, Dávid.

So, as far as I understand, producer-consumer is not a good example to try Rx on?
Maybe because the consumers' nature is pull-based?

Anyway, as far as I understand, I cannot create an Observable that emits each event to only one subscriber at a time.
I guess it isn't designed for that and I'm misunderstanding the Rx approach. And even if there were one that could, I'm not sure how it would know how to balance incoming events among subscribers...

Dávid Karnok

Feb 27, 2015, 3:46:55 AM
to rxj...@googlegroups.com
On Friday, February 27, 2015 at 9:38:56 AM UTC+1, Vachagan Balayan wrote:
> Thanks for the answer, Dávid.
>
> So, as far as I understand, producer-consumer is not a good example to try Rx on?
> Maybe because the consumers' nature is pull-based?


Rx is itself a form of producer-consumer relationship, with a dynamic push-pull switch under backpressure.

 
> Anyway, as far as I understand, I cannot create an Observable that emits each event to only one subscriber at a time.
> I guess it isn't designed for that and I'm misunderstanding the Rx approach. And even if there were one that could, I'm not sure how it would know how to balance incoming events among subscribers...

What do you mean by an Observable that emits to one subscriber at a time? That, given such an Observable, only one Subscriber is allowed to be subscribed to it at once and all other subscription attempts must be rejected? Or do you mean that observing your source has side effects which you don't want to duplicate for each Subscriber? The latter is the case where publish() and share() can help you.

Vachagan Balayan

Feb 27, 2015, 3:55:20 AM
to rxj...@googlegroups.com
Remember the typical way we implement a producer using synchronization...
When a producer had produced something, it called notify() on a lock that the consumers were waiting on. That would wake only one consumer, which would handle the item as soon as it could without bothering the other consumers...

What I don't understand is how to do that in Rx...
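
For reference, the classic behaviour described here (each produced item is handled by exactly one consumer) can be sketched in plain Java. This is only an illustration under some assumptions: it uses a BlockingQueue from java.util.concurrent instead of hand-written wait()/notify(), and the class name WorkQueueDemo, the STOP marker and the run() helper are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: several consumers share one queue,
// and every job is taken by exactly one of them.
class WorkQueueDemo {
    // Marker value that tells a consumer to stop (an assumption of this sketch).
    static final int STOP = -1;

    // Produces jobs 0..jobs-1 and returns the jobs each consumer handled.
    static List<List<Integer>> run(int jobs, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<List<Integer>> handled = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int c = 0; c < consumers; c++) {
            List<Integer> mine = new ArrayList<>(); // touched only by this consumer's thread
            handled.add(mine);
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int job = queue.take();     // blocks until a job is available
                        if (job == STOP) {
                            return;
                        }
                        mine.add(job);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }

        try {
            for (int j = 0; j < jobs; j++) {
                queue.put(j);                       // the producer side
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(STOP);                    // one stop marker per consumer
            }
            for (Thread t : threads) {
                t.join();                           // results are safe to read after join
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Which consumer gets which job depends on thread scheduling.
        System.out.println(run(10, 3));
    }
}
```

Which consumer ends up with which job is not deterministic here; the only guarantee is that no job is handled twice.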

Samuel Tardieu

Feb 27, 2015, 3:56:18 AM
to Dávid Karnok, rxj...@googlegroups.com

I think he might want to fan out events to subscribers in a round-robin fashion. Vachagan, is that what you are trying to do?

Vachagan Balayan

Feb 27, 2015, 4:24:09 AM
to rxj...@googlegroups.com, akar...@gmail.com
Sam, yes, that's a lot closer to what I want to do.
How can this be done with Rx?

Dávid Karnok

Feb 27, 2015, 5:36:20 AM
to rxj...@googlegroups.com, akar...@gmail.com
You need to implement a custom OnSubscribe-based operator for this. However, if you don't know the number of Subscribers up-front, subscribing the first subscriber to a synchronous source will most likely consume the entire sequence before any other subscriber can subscribe. If you know the number, the last subscriber can trigger a subscription to the source and you can round-robin each observed value.

Here is an example which runs with a fixed number of Subscribers (no backpressure support):
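
The following is a minimal sketch of that idea rather than the code from the original post. It assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for RxJava's Observable and Subscriber, so it is not an OnSubscribe-based operator. The names RoundRobinPublisher, CollectingSubscriber and RoundRobinDemo are hypothetical. As in the description, the number of subscribers is fixed up front, the last subscriber to arrive triggers consumption of the source, and there is no backpressure support (request() and cancel() are ignored).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical helper (not an RxJava class): hands each source item
// to exactly one of a fixed number of subscribers, in turn.
final class RoundRobinPublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;   // stand-in for the upstream sequence
    private final int expected;         // number of subscribers, known up front
    private final List<Flow.Subscriber<? super T>> subscribers = new ArrayList<>();

    RoundRobinPublisher(Iterable<T> source, int expected) {
        this.source = source;
        this.expected = expected;
    }

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscribers.size() == expected) {
            throw new IllegalStateException("all " + expected + " subscribers already attached");
        }
        subscribers.add(subscriber);
        // No backpressure: requests and cancellation are ignored in this sketch.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        if (subscribers.size() < expected) {
            return;                     // keep waiting for the remaining subscribers
        }
        // The last subscriber triggers consumption of the source.
        int next = 0;
        for (T item : source) {
            subscribers.get(next).onNext(item);
            next = (next + 1) % expected;
        }
        for (Flow.Subscriber<? super T> s : subscribers) {
            s.onComplete();
        }
    }
}

// Hypothetical subscriber that just records what it receives.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    @Override public void onSubscribe(Flow.Subscription subscription) { }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable error) { throw new RuntimeException(error); }
    @Override public void onComplete() { }
}

class RoundRobinDemo {
    // Distributes the items over n collecting subscribers and returns what each one got.
    static <T> List<List<T>> run(List<T> items, int n) {
        RoundRobinPublisher<T> publisher = new RoundRobinPublisher<>(items, n);
        List<CollectingSubscriber<T>> subs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            CollectingSubscriber<T> s = new CollectingSubscriber<>();
            subs.add(s);
            publisher.subscribe(s);
        }
        List<List<T>> result = new ArrayList<>();
        for (CollectingSubscriber<T> s : subs) {
            result.add(s.items);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5), 2)); // prints [[1, 3, 5], [2, 4]]
    }
}
```

Everything here runs synchronously on the thread of the last subscribe() call, so it shows only the distribution logic, not concurrent consumers.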
