Beginner issues...

80 views
Skip to first unread message

Gregory Guthrie

unread,
May 13, 2021, 10:34:01 PM5/13/21
to RxJava

I am trying to convert a very simple previous Observer/Observed example to Rx.
I have read many of the examples and tutorials, but apparently am missing somethings, as what I have come up with is more complex than the older Java classes, and I cannot get the types to work.
It's very simple;
class Manager implements Observer<Employee>
class Employee implements ObservableSource<Employee>
Main { ...
Manager mgr = new Manager(); // Observer

Employee emp = new Employee(); // Observable
Observable<Employee> worker = Observable.create (emp::subscribe);
worker.subscribe(mgr); // register for observing

emp.setSalary(100.0f);
... }

But for Employee::subscribe a basic type fails:
    public void subscribe(Observer<Employee> man) { manager = man; }

"'subscribe(Observer<Employee>)' in 'Employee' clashes with 'subscribe(Observer<? super T>)' in 'io.reactivex.rxjava3.core.ObservableSource'; both methods have same erasure, yet neither overrides the other"

So changing it to:
public void subscribe(Observer<? super Employee> man) { manager = man; }
gives another error:
     manager not compatible with required: Observer <capture of ? super Employee>

although Manager is: class Manager implements Observer<Employee>

Also, to create the pair and subscrioption,
    Observable<Employee> worker = Observable.create (emp::subscribe);
fails:
    no instance(s) of type variable(s) T exist so that ObservableEmitter<T> conforms to Observer<? super Employee>

Also;
  Manager::onSubscribe(Employee) fails,
it has to be
   public void onSubscribe(Disposable e)
I get some vague logic of it, but I haven't linkied Manager & Disposable.

