Beginner issues...

Skip to first unread message

Gregory Guthrie

May 13, 2021, 10:34:01 PM5/13/21
to RxJava

I am trying to convert a very simple previous Observer/Observed example to Rx.
I have read many of the examples and tutorials, but apparently am missing somethings, as what I have come up with is more complex than the older Java classes, and I cannot get the types to work.
It's very simple;
class Manager implements Observer<Employee>
class Employee implements ObservableSource<Employee>
Main { ...
Manager mgr = new Manager(); // Observer

Employee emp = new Employee(); // Observable
Observable<Employee> worker = Observable.create (emp::subscribe);
worker.subscribe(mgr); // register for observing

... }

But for Employee::subscribe a basic type fails:
    public void subscribe(Observer<Employee> man) { manager = man; }

"'subscribe(Observer<Employee>)' in 'Employee' clashes with 'subscribe(Observer<? super T>)' in 'io.reactivex.rxjava3.core.ObservableSource'; both methods have same erasure, yet neither overrides the other"

So changing it to:
public void subscribe(Observer<? super Employee> man) { manager = man; }
gives another error:
     manager not compatible with required: Observer <capture of ? super Employee>

although Manager is: class Manager implements Observer<Employee>

Also, to create the pair and subscrioption,
    Observable<Employee> worker = Observable.create (emp::subscribe);
    no instance(s) of type variable(s) T exist so that ObservableEmitter<T> conforms to Observer<? super Employee>

  Manager::onSubscribe(Employee) fails,
it has to be
   public void onSubscribe(Disposable e)
I get some vague logic of it, but I haven't linkied Manager & Disposable.

