Merging PublishSubjects


Tod Antilla

Oct 10, 2014, 8:30:51 PM
to rxj...@googlegroups.com
Hello,

Just getting started.

Is there a way to achieve something along the lines of the following?

        PublishSubject<Integer> p1 = PublishSubject.create();
        PublishSubject<Integer> p2 = PublishSubject.create();
        PublishSubject<Integer> p3 = PublishSubject.merge(p1, p2);

        p3.subscribe(i -> System.out.println(i));    
        
        p1.onNext(1);
        p2.onNext(2);
        p3.onNext(3);
        
        //prints 1, 2, 3

Thanks in advance,
Tod



Ben Christensen

Oct 10, 2014, 8:33:39 PM
to Tod Antilla, rxj...@googlegroups.com
Subjects are Observables so you can call Observable.merge(p1, p2).

Ben Christensen
310.782.5511
@benjchristensen

Tod Antilla

Oct 10, 2014, 10:09:39 PM
to rxj...@googlegroups.com, tod.a...@gmail.com
I get a similar error with Observable.merge as with PublishSubject.merge:

 incompatible types: no instance(s) of type variable(s) T exist so that rx.Observable<T> conforms to rx.subjects.PublishSubject<java.lang.Integer>

To give some context, it's important that if the result of merge isn't a PublishSubject, it should be capable of becoming one, as I am accumulating child events for an event bus API.

Ben Christensen

Oct 10, 2014, 10:21:10 PM
to Tod Antilla, rxj...@googlegroups.com
What exactly is your code that gets that?

Take a look at this for EventBus behavior: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf

Ben Christensen
@benjchristensen

Ben Christensen

Oct 10, 2014, 10:45:54 PM
to Tod Antilla, rxj...@googlegroups.com
And further merging of streams:

import rx.Observable;
import rx.subjects.PublishSubject;

public class Subjects {

    public static void main(String[] args) {
        PublishSubject<String> p1 = PublishSubject.create();
        PublishSubject<String> p2 = PublishSubject.create();
        PublishSubject<String> p3 = PublishSubject.create();

        Observable<String> stream = Observable.merge(p1, p2, p3);

        stream.subscribe(i -> System.out.println("ONE => " + i));

        p1.onNext("1a");
        p2.onNext("2a");
        p3.onNext("3a");

        PublishSubject<String> p4 = PublishSubject.create();

        Observable.merge(stream, p4).subscribe(i -> System.out.println("  TWO => " + i));

        p4.onNext("4a");
        p4.onNext("4b");

        p1.onNext("1b");
        p2.onNext("2b");
        p3.onNext("3b");
    }
}


ONE => 1a
ONE => 2a
ONE => 3a
  TWO => 4a
  TWO => 4b
ONE => 1b
  TWO => 1b
ONE => 2b
  TWO => 2b
ONE => 3b
  TWO => 3b




-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

On October 10, 2014 at 7:42:22 PM, Ben Christensen (benjchr...@gmail.com) wrote:

Here is merging 3 Subjects together into a stream. 


    public static void main(String[] args) {
        PublishSubject<Integer> p1 = PublishSubject.create();
        PublishSubject<Integer> p2 = PublishSubject.create();
        PublishSubject<Integer> p3 = PublishSubject.create();

        Observable<Integer> stream = Observable.merge(p1, p2, p3);

        stream.subscribe(i -> System.out.println(i));

        p1.onNext(1);
        p2.onNext(2);
        p3.onNext(3);
    }



Note that the merged result is an Observable and not a Subject. 

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

On October 10, 2014 at 7:40:06 PM, Tod Antilla (tod.a...@gmail.com) wrote:

That's a nice wrapper around a single PublishSubject.
I need the equivalent of merging several of those event buses, the salient point being that the result of the merge must continue to be capable of publishing new events.

If it's possible to get something like the simple example at the beginning of the post to work, it's possible with a more complex implementation. I can do it entirely from scratch, but it seems odd that the merges that are possible with pure Observers are not possible with PublishSubject.

More likely, spending most of my time as a JavaScript front-end developer is keeping me from seeing the basis of the compiler error and how to work around it.




Ben Christensen

Oct 10, 2014, 10:47:59 PM
to Tod Antilla, rxj...@googlegroups.com
You don’t, that’s not how Observables work.

Take a look at the second example I sent for merging the stream again with another Subject if you want to be able to emit to a stream along with the events from other streams:

PublishSubject<String> p4 = PublishSubject.create();

Observable.merge(stream, p4).subscribe(i -> System.out.println("  TWO => " + i));


This now has all output from `stream` and gives you another PublishSubject to emit to.
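
The types involved may be easier to see in a small model that does not use RxJava at all. The sketch below is only an illustration, not RxJava's implementation: `Source`, `Relay`, and `merge` are hypothetical stand-ins for `Observable`, `PublishSubject`, and `Observable.merge`, and it ignores completion, errors, and unsubscription. It shows that a merged value is read-only, and that merging in one more writable relay is what gives you a place to publish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MergeSketch {

    /** Read-only side: something you can subscribe to (stand-in for Observable). */
    interface Source<T> {
        void subscribe(Consumer<? super T> listener);
    }

    /** Read-write side: a Source you can also push values into (stand-in for PublishSubject). */
    static final class Relay<T> implements Source<T> {
        private final List<Consumer<? super T>> listeners = new ArrayList<>();

        @Override
        public void subscribe(Consumer<? super T> listener) {
            listeners.add(listener);
        }

        void onNext(T value) {
            // Deliver the value to every listener subscribed so far.
            for (Consumer<? super T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** The result is a plain Source: it has no onNext of its own, only its inputs do. */
    static <T> Source<T> merge(Source<? extends T> a, Source<? extends T> b) {
        return listener -> {
            a.subscribe(listener);
            b.subscribe(listener);
        };
    }

    /** Mirrors the shape of the example above and returns what the subscriber saw. */
    public static List<String> demo() {
        List<String> seen = new ArrayList<>();

        Relay<String> p1 = new Relay<>();
        Relay<String> p2 = new Relay<>();
        Source<String> stream = merge(p1, p2); // stream.onNext(...) would not compile

        Relay<String> p4 = new Relay<>();      // the extra writable entry point
        merge(stream, p4).subscribe(seen::add);

        p1.onNext("1a");
        p2.onNext("2a");
        p4.onNext("4a");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1a, 2a, 4a]
    }
}
```

In this model, assigning the result of `merge` to a `Relay` fails for the same reason as the compiler error earlier in the thread: every `Relay` is a `Source`, but a `Source` is not a `Relay`.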


-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

On October 10, 2014 at 7:45:50 PM, Tod Antilla (tod.a...@gmail.com) wrote:

Yep. I tried that before I originally posted, but could not find anything in the docs on how to convert the result to a PublishSubject.
