RxJava, Android, auto-unsubscribing and preventing context leaking

3,184 views
Skip to first unread message

Kaushik Gopal

unread,
Sep 2, 2014, 5:28:59 PM9/2/14
to rxj...@googlegroups.com
I've run into a little predicament, that I'm not sure how to tackle. I'm trying to understand how to automatically unsubscribe a subscription, after it completes. 

My use case is as follows:
  1. User selects a list of pictures (in an Android fragment)
  2. We take in those pictures, create a bunch of "Post" objects from those pictures (database i/o operation)
    • method signature Observable<List<Post>> createListOfPhotoPostsForSelectedPhotos()
  3. Process the pictures in each of those posts (i/o operation)
    • method signature Observable<Post> processPhotoPosts(List<Post> photoPostList)
  4. Go ahead and upload it to our server  (network operation)
    • method signature Observable uploadPhotoPostsReadyForUpload()
I also have a CompositeSubscription called "subscriptions" which basically takes in each of the subscription from the above operations via a subscriptions.add(createListOfPhotoPostsForSelectedPhotos().subscribeOn(Schedulers.io).subscribe()) ... etc. On the destruction of my fragment/activity in question, I call subscriptions.unsubsribe() in order to ensure my activity/fragment/inner subscribers don't get leaked (a common pattern/issue for Android).

Typically after operation 1. we no longer need the user's attention and would like to exit out of the fragment/activity, and proceed with operations 2, 3 and 4 in the background. This makes it non-blocking and allows the user to proceed to other parts of the app, while we dutifully perform the long-running database+network operations in the background.

This is not too much of a problem usually, as we can easily schedule the work to be performed on background (non-ui) threads and be on our merry way. Unfortunately as I mention, since I unsubscribe right on exit, operations 3 and 4 are halted on the exit of activity, which happens after operation 1. Ideally, what I would like to do is proceed with operations 2, 3 & 4 and then have them auto-unsubscribed/released after the operation is done.

Question 1 (RxJava specific): Does RxJava provide a mechanism for auto-unsubscribing onComplete? Essentially, I'm looking for a way to not have to manually call subscriptions.unsubscribe in my onDestroy, but just let the Rx work go on to completion and then release all associated resources. Is this possible? Am i misunderstanding the usage? 

Question 2 (Android + RxJava): An alternative approach would be to proceed with operations 2,3 and 4 in an Android "service" and deal with unsubscribing there (instead of doing everything inside my Fragment/Activity). However, I was hoping to alleviate the use of the Android Service completely with RxJava as one of the primary use cases is to run stuff in parallel, in the background, which RxJava does much more elegantly. Is this a pipe dream :P ? 


My code if it's helpful (i've tried to simplify it a bit as i do a couple of more things and make use of a replay subject):

        subscriptions.add(
              createListOfPhotoPostsForSelectedPhotos()                    
                    .subscribeOn(Schedulers.io()) //
                    .observeOn(Schedulers.io()) //
                    .subscribe(new Action1<List<Post>>() {
                              
                        @Override
                        public void call(List<Post> posts) {

                                 subscriptions.add(processPhotoPosts(posts)
                                                       .map(new Func1<Post, Object>() {


                                                           @Override
                                                           public Object call(Post post) {
                                                               processedPhotoPostsCollectorSubject.onNext(post);
                                                               return Observable.empty();
                                                           }
                                                       })
                                                       .subscribeOn(Schedulers.io())
                                                       .subscribe());

                                  subscriptions.add(uploadPhotoPostsReadyForUpload() //
                                                            .subscribeOn(Schedulers.newThread())
                                                            .subscribe());

                                 getActivity().finish();
                             }
                         }));


    @Override
    public void onDestroy() {
        super.onDestroy();
        _subscriptions.unsubscribe();
        Timber.d("All your RX has been unsubscribed !");
    }


    public Observable<List<Post>> createListOfPhotoPostsForSelectedPhotos() {
        return Observable.from(getSelectedPictures())

                         .map(new Func1<Photo, Post>() {
                             @Override
                             public Post call(Photo item) {
                                 
                                 // make a photo post
                                 // ...
                                 photoPost.saveToDatabase();
                                 return photoPost;
                             }
                         })

                         .collect(createdPosts, new Action2<List<Post>, Post>() {


                             @Override
                             public void call(List<Post> posts, Post post) {
                                 createdPosts.add(post);
                                 Collections.sort(createdPosts);
                             }
                         })
    }


    public Observable<Post> processPhotoPosts(List<Post> photoPostList) {
        return Observable.from(photoPostList).map(new Func1<Post, Post>() {


            @Override
            public Post call(Post post) {
                // ...
                // do some fancy photo processing
                // update the photo post in the local database
                return post;
            }
        }).subscribeOn(Schedulers.io());
    }




        public Observable uploadPhotoPostsReadyForUpload() {
            // processedPhotoPostsCollector is a ReplaySubject
            return processedPhotoPostsCollector.asObservable()
                                           .flatMap(new Func1<Post, Observable<?>>() {


                                               @Override
                                               public Observable<?> call(Post post) {

                                                   Timber.d("Adding photo post (%s) to tape queue as it is ready - photo dimensions - %d, %d",
                                                            post.getIdentCode(),
                                                            post.getImageWidth(),
                                                            post.getImageHeight());

                                                   _queue.add(new PhotoPostsUploadTask(post.getIdentCode(),
                                                                                       firstUploadStartTime));
                                                   return Observable.empty();
                                               }
                                           });
    }

Jake Wharton

unread,
Sep 2, 2014, 10:19:20 PM9/2/14
to Kaushik Gopal, rxj...@googlegroups.com
If the lifetime of the subscription outlives the activity you should not be tying it to a composite subscription which is. If the Observable is mutating (either directly or with various doOnXxx) you can simply call .subscribe() without an argument. Otherwise you should tie to the subscription to something which has a lifetime that matches that of the observable. For example, if the Observable interacts with global state then a service or even a singleton is more appropriate to hold the subscription.
Message has been deleted

Juan Carlos Yu

unread,
Aug 2, 2016, 11:46:48 AM8/2/16
to RxJava, m...@kaush.co
I stumbled upon this problem recently and I would like to ask for some clarifications regarding your advice on how to handle unsubscribing to subscriptions:

1. If the Observable is mutating (either directly or with various doOnXxx) you can simply call .subscribe() 
Won't this throw an exception if an error is not handled properly in .subscribe()?

2. Otherwise you should tie to the subscription to something which has a lifetime that matches that of the observable.
For example, if I would wrap a network request in an observable which, upon success (onNext), would update some sort of UI in XActivity. If I unsubscribe to the subscription, would this prevent the network request from being executed? The reason why I'm unsubscribing is to prevent the context from leaking since I will be updating views but I would still want that network request to be executed even after unsubscribing. 
Reply all
Reply to author
Forward
0 new messages