Re: my flatMap only accepts String as input; why?

176 views
Skip to first unread message
Message has been deleted

malibu66

unread,
Aug 9, 2020, 10:39:28 AM8/9/20
to RxJava


On Sunday, 9 August 2020 11:38:12 UTC-3, malibu66 wrote:
Hi there, I am trying to make a stream that starts low power ranging for bluetooth beacons.  I run an Observable.create() that triggers the ranging.  Two callback methods are required for this, so I save the observable in a global variable and call onNext from the callback once the beacons are found.  I'm doing a lot of summarizing here:

public class MainActivity extends BaseActivity implements BeaconConsumer {

private Subscriber beaconSubscriber = null;
....

@Override
public void onBeaconServiceConnect() {
   
beaconManager.removeAllRangeNotifiers();
   
beaconManager.addRangeNotifier(new RangeNotifier() {
       
@Override
        public void didRangeBeaconsInRegion(Collection<Beacon> beacons, Region region) {
           
if (beacons.size() > 0) {
               
Beacon closest_beacon = beacons.iterator().next();
               
double closest_beacon_dist = closest_beacon.getDistance();
               
Log.i("TAGTAGTAG.bt", "The first beacon I see is about "+closest_beacon_dist+" meters away.");
               
if (beaconSubscriber != null) {
                   
beaconSubscriber.onNext(closest_beacon);
               
}
           
}
       
}
   
});

   
try {
       
beaconManager.startRangingBeaconsInRegion(new Region("myRangingUniqueId", null, null, null));
   
} catch (RemoteException e) {    }
}

protected void rangeBT() {


         
this.beaconSubscription = Observable.create(new Observable.OnSubscribe<String>() {
           
@Override
            public void call(Subscriber<? super String> subscriber) {
               
try {
                   
beaconSubscriber = subscriber;
                    rangeBT
();
               
} catch (Exception e) {
                    subscriber
.onError(e);        // Signal about the error to subscriber
                }
           
}
       
}).distinctUntilChanged() // ONLY EMIT WHEN CLOSEST BEACON CHANGED
               
.flatMap((b) -> {               // I NEED THIS FLATMAP TO WORK
                   
return callProximity(b));
               
})
//        Integer[] myints = {1,2,3,4,5,6,5,4,3,2,1};
//        this.beaconSubscription = Observable.from(myints)
//                .flatMap(v -> Observable.just(v*3))
                .subscribe(
               
new Observer() {
                           
@Override
                            public void onCompleted() {
                               
Log.i("TAGTAGTAG","MainActivity reportIBeaconProximity onComplete");
                           
}

                           
@Override
                            public void onError(Throwable e) {
                               
Log.i("TAGTAGTAG","MainActivity reportIBeaconProximity onError"+e.getMessage());
                           
}

                           
@Override
                            public void onNext(Object o) {
                               
Log.i("TAGTAGTAG","MainActivity reportIBeaconProximity onNext"+o);
                           
}
                       
}
       
);    beaconManager = BeaconManager.getInstanceForApplication(this);

    beaconManager.getBeaconParsers().add(new BeaconParser().
            setBeaconLayout
("m:2-3=0215,i:4-19,i:20-21,i:22-23,p:24-24"));;
   
beaconManager.bind(this);
}
.





Sorry about the code formatting, I would tihnk Google could do better.  Anynow, I want to draw your attention to the where I have commented I NEED THIS FLATMAP TO WORK.  Basically, the create has emitted a 'Beacon' object that represents the nearest beacon.  I would like to use a flatMap to create an observable for a web request which will find more details about this beacon.  For some reason Java will not work with a flatMap unless I emit a String or Integer to it.  Note the alternate observer I have left in the comments; it emits Integers and goes through a flatmap just fine.  However, when I try to emit 'Beacon' to the flatmap the app will not compile, it gives me an error of "Cannot infer functional interface type".

I spent all day yesterday googling on the error and reading about functional interfaces and java lambdas but the reason why this happens still alludes me.  flatMap should be able to take any kind of object and return an Observable.  What am I missing??

malibu66

unread,
Aug 9, 2020, 2:25:26 PM8/9/20
to RxJava
Hi there.  I am trying to make sense of rxJava within an Android app and I'm running into problems with typing.  Many thanks in advance in advance to anyone who can spot what I'm doing wrong.

In a nutshell, I cannot figure out how to make an rxJava flatMap accept a custom type.  I am trying to use the Android beacon library to detect beacons and I have that starting with a create() and then piping through a distinctUntilChanged and that part works.  It filters out all the repeated 'Beacon' classes and only sends on the new ones.  Great.

However, when I chain a flatMap to that so that it can accept an onNext(Beacon) it will not compile, it gives me an error "cannot infer functional interface type".  It works with String or Integer but not Beacon.  Furthermore, the lambda does not seem to understand that it has a Beacon type and it gets an error calling the handler function 'requires Beacon  but sending String'.

