I've run into a little predicament that I'm not sure how to tackle: I'm trying to understand how to automatically unsubscribe a subscription after it completes.
My use case is as follows:
1. User selects a list of pictures (in an Android fragment)
2. We take in those pictures and create a bunch of "Post" objects from them (database I/O operation)
   - method signature: Observable<List<Post>> createListOfPhotoPostsForSelectedPhotos()
3. Process the pictures in each of those posts (I/O operation)
   - method signature: Observable<Post> processPhotoPosts(List<Post> photoPostList)
4. Go ahead and upload them to our server (network operation)
   - method signature: Observable uploadPhotoPostsReadyForUpload()
I also have a CompositeSubscription called "subscriptions", which collects each of the subscriptions from the above operations via subscriptions.add(createListOfPhotoPostsForSelectedPhotos().subscribeOn(Schedulers.io()).subscribe()) and so on. When my fragment/activity is destroyed, I call subscriptions.unsubscribe() to ensure my activity/fragment/inner subscribers don't get leaked (a common pattern/issue on Android).
Typically, after operation 1 we no longer need the user's attention and would like to exit the fragment/activity and proceed with operations 2, 3 and 4 in the background. This makes it non-blocking and allows the user to move on to other parts of the app while we dutifully perform the long-running database and network operations in the background.
This is not too much of a problem usually, as we can easily schedule the work to be performed on background (non-UI) threads and be on our merry way. Unfortunately, as I mentioned, since I unsubscribe right on exit, operations 3 and 4 are halted when the activity exits, which happens after operation 1. Ideally, what I would like to do is proceed with operations 2, 3 and 4 and then have them auto-unsubscribed/released once the work is done.
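To make the problem concrete without pulling in my whole project, here is a stripped-down plain-Java model of what I believe is happening. This is only a sketch: WorkGroup and runUpload are hypothetical names made up for this illustration, a Future plays the role of a Subscription, and none of it is RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java model of the pattern; these are NOT RxJava classes.
class UnsubscribeOnExitDemo {

    // Hypothetical stand-in for a CompositeSubscription: collects handles
    // to background work and cancels all of them at once.
    static final class WorkGroup {
        private final List<Future<?>> handles = new ArrayList<>();

        synchronized void add(Future<?> handle) {
            handles.add(handle);
        }

        // Plays the role of subscriptions.unsubscribe(): interrupts whatever
        // is still running.
        synchronized void cancelAll() {
            for (Future<?> handle : handles) {
                handle.cancel(true);
            }
            handles.clear();
        }
    }

    // Starts one background "upload", then simulates the screen going away.
    // Returns "interrupted" or "completed" depending on what happened to it.
    static String runUpload(boolean tiedToScreen) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        WorkGroup screenWork = new WorkGroup();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("pending");

        Future<?> upload = io.submit(() -> {
            started.countDown();
            try {
                release.await();              // stands in for long-running I/O
                outcome.set("completed");
            } catch (InterruptedException e) {
                outcome.set("interrupted");   // cancelled mid-flight
            }
        });
        if (tiedToScreen) {
            screenWork.add(upload);           // like subscriptions.add(...)
        }

        try {
            started.await();                  // the upload is now in flight
            screenWork.cancelAll();           // the screen goes away ("onDestroy")
            if (!upload.isCancelled()) {
                release.countDown();          // surviving work runs to completion
            }
            io.shutdown();
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runUpload(true));
        System.out.println(runUpload(false));
    }
}
```

In this model, runUpload(true) returns "interrupted" and runUpload(false) returns "completed". The second outcome is the behaviour I'm after for operations 2, 3 and 4, but without leaking the fragment/activity.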
Question 1 (RxJava specific): Does RxJava provide a mechanism for auto-unsubscribing on completion? Essentially, I'm looking for a way to not have to manually call subscriptions.unsubscribe() in my onDestroy, but just let the Rx work run to completion and then release all associated resources. Is this possible? Am I misunderstanding the usage?
Question 2 (Android + RxJava): An alternative approach would be to run operations 2, 3 and 4 in an Android Service and deal with unsubscribing there (instead of doing everything inside my Fragment/Activity). However, I was hoping to avoid the Android Service completely with RxJava, as one of its primary use cases is running things in parallel in the background, which RxJava does much more elegantly. Is this a pipe dream :P ?
My code, if it's helpful (I've tried to simplify it a bit, as I do a couple more things and make use of a replay subject):

    subscriptions.add(
        createListOfPhotoPostsForSelectedPhotos()
            .subscribeOn(Schedulers.io()) //
            .observeOn(Schedulers.io()) //
            .subscribe(new Action1<List<Post>>() {
                @Override
                public void call(List<Post> posts) {
                    // Operation 3: process each post and feed it to the collector subject
                    subscriptions.add(processPhotoPosts(posts)
                        .map(new Func1<Post, Object>() {
                            @Override
                            public Object call(Post post) {
                                processedPhotoPostsCollector.onNext(post);
                                return Observable.empty();
                            }
                        })
                        .subscribeOn(Schedulers.io())
                        .subscribe());

                    // Operation 4: queue processed posts for upload
                    subscriptions.add(uploadPhotoPostsReadyForUpload() //
                        .subscribeOn(Schedulers.newThread())
                        .subscribe());

                    // Leave the screen; this triggers onDestroy() below
                    getActivity().finish();
                }
            }));

    @Override
    public void onDestroy() {
        super.onDestroy();
        // Unsubscribes everything added above, including work still in flight
        subscriptions.unsubscribe();
        Timber.d("All your RX has been unsubscribed !");
    }

    public Observable<List<Post>> createListOfPhotoPostsForSelectedPhotos() {
        return Observable.from(getSelectedPictures())
            .map(new Func1<Photo, Post>() {
                @Override
                public Post call(Photo item) {
                    // make a photo post
                    // ...
                    photoPost.saveToDatabase();
                    return photoPost;
                }
            })
            .collect(createdPosts, new Action2<List<Post>, Post>() {
                @Override
                public void call(List<Post> posts, Post post) {
                    // posts is the createdPosts list passed to collect() above
                    posts.add(post);
                    Collections.sort(posts);
                }
            });
    }

    public Observable<Post> processPhotoPosts(List<Post> photoPostList) {
        return Observable.from(photoPostList).map(new Func1<Post, Post>() {
            @Override
            public Post call(Post post) {
                // ...
                // do some fancy photo processing
                // update the photo post in the local database
                return post;
            }
        }).subscribeOn(Schedulers.io());
    }

    public Observable uploadPhotoPostsReadyForUpload() {
        // processedPhotoPostsCollector is a ReplaySubject
        return processedPhotoPostsCollector.asObservable()
            .flatMap(new Func1<Post, Observable<?>>() {
                @Override
                public Observable<?> call(Post post) {
                    Timber.d("Adding photo post (%s) to tape queue as it is ready - photo dimensions - %d, %d",
                             post.getIdentCode(),
                             post.getImageWidth(),
                             post.getImageHeight());
                    _queue.add(new PhotoPostsUploadTask(post.getIdentCode(),
                                                        firstUploadStartTime));
                    return Observable.empty();
                }
            });
    }