Broken source attached. :-(
Is there a simpler way to model the old Java approach in Rx?
Thanks for any clarification(s).

Dávid Karnok

May 14, 2021, 2:02:06 AM5/14/21
to Gregory Guthrie, RxJava

What was that previous example you try to convert?

You received this message because you are subscribed to the Google Groups "RxJava" group.
To unsubscribe from this group and stop receiving emails from it, send an email to
To view this discussion on the web visit

Best regards,
David Karnok

Dávid Karnok

May 14, 2021, 8:51:32 AM5/14/21
to Gregory Guthrie, Gregory Guthrie, RxJava
I see. For this type of usage, a so-called BehaviorSubject will be needed. It is roughly equivalent to the j.u.Observable.

var employee = BehaviorSubject.<Float>create();
var manager = new DisposableObserver<Float>() {
    @Override public void onNext(Float t) {
        System.out.println( employee.getClass().getName()
             + " changed and new salary is " + t );
    @Override public void onError(Throwable t) {
    @Override public void onComplete() { }



Gregory Guthrie <> ezt írta (időpont: 2021. máj. 14., P, 14:38):

* Simple Obesrver Example
package demos;

* @author guthrie
import java.util.Observable;
import java.util.Observer;

public class ObserverDemo {
     public static void main(String[] args){
       Employee e = new Employee();
       Manager  m = new Manager();
       e.addObserver(m); // register for observing

// ----------------------------------------------------------
class Employee extends Observable {
     float salary;
     float getSalary() {
      return salary;
     public void setSalary(float newSalary){
       salary=newSalary;  // salary has changed
       setChanged();      // mark that this object has changed
       notifyObservers(new Float(salary));  // notify all observers
// ----------------------------------------------------------
class Manager implements Observer {

      public void update(Observable obj, Object arg){  // Ugh - untyped Java inerface
        System.out.println( obj.getClass().getName() + " changed and new salary is " +
             (Float)arg );   //  Boo! Downcast!



Dr. Gregory Guthrie

Dávid Karnok

May 14, 2021, 9:15:41 AM5/14/21
to Gregory Guthrie, Gregory Guthrie, RxJava
Not exactly. create is for emitting values to a single observer when that observer subscribes. The emitter is an abstraction over an observer, not the actual observer.

var o = Observable.<Integer>create(emitter -> {
    System.out.println("A new observer has come");

o.subscribe(v -> System.out.println("First: " + v));
o.subscribe(v -> System.out.println("Second: " + v));

Gregory Guthrie <> ezt írta (időpont: 2021. máj. 14., P, 15:02):

Thanks very much – will try it. I haven’t seen that before.

Most examples I have seen are either creating Observables from fixed structures (lists, …), or with embedded anonymous Observers in a lambda.


But from my readings the .create factory seems like it should be useable as I tried, yes? Having it reference a method which registers the observer for further events.

Gregory Guthrie

May 14, 2021, 3:24:38 PM5/14/21
to RxJava

Thanks. On looking at this, the main difference is that this is using new local anonymous objects, and my example was trying to link two existing external classes (which I only showed as skeletons, but would have a bunch of other logic & methods).

 That is why I was trying to use the create(registration.function) factory and Observer/Observable inheritances, similar to the old Java approach. I want to take two existing classes, and then modify and link them as Observer/Observable.

 Sounds conceptually easy – but as noted I didn’t get it to work (yet!).

I did mention the type errors I am getting - but the type signatures are perplexing to me - that I have seems like it should work with the specifications.



Dávid Karnok

May 14, 2021, 3:40:13 PM5/14/21
to Gregory Guthrie, RxJava
This is wrong:
Observable.create (emp::subscribe);

create requires an ObservableOnSubscribe instance, which is a SAM type with void subscribe(ObservableEmitter<T>). ObservableEmitter is not related to Observer in the type system thus a method of void subscribe(Observer) can't work.

Gregory Guthrie

May 14, 2021, 11:06:15 PM5/14/21
to RxJava
Ah; thanks - I get that. I had missed the proper SAM conversion type of the lambda for the argument to create().

At the higher level - is there a simple way in the main to bind preexisting Employee/Manger objects - which functionally are the Observed/Observer - similar to the previous Java approach?

I realize I had to add some new methods to each to now inherit from the changed Observer/Observable interface/classes (if those are indeed the proper derivations now).

Dávid Karnok

May 15, 2021, 3:09:00 AM5/15/21
to Gregory Guthrie, RxJava
I'm not sure what you mean by bind. If you want the observer to know who sent the value, you have to create a composite type:

record EmployeeSalary {
   Employee emp;
   float salary;

class Employee {
    BehaviorSubject<EmployeeSalary> onSalaryChange = BehaviorSubject.create();

    Observable<EmployeeSalary> salaryObservable() {
        return onSalaryChange;
    void setSalary(float newSalary) {
        onSalaryChange.onNext(new EmployeeSalary(this, newSalary));

class Manager {
    Map<Employee, Disposable> employees = new HashMap<>();

    void hire(Employee emp) {
        emp.salaryObservable().subscribe(new Observer<EmployeeSalary>() {
            @Override public void onSubscribe(Disposable d) {
                 employees.put(emp, d);
            @Override public void onNext(EmployeeSalary t) {
             @Override public void onError(Throwable t) { }
             @Override public void onComplete() { }

    void onSalaryChange(EmployeeSalary newSalary) {
        System.out.println("Employee " + newSalary.emp + " wants a salary of " + newSalary.salary);
        if (newSalary.salary > 100) {
            System.out.println("  too much, fire the employee!");

var emp1 = new Employee();
var emp2 = new Employee();

var manager = new Manager();



Gregory Guthrie

May 15, 2021, 8:53:03 AM5/15/21
to RxJava
Very interesting and helpful - thank you.

This made the one attribute (salary) observable, instead of an Observer/Observable relation between Manager & Employee.
You've clarified several things for me, thanks for the time and help.

Gregory Guthrie

May 16, 2021, 1:06:23 PM5/16/21
to RxJava
While I recognize that Rx provides a richer set of features and capabilities, for something simple like this it is over twice as complex as the basic Java previous Observer/Observed model.

Rx Comparison.png
Reply all
Reply to author
0 new messages