1. Observable creation and error handling
Consider the following example:
    public Observable<Book> getFavoriteBook(User user) {
        return Observable.just(user.getFavoriteBookId())
                .flatMap(bookId -> bookService.getById(bookId))
                .onErrorReturn(throwable -> DEFAULT_FAVORITE_BOOK);
    }
Focus on the error handling part. In my experience, in 95% of cases the expectation behind the statement .onErrorReturn(...) is that the method getFavoriteBook() becomes “safe”, i.e. that no exception can be thrown from it at all, as if it were surrounded by a giant try-catch.
Of course, a NullPointerException can still easily occur if the user object is null; after all, that’s plain Java. The onErrorReturn() can’t kick in, because we never entered the RX world at all. If you think about it, we tried to give the just() operator a value, but we crashed while that value was being evaluated. This example may seem obvious, but I’ve seen the same mistake made very often. Here are a few more examples:
    public void doSomething(Items... items) {
        Observable.from(items)
                .onErrorReturn(throwable -> DEFAULT_ITEM)
                .subscribe(... do something with item ...);
    }

    public Observable<Boolean> isAnyAvailable(List<String> bookIds) {
        return Observable.from(bookIds)
                .flatMap(bookId -> bookService.getById(bookId))
                .filter(book -> book.isAvailable())
                .defaultIfEmpty(libraryCache.isAnythingAvailable())
                .onErrorReturn(throwable -> NOTHING_AVAILABLE);
    }
Notice the statement libraryCache.isAnythingAvailable(): despite sitting in the middle of an RX chain, it’s evaluated when the observable itself is created (in other words, when the method isAnyAvailable(bookIds) is called), so a NullPointerException will be thrown if libraryCache is null.
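The same eager-argument behaviour can be reproduced without RxJava. The sketch below is only an analogy built on java.util.Optional from the JDK: orElse(...) plays the role of defaultIfEmpty(...), while the class name EagerArgumentDemo and the fallback() method are made up for illustration and stand in for libraryCache.isAnythingAvailable().

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerArgumentDemo {
    // Counts how often the fallback value is computed.
    public static final AtomicInteger fallbackCalls = new AtomicInteger();

    // Hypothetical stand-in for libraryCache.isAnythingAvailable().
    public static boolean fallback() {
        fallbackCalls.incrementAndGet();
        return false;
    }

    // The argument of orElse(...) is evaluated before orElse is even invoked,
    // whether or not the Optional holds a value.
    public static boolean eager(Optional<Boolean> found) {
        return found.orElse(fallback());
    }

    // orElseGet(...) receives a Supplier and only calls it when the Optional is empty.
    public static boolean lazy(Optional<Boolean> found) {
        return found.orElseGet(EagerArgumentDemo::fallback);
    }

    public static void main(String[] args) {
        eager(Optional.of(true));
        System.out.println("fallback calls after eager: " + fallbackCalls.get());
        lazy(Optional.of(true));
        System.out.println("fallback calls after lazy: " + fallbackCalls.get());
    }
}
```

In both calls the Optional already holds a value, yet only the eager variant computes the fallback. An operator that takes a plain value rather than a function behaves the same way in an RX chain.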
If we really wanted to make the first example error-safe, we could, for example, use the defer() operator:
    public Observable<Book> getFavoriteBook(User user) {
        return Observable.defer(
                () -> Observable.just(user.getFavoriteBookId())
                        .flatMap(bookId -> bookService.getById(bookId))
        )
                .onErrorReturn(throwable -> DEFAULT_FAVORITE_BOOK);
    }
defer() postpones the execution of the passed-in observableFactory method (() -> Observable.just(user.getFavoriteBookId()).flatMap(bookId -> bookService.getById(bookId))) until someone subscribes to it. You can think of this execution as happening “in the RX world”, so this time our onErrorReturn() handles even the NullPointerException.
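To see why deferring helps, here is a plain-Java sketch of the same idea with no RxJava involved. It is an analogy under stated assumptions: a Supplier stands in for the observableFactory, a try-catch stands in for onErrorReturn(), and the DeferSketch class and its nested User type are hypothetical.

```java
import java.util.function.Supplier;

public class DeferSketch {
    public static final String DEFAULT_ID = "default";

    // Hypothetical stand-in for the article's User type.
    public static class User {
        private final String favoriteBookId;

        public User(String favoriteBookId) {
            this.favoriteBookId = favoriteBookId;
        }

        public String getFavoriteBookId() {
            return favoriteBookId;
        }
    }

    // Stand-in for the error-handled chain: any failure raised while the
    // supplier runs is replaced by a default value.
    public static String withFallback(Supplier<String> source) {
        try {
            return source.get();
        } catch (RuntimeException e) {
            return DEFAULT_ID;
        }
    }

    // Eager: the user is dereferenced before withFallback is entered,
    // so the try-catch never sees the NullPointerException.
    public static String eager(User user) {
        String id = user.getFavoriteBookId();
        return withFallback(() -> id);
    }

    // Deferred: the dereference happens inside the supplier,
    // so it runs within the protected region.
    public static String deferred(User user) {
        return withFallback(() -> user.getFavoriteBookId());
    }
}
```

Calling eager(null) throws, while deferred(null) returns the default, which mirrors the difference between the first getFavoriteBook() and the defer() version.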
Rule of thumb: pay careful attention to whether a statement will be executed when creating the observable (e.g. just(), defaultIfEmpty(), etc.), or as part of the execution of the observable chain (e.g. in a flatMap() transformation).
2. Where should subscribeOn() and observeOn() stay in an RX chain
As a quick refresher, having multiple subscribeOn() operators in a single RX chain doesn’t make sense, as there is just one moment of subscribing to an Observable. This will happen on the thread represented by the subscribeOn() that sits closest to the Observable creation.
Later in the chain we can move execution between threads as much as we like with multiple observeOn() operators.
Here’s a simple example:
    @Test
    public void schedulingTest() throws Exception {
        CountDownLatch latch = new CountDownLatch(2);
        Observable.range(1, 5)
                .subscribeOn(Schedulers.io())
                .doOnNext(num -> printValueAndThreadInfo("received " + num))
                .observeOn(Schedulers.computation())
                .doOnNext(num -> printValueAndThreadInfo("filtering " + num))
                .filter(num -> num > 3)
                .observeOn(Schedulers.newThread())
                .doOnNext(num -> printValueAndThreadInfo("calculating square of " + num))
                .map(num -> num * num)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())
                .subscribe(square -> {
                    printValueAndThreadInfo("onNext " + square);
                    latch.countDown();
                });
        latch.await();
    }
Ignore the CountDownLatch part; it’s just there to ensure our unit test won’t terminate before we’ve seen the nice output we’re expecting, namely:
    received 1, on thread RxIoScheduler-3, total threads: 10
    received 2, on thread RxIoScheduler-3, total threads: 10
    received 3, on thread RxIoScheduler-3, total threads: 10
    received 4, on thread RxIoScheduler-3, total threads: 10
    filtering 1, on thread RxComputationScheduler-2, total threads: 10
    filtering 2, on thread RxComputationScheduler-2, total threads: 10
    filtering 3, on thread RxComputationScheduler-2, total threads: 10
    filtering 4, on thread RxComputationScheduler-2, total threads: 10
    received 5, on thread RxIoScheduler-3, total threads: 10
    calculating square of 4, on thread RxIoScheduler-2, total threads: 10
    filtering 5, on thread RxComputationScheduler-2, total threads: 10
    onNext 16, on thread RxComputationScheduler-1, total threads: 10
    calculating square of 5, on thread RxIoScheduler-2, total threads: 10
    onNext 25, on thread RxComputationScheduler-1, total threads: 10
* please note the order of these events is not guaranteed
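The printValueAndThreadInfo() helper used in the test isn't shown in this post. A minimal guess at what it could look like is sketched below; the ThreadInfo class, the describe() method and the use of Thread.activeCount() for the "total threads" figure are all assumptions, so the real helper may count threads differently.

```java
public class ThreadInfo {
    // Builds a line shaped like the output above:
    // "<value>, on thread <name>, total threads: <n>".
    public static String describe(String value) {
        return value
                + ", on thread " + Thread.currentThread().getName()
                // Assumption: activeCount() is only an estimate of the live
                // threads in the current thread group and its subgroups.
                + ", total threads: " + Thread.activeCount();
    }

    public static void printValueAndThreadInfo(String value) {
        System.out.println(describe(value));
    }
}
```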
As expected, all items from the range() operator are emitted on an IO scheduler thread (RxIoScheduler-3), as subscribeOn(Schedulers.io()) sits closest to the observable creation. Notice how the last subscribeOn(Schedulers.newThread()) seems to be ignored: it has no effect on our execution. All observeOn() operators, however, have the desired effect: each filtering happens on a computation thread (RxComputationScheduler-2), each calculation of a square on an IO thread, and finally each onNext() event arrives on a computation thread as well.
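For readers who want to experiment with thread hopping without an RxJava dependency, the JDK's CompletableFuture offers a loose analogy. In this sketch, supplyAsync(..., executor) decides where the source work starts (roughly what subscribeOn() does) and thenApplyAsync(..., executor) moves the next stage to another executor (roughly what observeOn() does). The class name and the executor names "io" and "computation" are illustrative, and the analogy does not cover RX-specific behaviour such as only the first subscribeOn() taking effect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {
    // Single-thread executor whose only thread carries a recognisable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Returns "<thread that produced the value> -> <thread that transformed it>".
    public static String run() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        try {
            return CompletableFuture
                    // The source work runs on the "io" executor.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // This stage is handed over to the "computation" executor.
                    .thenApplyAsync(
                            producer -> producer + " -> " + Thread.currentThread().getName(),
                            computation)
                    .join();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "io -> computation"
    }
}
```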
To go into a bit more detail: in reality, extra subscribeOn() and observeOn() operators aren’t just ignored; each one actually wastes a tiny bit of resources (up to a certain limit). To prove this point, let’s add some more redundant operators. Notice how the total number of threads immediately increases:
    @Test
    public void schedulingTest() throws Exception {
        CountDownLatch latch = new CountDownLatch(2);
        Observable.range(1, 5)
                .subscribeOn(Schedulers.io())
                .doOnNext(num -> printValueAndThreadInfo("received " + num))
                .observeOn(Schedulers.computation())
                .doOnNext(num -> printValueAndThreadInfo("filtering " + num))
                .filter(num -> num > 3)
                .observeOn(Schedulers.newThread())
                .doOnNext(num -> printValueAndThreadInfo("calculating square of " + num))
                .map(num -> num * num)
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())
                .subscribe(square -> {
                    printValueAndThreadInfo("onNext " + square);
                    latch.countDown();
                });
        latch.await();
    }
Output:
    received 1, on thread RxIoScheduler-2, total threads: 15
    received 2, on thread RxIoScheduler-2, total threads: 15
    received 3, on thread RxIoScheduler-2, total threads: 15
    filtering 1, on thread RxComputationScheduler-2, total threads: 15
    received 4, on thread RxIoScheduler-2, total threads: 15
    received 5, on thread RxIoScheduler-2, total threads: 15
    filtering 2, on thread RxComputationScheduler-2, total threads: 15
    filtering 3, on thread RxComputationScheduler-2, total threads: 15
    filtering 4, on thread RxComputationScheduler-2, total threads: 15
    filtering 5, on thread RxComputationScheduler-2, total threads: 15
    calculating square of 4, on thread RxNewThreadScheduler-7, total threads: 15
    calculating square of 5, on thread RxNewThreadScheduler-7, total threads: 15
    onNext 16, on thread RxComputationScheduler-1, total threads: 15
    onNext 25, on thread RxComputationScheduler-1, total threads: 15
* notice that the total number of threads is exactly 5 higher than before, matching the number of redundant operators we added
Now a more real-world example:
    private CompositeSubscription subscriptions = new CompositeSubscription();
    private BookMapper mapper = new BookMapper();
    ...
    public void onScreenReady() {
        subscriptions.add(
                interactor.fetchBooks()
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .map(mapper::mapBooks)
                        .flatMap(Observable::from)
                        .filter(book -> book.isAvailable())
                        .subscribe(this::onBookReceived, this::onFetchingBookFailed)
        );
    }
I’ve often seen subscribeOn and observeOn placed right next to each other, perhaps because they both take a Scheduler as a parameter. Doing so, though, causes all the mapping and filtering of books to be performed on the main thread, which we surely don’t want. Moving observeOn(AndroidSchedulers.mainThread()) down to just before subscribe() keeps that work off the main thread.
One thing to mention here: if you’re creating an API to be used by other layers of your app or by other people (e.g. a library), putting subscribeOn() on your Observables should be a deliberate, carefully considered decision. Doing so takes away your clients’ control over the subscription thread (remember, only the first subscribeOn() matters!). This might make sense for blocking network / IO operations, where it protects your clients, but that’s not always the case, so be sure to have solid reasons for putting it there.
Rule of thumb: for blocking network / IO operations, put subscribeOn() as close to the observable creation as possible; in other cases, put both subscribeOn() and observeOn() as close to the subscription as possible.
3. Think about compose() when you see yourself duplicating code
Sometimes there’s a sequence of transformations that need to be performed multiple times. Consider the following example for a shopping app:
    public Observable<Bag> addItem(String itemId) {
        return service.addItem(itemId)
                .map(this::parseResponse)
                .map(this::someOtherTransformation)
                .doOnNext(this::someSideEffect)
                .onErrorResumeNext(this::handleError);
    }

    public Observable<Bag> removeItem(String itemId) {
        return service.removeItem(itemId)
                .map(this::parseResponse)
                .map(this::someOtherTransformation)
                .doOnNext(this::someSideEffect)
                .onErrorResumeNext(this::handleError);
    }

    public Observable<Bag> getBag(String bagId) {
        return service.getBag(bagId)
                .map(this::parseResponse)
                .map(this::someOtherTransformation)
                .doOnNext(this::someSideEffect)
                .onErrorResumeNext(this::handleError);
    }
    ...
Due to the nature of our API and business requirements, we need to perform the same operations for every item manipulation. Using a Transformer is a handy way of reducing duplication:
    private Observable.Transformer<BagModel, Bag> bagTransformer = new Observable.Transformer<BagModel, Bag>() {
        @Override
        public Observable<Bag> call(Observable<BagModel> bagModelObservable) {
            // Lambdas are used instead of this::..., because inside an anonymous
            // class "this" refers to the Transformer, not to the enclosing class.
            return bagModelObservable
                    .map(model -> parseResponse(model))
                    .map(parsed -> someOtherTransformation(parsed))
                    .doOnNext(bag -> someSideEffect(bag))
                    .onErrorResumeNext(throwable -> handleError(throwable));
        }
    };

    public Observable<Bag> addItem(String itemId) {
        return service.addItem(itemId)
                .compose(bagTransformer);
    }

    public Observable<Bag> removeItem(String itemId) {
        return service.removeItem(itemId)
                .compose(bagTransformer);
    }

    public Observable<Bag> getBag(String bagId) {
        return service.getBag(bagId)
                .compose(bagTransformer);
    }
We moved the duplicated operations to the Transformer without making any changes to the sequence. Even the error handling part was moved; that’s because the Transformer operates on the whole Observable chain, so error events flow through it as well. One last thing to point out: it’s perfectly fine to change the type of the data that flows through our transformer, in our case from BagModel to Bag objects.
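The underlying idea, packaging a sequence of steps as one reusable function over the whole stream, is not specific to RxJava. As a rough analogy using only JDK streams, the sketch below stores the steps in a Function<Stream<String>, Stream<Integer>> and applies it in two places. The class, field and method names are invented for illustration, and java.util.stream has no compose() operator, so the function is applied explicitly.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReusableSteps {
    // One reusable sequence of steps; note that it also changes the element
    // type flowing through it, from String to Integer.
    static final Function<Stream<String>, Stream<Integer>> PARSE_POSITIVE =
            raw -> raw.map(String::trim)
                    .map(Integer::parseInt)
                    .filter(number -> number > 0);

    public static List<Integer> fromCsv(String csv) {
        return PARSE_POSITIVE.apply(Stream.of(csv.split(",")))
                .collect(Collectors.toList());
    }

    public static List<Integer> fromLines(List<String> lines) {
        return PARSE_POSITIVE.apply(lines.stream())
                .collect(Collectors.toList());
    }
}
```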
Rule of thumb: using a Transformer is a nice way to avoid duplicating RX sequences.
P.S. if you want to learn more about Transformers, Dan Lew’s post about transformers is an awesome read.
4. toBlocking() is a code smell
The toBlocking() operator should be used rarely, only when there’s no other option. Think of it as the “exit of the RX realm”. It’s fine to use it when integrating new RX code into legacy code and you don’t want to perform a big refactoring, but that’s about it.
Rule of thumb: if you find yourself using toBlocking() in new code or in a feature you’re developing, you should most probably rethink your design or APIs!
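Blocking calls often sneak in when an asynchronous call is made from inside a mapping step. The JDK's CompletableFuture has the same trap, so here is a dependency-free sketch of it, offered as an analogy rather than RxJava code: join() plays the role of toBlocking().first(), thenCompose() plays the role of flatMap()/concatMap(), and lengthOf() is a made-up asynchronous API.

```java
import java.util.concurrent.CompletableFuture;

public class BlockingVsComposing {
    // Hypothetical asynchronous API returning its result later.
    public static CompletableFuture<Integer> lengthOf(String text) {
        return CompletableFuture.supplyAsync(text::length);
    }

    // Smell: the mapping step blocks its thread until the inner call finishes.
    public static CompletableFuture<Integer> blockingStyle(CompletableFuture<String> source) {
        return source.thenApply(value -> lengthOf(value).join());
    }

    // Preferred: the inner asynchronous call is flattened into the chain,
    // so no thread has to wait for it.
    public static CompletableFuture<Integer> composedStyle(CompletableFuture<String> source) {
        return source.thenCompose(BlockingVsComposing::lengthOf);
    }
}
```

Both variants produce the same value; the difference is only in whether a thread sits idle while waiting for it.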
Thanks for reading and stay tuned for part 2 🙂
5 comments
Wolfram Rittmeyer
8 years ago
I disagree with your second recommendation and would instead say: Postpone both subscribeOn() as well as observeOn() for as long as possible and use them close to the last user. It doesn’t make any difference where in the chain you create subscribeOn(). The Scheduler used for the first subscribeOn() call will be used for all of the chain – including all parts of the chain that come before the subscribeOn() call.
If you use a subscribeOn() call too early you might hinder the client/subscriber of your Observable to select what is most appropriate given the circumstances of this specific use.
Normally I put both subscribeOn() and observeOn() immediately before my call to subscribe() in my presenters.
An exception could be when you use IO. In this case it can make sense to enforce the IO scheduler usage by putting a subscribeOn() call close to where you use IO for the first time, for example for database and/or remote access in your repository.
Looking forward to your answer 🙂
veskoiliev
8 years ago
Hey, thanks for the comment!
When I put this “Rule of thumb” I actually had in mind the most common (in my experience) case, which is creating Observables for some networking or blocking IO operations. For that one it seems we both agree that putting subscribeOn() close to the creation is a good idea 🙂 Indeed this takes away the power of the users to choose where to subscribe, but this choice makes sense as a “protection” to your users (e.g. Retrofit subscribes on Schedulers.io() for you). I’ll update the post to reflect that specifically.
For other cases where it makes sense to leave the choice of a subscription thread to the clients, I do agree it’s a good practice to postpone the subscribeOn() as much as possible. This way one has the freedom to combine streams as per own individual needs, to use parallelism, etc. However in this case, a client also has the responsibility of choosing the right threading model.
Yair Carreno
7 years ago
Good article, thanks. Regarding recommendation 4, I found this great advice by David Karnok: https://github.com/ReactiveX/RxJava/issues/3956
A typical mistake that comes up is something like this:
source.map(v -> someAPI(v).toBlocking().first())…
Instead, you should be using any of the flatMap, concatMap, etc.
source.concatMap(v -> someAPI(v))…
veskoiliev
7 years ago
Nice tip, thanks for sharing it!
ramtech
5 years ago
Thanks a lot. I was making the exact same mistake described in the first example and was struggling to figure out what was broken in the error flow.
After spending a lot of time on it, I finally reached here and WOW.. everything is fixed now 🙂