Cursor Observable in Android

2,013 views
Skip to first unread message

Dmitry Zaytsev

unread,
Oct 2, 2014, 4:58:44 AM10/2/14
to rxj...@googlegroups.com
Did anyone managed to nicely fit together RxJava and SQLite database in Android? So far I found almost nothing on this topic.

In my current (maybe naïve) approach I'm using Subject, which is notified on database updates:

private final BehaviorSubject<Void> updates = BehaviorSubject.create();

 Then as soon as value is inserted/updated/deleted:

public void insertValue(MyPojo pojo) {
    //... insert to DB 
    updates.onNext(null);
 
... and then all queries to database observe that Subject:

public Observable<Cursor> getMyValues() {
    return updates.map(new Func1<Void, Cursor>() {
                    @Override
                    public Cursor call(Void aVoid) {
                        return databaseHelper.getReadableDatabase().rawQuery("SELECT 0 _id, RANDOM() value", null);
                    }
                });

Everything seems just fine, but Cursors are not closed after usage and I don't see how can I manage it with current approach. Any ideas? 

Ron Shapiro

unread,
Oct 2, 2014, 10:47:50 PM10/2/14
to rxj...@googlegroups.com
At Venmo, we use a combination of our IterableCursor library (https://github.com/venmo/cursor-utils) and Rx in our database. One convenient thing we do is (with Java 8 for simplicity):

Observable<IterableCursor<Pojo>> obserable = db.loadSomeData();
observable.doOnTerminate(Cursor::close); 
Observable<Pojo> pojoObservable = observable.flatMap(cursor -> Observable.from(cursor)); // optional, but since our cursors are also Iterable, this will now emit Pojos directly
pojoObservable.subscribe(/* ... */);

You could also .map(cursor -> new CursorList(cursor)) instead of flatMap and then use the cursor directly, but safely close the actually SQLiteCursor.

If you want to support backpressure, you may want to listen for when the subscriber is unsubscribe and close.

Let me know if you want to see more code examples.

Best,
Ron

Roman Mazur

unread,
Oct 3, 2014, 6:35:20 AM10/3/14
to Ron Shapiro, rxj...@googlegroups.com
I'm a bit worried by your second line in

Observable<IterableCursor<Pojo>> obserable = db.loadSomeData();
observable.doOnTerminate(Cursor::close); 

Note that observables are not mutable. Hence, you must do 

obserable = db.loadSomeData().doOnTerminate(Cursor::close);

in order to close the cursor ;)
--
Best regards,
Roman Mazur 

Software engineer at Stanfy (http://stanfy.com.ua)
Skype: roman.mazur.f

Ron Shapiro

unread,
Oct 3, 2014, 7:22:29 AM10/3/14
to Roman Mazur, rxj...@googlegroups.com

Correct, I separated it on multiple lines for clarity in email, but that's how I would suggest actually writing it.

--
Ron Shapiro
Sent from my Android-inspired phone

Dmitry Zaytsev

unread,
Oct 3, 2014, 7:26:36 AM10/3/14
to rxj...@googlegroups.com
Thanks for example! Library in particular looks great.

Although few points are not yet clear to me.
  1. If you're closing Cursor onTerminate (onComplete) then you have to iterate through whole Cursor. It makes it problematic to use with CursorAdapter - I guess you're just not using it, but  instead some custom implementation of BaseAdapter?
  2. How do you reload Cursor on database updates?

Roman Mazur

unread,
Oct 3, 2014, 7:48:31 AM10/3/14
to Dmitry Zaytsev, rxj...@googlegroups.com
Here's another example

It's done with Enroscar Async https://github.com/stanfy/enroscar/tree/master/async in combination with rx support.

Dmitry Zaytsev

unread,
Oct 3, 2014, 8:08:03 AM10/3/14
to rxj...@googlegroups.com, dmitry...@gmail.com
That seems like a way to go, but I want to completely replace loaders and ContentProvider with RxJava.
Please, correct me if I did not understood correctly how it's done in Enroscar Async.

Roman Mazur

unread,
Oct 3, 2014, 8:32:52 AM10/3/14
to Dmitry Zaytsev, rxj...@googlegroups.com
Yep, it will use a loader and query a content provider via uri :). Notifications about updates are also done via content resolver (basically it will cause your onNext to be invoked).
Cursor will be closed when loader is reset.
And you'll be able to work with an Observable that is backed by loader callbacks.

Ron Shapiro

unread,
Oct 3, 2014, 9:14:42 AM10/3/14
to Roman Mazur, Dmitry Zaytsev, rxj...@googlegroups.com
What exactly are you trying to do? CursorAdapter closes the cursor if you getFilter().filter() on the ListView (and implement runQueryOnBackgroundThread). It uses .changeCursor(), which replaces the cursor and closes the old one. I'm not sure RxJava really has much to help in this case - SQLite Cursors aren't inherently streams that "finish" in the CursorAdapter model - the Observable can't know when you're done looking at it's rows.

Dmitry Zaytsev

unread,
Oct 3, 2014, 10:45:35 AM10/3/14
to rxj...@googlegroups.com, mazur...@gmail.com, dmitry...@gmail.com
I'm assuming that app works not with a stream of rows, but with an infinite stream of cursors. Going that way we can close Cursor as soon as there are no more subscribers (or if new Cursor coming in).
Thanks for a different perspective - I'll play with those solutions and then will post update on my approach here.
Reply all
Reply to author
Forward
0 new messages