PublishSubject with an internal map/filter operation

Marko Topolnik

Sep 24, 2014, 9:04:40 AM
to rxj...@googlegroups.com
I would like to have a single object which is both the receiver of outside events and the publisher of application events. To achieve that I need to transform the incoming raw events into my application-specific ones, and apply some filtering.

I am new to the RxJava API and after a day's worth of reading I still cannot figure out how to achieve the above. I found PublishSubject, which looks like what I need, except that it has no transformation going on inside.

I also know that I can call map on the Subject, but what I get back is just Observable and not a Subject, so I can't have one object to serve as a channel where I put events on one side and they emerge processed on the other side.

How would I go about it? Or should I just use two objects?

-Marko

Brice Dutheil

Sep 26, 2014, 1:24:49 PM
to Marko Topolnik, rxj...@googlegroups.com

I don’t think it’s possible with the current API.

You may want to create your own Subject whose inner subject is a PublishSubject.


-- Brice

Ben Christensen

Sep 26, 2014, 2:20:12 PM
to Marko Topolnik, rxj...@googlegroups.com
A “single object which is both the receiver and publisher” is indeed a Subject of which there are several implementations available. What are you trying to achieve? I don’t fully understand your question or use case.

Subjects generally are not needed, but instead Observable.create(OnSubscribe) is used, and this has the benefit of controlling lifecycle such as startup and cleanup whereas a Subject doesn’t. 

PublishSubject should not have transformation inside it. You compose the operators you want.

For example:

Observable<R> o = subject.map(t -> r)

Then everyone consumes from o.

Generally the only reason you should be using a Subject is because you need to do multicasting. 

Another approach if multicasting is needed is you create the Observable with the source and compose operations and then publish() it like this:


        Observable<String> o = Observable.<String> create(subscriber -> {
            // subscribe to outside events here (including unsubscribe logic)
            // https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
            subscriber.onNext("Hello world!");
            subscriber.onCompleted();
        })
        .filter(t -> {
            // your logic here
            return true;
        })
        .publish().refCount();

        o.forEach(t -> {
            System.out.println("A => " + t);
        });
        o.forEach(t -> {
            System.out.println("B => " + t);
        });



If you don’t need multicasting, then just do this:

        Observable<String> o = Observable.<String> create(subscriber -> {
            // subscribe to outside events here
            // https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
            subscriber.onNext("Hello world!");
            subscriber.onCompleted();
        })
        .filter(t -> {
            // your logic here
            return true;
        });

        o.forEach(t -> {
            System.out.println("A => " + t);
        });



-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Marko Topolnik

Sep 26, 2014, 4:21:34 PM
to rxj...@googlegroups.com, marko.t...@gmail.com
On Friday, September 26, 2014 8:20:12 PM UTC+2, Ben Christensen wrote:
> A “single object which is both the receiver and publisher” is indeed a Subject of which there are several implementations available. What are you trying to achieve? I don’t fully understand your question or use case.
>
> Subjects generally are not needed, but instead Observable.create(OnSubscribe) is used, and this has the benefit of controlling lifecycle such as startup and cleanup whereas a Subject doesn’t.
>
> PublishSubject should not have transformation inside it. You compose the operators you want.
>
> For example:
>
> Observable<R> o = subject.map(t -> r)
>
> Then everyone consumes from o.
>
> Generally the only reason you should be using a Subject is because you need to do multicasting.

Multicasting is exactly what I am implementing. I actually have a fully completed project which was done not in Rx, but in an API whose author took Rx (as well as some others) as its inspiration: Lamina by Zach Tellman. I am now in the process of porting that project to RxJava. Lamina's protagonist is a channel, which internally consists of a receiver and an emitter node. These two are a very clean match to the Observer and Observable halves of a Subject. However, in that library the operations on channels result in new channels, and there are many other transformations, such as taking a chain of piped channels and wrapping them into an uberchannel. 

This last thing is what I am looking for: a single object, which I call the "event hub", which has one subscription to the outside event source and accepts any number of subscriptions from within the application. The subscribers will receive cooked application events. They can also detach at any time.
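To make the requirement concrete, here is a minimal plain-Java sketch of such a hub. It is not RxJava code and not how a Subject is implemented; `EventHub`, `publish`, and `subscribe` are hypothetical names chosen for illustration, and the mapping and filtering functions are assumed to be supplied by the application.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration only: not an RxJava type.
// T is the raw (outside) event type, R the cooked (application) event type.
final class EventHub<T, R> {
    private final Function<T, R> mapper;
    private final Predicate<R> filter;
    private final List<Consumer<R>> subscribers = new CopyOnWriteArrayList<>();

    EventHub(Function<T, R> mapper, Predicate<R> filter) {
        this.mapper = mapper;
        this.filter = filter;
    }

    // Receiver side: the outside event source pushes raw events in here.
    void publish(T raw) {
        R cooked = mapper.apply(raw);
        if (!filter.test(cooked)) {
            return; // event rejected by the filter, nobody is notified
        }
        for (Consumer<R> subscriber : subscribers) {
            subscriber.accept(cooked);
        }
    }

    // Publisher side: returns a handle that detaches the subscriber when run.
    Runnable subscribe(Consumer<R> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }
}
```

A hub built as `new EventHub<String, Integer>(String::length, n -> n > 2)` would hand each subscriber the length of every published string longer than two characters, until that subscriber runs its detach handle.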


> Another approach if multicasting is needed is you create the Observable with the source and compose operations and then publish() it like this:
>
>         Observable<String> o = Observable.<String> create(subscriber -> {
>             // subscribe to outside events here (including unsubscribe logic)
>             // https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
>             subscriber.onNext("Hello world!");
>             subscriber.onCompleted();
>         })
>         .filter(t -> {
>             // your logic here
>             return true;
>         })
>         .publish().refCount();
>
>         o.forEach(t -> {
>             System.out.println("A => " + t);
>         });
>         o.forEach(t -> {
>             System.out.println("B => " + t);
>         });


I am completely fresh to Rx so I'll need a while to take in all the subtleties present here. With my current understanding, this seems to force me to delegate the multicast capability to the outside event source. I'll have multiple subscriptions to that source, one per subscriber which attaches to me (the Observable<String> above). That's usually workable (it probably is in my case), but isn't as general (some event sources may accept only one subscriber).

--
Marko Topolnik

Ben Christensen

Sep 26, 2014, 4:36:14 PM
to Marko Topolnik, rxj...@googlegroups.com
That sounds like .publish() or one of its siblings: https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators

The example I gave before with publish().refCount() may be of interest. 

Here is another example of possible interest: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
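On the publish().refCount() approach mentioned above: conceptually, it keeps one shared subscription to the source for as long as at least one subscriber is attached, and fans events out from there. The following is a rough plain-Java sketch of that reference-counting idea only. It is not RxJava's implementation; `SharedSource` and its `connect` function are hypothetical names, and the source is assumed to return a handle that disconnects the listener when run.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of reference-counted sharing; not an RxJava type.
final class SharedSource<T> {
    // Attaches a listener to the outside source and returns a disconnect handle.
    private final Function<Consumer<T>, Runnable> connect;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private Runnable disconnect; // non-null while connected to the source

    SharedSource(Function<Consumer<T>, Runnable> connect) {
        this.connect = connect;
    }

    synchronized Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: open the single subscription to the source.
            disconnect = connect.apply(this::dispatch);
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            // Last subscriber left: release the subscription to the source.
            disconnect.run();
            disconnect = null;
        }
    }

    // Fans one source event out to every current subscriber.
    private void dispatch(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

In this sketch the source sees a single listener no matter how many subscribers attach, which is the property that matters when an event source accepts only one subscriber.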

Ben Christensen
@benjchristensen