I don’t think it’s possible with the current API.
You may want to create your own Subject whose inner subject is the PublishSubscribe
Observable<String> o = Observable.<String> create(subscriber -> {
// subscribe to outside events here (including unsubscribe logic)
// https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
subscriber.onNext("Hello world!");
subscriber.onCompleted();
})
.filter(t -> {
// your logic here
return true;
})
.publish().refCount();
o.forEach(t -> {
System.out.println("A => " + t);
});
o.forEach(t -> {
System.out.println("B => " + t);
});
Observable<String> o = Observable.<String> create(subscriber -> {
// subscribe to outside events here
// https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
subscriber.onNext("Hello world!");
subscriber.onCompleted();
})
.filter(t -> {
// your logic here
return true;
});
o.forEach(t -> {
System.out.println("A => " + t);
});
A “single object which is both the receiver and publisher” is indeed a Subject of which there are several implementations available. What are you trying to achieve? I don’t fully understand your question or use case.Subjects generally are not needed, but instead Observable.create(OnSubscribe) is used, and this has the benefit of controlling lifecycle such as startup and cleanup whereas a Subject doesn’t.PublishSubject should not have transformation inside it. You compose the operators you want.For example:Observable<R> o = subject.map(t -> r)Then everyone consumes from o.Generally the only reason you should be using a Subject is because you need to do multicasting.
Another approach if multicasting is needed is you create the Observable with the source and compose operations and then publish() it like this:Observable<String> o = Observable.<String> create(subscriber -> {
// subscribe to outside events here (including unsubscribe logic)
// https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#create
subscriber.onNext("Hello world!");
subscriber.onCompleted();
})
.filter(t -> {
// your logic here
return true;
})
.publish().refCount();
o.forEach(t -> {
System.out.println("A => " + t);
});
o.forEach(t -> {
System.out.println("B => " + t);
});