Broken source attached. :-(
Is there a simpler way to model the old Java approach in Rx?
Thanks for any clarification(s).

ObserverDemo.java

Dávid Karnok

unread,
May 14, 2021, 2:02:06 AM5/14/21
to Gregory Guthrie, RxJava
Hi.

What was that previous example you try to convert?

--
You received this message because you are subscribed to the Google Groups "RxJava" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rxjava+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/rxjava/c42c89a3-3b41-419a-af5c-4fba4ca0c19bn%40googlegroups.com.


--
Best regards,
David Karnok

Dávid Karnok

unread,
May 14, 2021, 8:51:32 AM5/14/21
to Gregory Guthrie, Gregory Guthrie, RxJava
I see. For this type of usage, a so-called BehaviorSubject will be needed. It is roughly equivalent to the j.u.Observable.

var employee = BehaviorSubject.<Float>create();
var manager = new DisposableObserver<Float>() {
    @Override public void onNext(Float t) {
        System.out.println( employee.getClass().getName()
             + " changed and new salary is " + t );
    }
    @Override public void onError(Throwable t) {
    }
    @Override public void onComplete() { }
};

employee.subscribe(manager);

employee.onNext(100f);


Gregory Guthrie <gut...@miu.edu> ezt írta (időpont: 2021. máj. 14., P, 14:38):

/*
* Simple Obesrver Example
*/
package demos;

/**
* @author guthrie
*/
import java.util.Observable;
import java.util.Observer;

public class ObserverDemo {
     public static void main(String[] args){
       Employee e = new Employee();
       Manager  m = new Manager();
       e.addObserver(m); // register for observing

       e.setSalary(100.0f);
     }
   }
// ----------------------------------------------------------
class Employee extends Observable {
     float salary;
     float getSalary() {
      return salary;
     }
     public void setSalary(float newSalary){
       salary=newSalary;  // salary has changed
       setChanged();      // mark that this object has changed
       notifyObservers(new Float(salary));  // notify all observers
     }
   }
// ----------------------------------------------------------
class Manager implements Observer {

      public void update(Observable obj, Object arg){  // Ugh - untyped Java inerface
        System.out.println( obj.getClass().getName() + " changed and new salary is " +
             (Float)arg );   //  Boo! Downcast!
      }
   }

 

 

Dr. Gregory Guthrie

Dávid Karnok

unread,
May 14, 2021, 9:15:41 AM5/14/21
to Gregory Guthrie, Gregory Guthrie, RxJava
Not exactly. create is for emitting values to a single observer when that observer subscribes. The emitter is an abstraction over an observer, not the actual observer.

var o = Observable.<Integer>create(emitter -> {
    System.out.println("A new observer has come");
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
});

o.subscribe(v -> System.out.println("First: " + v));
o.subscribe(v -> System.out.println("Second: " + v));



Gregory Guthrie <gut...@miu.edu> ezt írta (időpont: 2021. máj. 14., P, 15:02):

Thanks very much – will try it. I haven’t seen that before.

Most examples I have seen are either creating Observables from fixed structures (lists, …), or with embedded anonymous Observers in a lambda.

 

But from my readings the .create factory seems like it should be useable as I tried, yes? Having it reference a method which registers the observer for further events.

Gregory Guthrie

unread,
May 14, 2021, 3:24:38 PM5/14/21
to RxJava

Thanks. On looking at this, the main difference is that this is using new local anonymous objects, and my example was trying to link two existing external classes (which I only showed as skeletons, but would have a bunch of other logic & methods).

 That is why I was trying to use the create(registration.function) factory and Observer/Observable inheritances, similar to the old Java approach. I want to take two existing classes, and then modify and link them as Observer/Observable.

 Sounds conceptually easy – but as noted I didn’t get it to work (yet!).

I did mention the type errors I am getting - but the type signatures are perplexing to me - that I have seems like it should work with the specifications.

 Thanks,

Gregory

Dávid Karnok

unread,
May 14, 2021, 3:40:13 PM5/14/21
to Gregory Guthrie, RxJava
This is wrong:
Observable.create (emp::subscribe);

create requires an ObservableOnSubscribe instance, which is a SAM type with void subscribe(ObservableEmitter<T>). ObservableEmitter is not related to Observer in the type system thus a method of void subscribe(Observer) can't work.

Gregory Guthrie

unread,
May 14, 2021, 11:06:15 PM5/14/21
to RxJava
Ah; thanks - I get that. I had missed the proper SAM conversion type of the lambda for the argument to create().

At the higher level - is there a simple way in the main to bind preexisting Employee/Manger objects - which functionally are the Observed/Observer - similar to the previous Java approach?

I realize I had to add some new methods to each to now inherit from the changed Observer/Observable interface/classes (if those are indeed the proper derivations now).

Dávid Karnok

unread,
May 15, 2021, 3:09:00 AM5/15/21
to Gregory Guthrie, RxJava
I'm not sure what you mean by bind. If you want the observer to know who sent the value, you have to create a composite type:

record EmployeeSalary {
   Employee emp;
   float salary;
}

class Employee {
    BehaviorSubject<EmployeeSalary> onSalaryChange = BehaviorSubject.create();

    Observable<EmployeeSalary> salaryObservable() {
        return onSalaryChange;
    }
    void setSalary(float newSalary) {
        onSalaryChange.onNext(new EmployeeSalary(this, newSalary));
    }
}

class Manager {
    Map<Employee, Disposable> employees = new HashMap<>();

    void hire(Employee emp) {
        emp.salaryObservable().subscribe(new Observer<EmployeeSalary>() {
            @Override public void onSubscribe(Disposable d) {
                 employees.put(emp, d);
             }
            @Override public void onNext(EmployeeSalary t) {
                onSalaryChange(t);
             }
             @Override public void onError(Throwable t) { }
             @Override public void onComplete() { }
        });
    }

    void onSalaryChange(EmployeeSalary newSalary) {
        System.out.println("Employee " + newSalary.emp + " wants a salary of " + newSalary.salary);
        if (newSalary.salary > 100) {
            System.out.println("  too much, fire the employee!");
            employees.remove(newSalary.emp).dispose();
        }
    }
}

var emp1 = new Employee();
var emp2 = new Employee();

var manager = new Manager();

manager.hire(emp1);
manager.hire(emp2);

emp1.setSalary(50);
emp2.setSalary(150);


Gregory Guthrie

unread,
May 15, 2021, 8:53:03 AM5/15/21
to RxJava
Very interesting and helpful - thank you.

This made the one attribute (salary) observable, instead of an Observer/Observable relation between Manager & Employee.
You've clarified several things for me, thanks for the time and help.

Gregory Guthrie

unread,
May 16, 2021, 1:06:23 PM5/16/21
to RxJava
While I recognize that Rx provides a richer set of features and capabilities, for something simple like this it is over twice as complex as the basic Java previous Observer/Observed model.

Rx Comparison.png
Reply all
Reply to author
Forward
0 new messages