I have googled for over a day.  I have looked at Java Lambdas and functional interfaces but there are no examples that show how to use them with a flatMap.  All rxjava flatmap examples out there seem to only use String or Integer.

I will paste code in a further comment.  Is there a way to edit posts in google groups?  My apologies for this duplicate post, I tried to edit my last post and got it all messed up.

malibu66

unread,
Aug 9, 2020, 2:51:21 PM8/9/20
to RxJava
My code needs to be refactored badly and is very big.  I tried to highlight the important sections:

public class MainActivity extends BaseActivity implements BeaconConsumer {   // I like to get it working in MainActivity first

private Subscriber beaconSubscriber = null; // Init an empty subscriber for the stream

....

   
@Override
   
protected void onCreate(Bundle savedInstanceState) {
       
....
        initBT
(); // See definition of this below the callback

       
....
   
}
   
   
@Override
   
public void onBeaconServiceConnect() {
        beaconManager
.removeAllRangeNotifiers();
        beaconManager
.addRangeNotifier(new RangeNotifier() {
           
@Override
           
public void didRangeBeaconsInRegion(Collection<Beacon> beacons, Region region) {
               
if (beacons.size() > 0) {
                   
Beacon closest_beacon = beacons.iterator().next();

                   
if (beaconSubscriber != null) {
                        beaconSubscriber
.onNext(closest_beacon);
                   
}
               
}
           
}
       
});

       
try {
            beaconManager
.startRangingBeaconsInRegion(new Region("myRangingUniqueId", null, null, null));
       
} catch (RemoteException e) {    }
   
}

   
   
private Observable<String> callProximity(Beacon beacon) {
       
return QJumperApplication.wampClient.reportIBeaconProximity(beacon.getId1().toHexString(), beacon.getId2().toInt(), beacon.getId3().toInt());
   
}
   
   
protected void initBT() {
       
.... (Android permission stuff)
       
// THIS STREAM DOESN'T WORK        

       
this.beaconSubscription = Observable.create(new Observable.OnSubscribe<String>() {
           
@Override
           
public void call(Subscriber<? super String> subscriber) {
               
try {
                    beaconSubscriber
= subscriber;
                    rangeBT
();
               
} catch (Exception e) {
                    subscriber
.onError(e);        // Signal about the error to subscriber
               
}
           
}
       
})
       
.distinctUntilChanged()

       
.flatMap((b) -> callProximity(b))       // flatMap will not work with type 'Beacon' whether I use callProximity as a wrapper or not.
                                               
// Basically, this goes off and does a web request for information about the beacon,
                                               
// returning an observable for the response.

// THIS STREAM DOES WORK

//        Integer[] myints = {1,2,3,4,5,6,5,4,3,2,1};
//        this.beaconSubscription = Observable.from(myints)
//                .flatMap(v -> Observable.just(v*3))

       
.subscribe(
           
new Observer() {
                           
@Override
                           
public void onCompleted() {

                               
Log.i("TAG","MainActivity reportIBeaconProximity onComplete");

                           
}

                           
@Override
                           
public void onError(Throwable e) {

                               
Log.i("TAG","MainActivity reportIBeaconProximity onError"+e.getMessage());

                           
}

                           
@Override
                           
public void onNext(Object o) {

                               
Log.i("TAG","MainActivity reportIBeaconProximity onNext"+o);
                           
}
                       
}
       
);
   
}



.

malibu66

unread,
Aug 9, 2020, 2:53:35 PM8/9/20
to RxJava
Please note the stream that I have commented as "THIS DOES WORK".  For some reason flatMap decides to work with String or Integer but not Beacon.

I am no Java expert, but this seems very strange.

malibu66

unread,
Aug 9, 2020, 3:18:10 PM8/9/20
to RxJava
With all due respect, the last time I tried Stack Overflow I got ignored.  Then I read many comments from people on the internet who could not get attention on Stack Overflow.  Also this seems like a bug to me; if it is not than let me thank you again for your time.

I searched for a "rxjava forum" and this is where I ended up.  All I really need is a full end to end example which I have not been able to find.


So are you telling me the error "could not infer functional interface" could be caused by the return of the down stream observable?

Here is what the function returns:

Observable observable=Observable.create(new Observable.OnSubscribe< Object>() {
@Override
public void call(final Subscriber<? super Object> subscriber) {

You know what, another developer did this possibly incorrectly. I've noticed that there is an Observable.create( with a new observable inside and that doesn't jive with that I thought.




You know what, maybe I just need to know whether this 'should' work or not and then I can get out of your hair.

malibu66

unread,
Aug 9, 2020, 3:31:05 PM8/9/20
to RxJava
Never mind, from what I read the .create(new Observable.OnSubscribe(  may be redundant but shouldn't cause a difference in this regard.
Reply all
Reply to author
Forward
0 new messages