RxAndroid Basics: Part 1

2016-02-10

Welcome friend! So glad to see you! So you're a little curious about this RxJava stuff and how it applies to Android. Maybe you've seen it used a few places but you're still a little confused and would like some clarifications. Well look no further!

When I was first introduced to RxJava I was mostly cargo coding with it. Some of it made sense, but I had a deep misunderstanding of some of the fundamentals. I was also really frustrated by a lack of good examples that actually explained what the heck was going on. In order to actually get a complete understanding of RxJava, I had to do a lot of digging and go through a lot of trial and error.

In an effort to save you, the cherished reader, some leg work, I decided to throw together a few examples based on my findings. The hope is these examples will provide you with the necessary knowledge to begin using RxJava in your Android apps.

These examples can all be found as part of a fully functional sample app. At the beginning of each example, I'll link to the specific Activity in which the following code snippets live. I've divided the examples into two parts. In Part 1, we'll focus on using RxJava to load data asynchronously. In Part 2, we'll dive into some more advanced usage patterns.

Some Concepts

Before we jump into the nitty gritty, let's start off with some basic concepts. RxJava, at it's core, is about two things: Observables and Observers. Observables are said to "emit" values. Their counterpart, Observers, watch Observables by "subscribing" to them.

Observers can take actions when an Observable emits a value, when the Observable says an error has occurred, or when the Observable says that it no longer has any values to emit. All three of these actions are encapsulated in the Observer interface. The corresponding functions are onNext(), onError(), and onCompleted().

With all that in mind, let's take a look at some examples.

Example 1: The Basics

Let's make an Activity that simply displays a list of colors. We're going to make an Observable that emits a single value, a list of strings, and then completes. We'll then use the emitted value to populate the list. We'll do this by using the Observable.just() method. This method creates an Observable such that when an Observer subscribes, the onNext() of the Observer is immediately called with the argument provided to Observable.just(). The onCompleted() will then be called since the Observable has no other values to emit.

Observable<List<String>> listObservable = Observable.just(getColorList());

Note that getColorList() is a non-blocking method call. This may not seem important now, but we'll come back to it later.

Next, we setup an Observer to watch the Observable:

listObservable.subscribe(new Observer<List<String>>() { 

    @Override 
    public void onCompleted() { } 

    @Override 
    public void onError(Throwable e) { } 

    @Override
    public void onNext(List<String> colors) {
        mSimpleStringAdapter.setStrings(colors);
    }
});

This is where the magic happens. As mentioned above, upon subscribing to the Observable with the subscribe() method the following happens immediately, in this order:

  1. The onNext() method is called and the emitted list of colors is set as the data for the adapter.
  2. Since there is no more data (we only gave our Observable a single value to emit in Observable.just()), the onCompleted() callback is called.

I find it useful to think about an Observable in terms of what it does upon subscription. In fact, Observables are primarily defined by their behavior upon subscription. Let's take care to remember this, as it will come in handy later. I'm going to state it again since it's so important:

Observables are primarily defined by their behavior upon subscription.

In this case we really don't care about what happens when the Observable has completed so we leave the onCompleted() method empty. There's also no way an error could get thrown, so we're just going to leave onError() empty as well.

All of this may seem like overkill. We could have just set the color list on our adapter directly. But with this in mind, let's take a look at something a little more interesting.

Example 2: Asynchronous Loading

Let's make an Activity that asynchronously loads a list of favorite television shows. Asynchronously loading data is probably the most common use of RxJava in Android. First, let's create our Observable:

Observable<List<String>> tvShowObservable = Observable.fromCallable(new Callable<List<String>>() { 

    @Override 
    public List<String> call() { 
        return mRestClient.getFavoriteTvShows(); 
    }
});

In our last example, we used Observable.just() to create our Observable. It would then reasonably follow that we might want to say something like Observable.just(mRestClient.getFavoriteTvShows()) to create our Observable for this example.

But we can't do this because mRestClient.getFavoriteTvShows() is a blocking network call. If we use it with Observable.just(), mRestClient.getFavoriteTvShows() will be evaluated immediately and block the UI thread.

Enter the Observable.fromCallable() method. It gives us two important things:

  1. The code for creating the emitted value is not run until someone subscribes to the Observable.
  2. The creation code can be run on a different thread.

These two properties will come in handy in a moment. Now let's subscribe to the Observable:

TvShowSubscription = tvShowObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {
        
        @Override 
        public void onCompleted() { }
        
        @Override 
        public void onError(Throwable e) { }

        @Override 
        public void onNext(List<String> tvShows){
            displayTvShows(tvShows);
        }
    });

Let's break this down one function at a time. subscribeOn() essentially alters the Observable we created above. All of the code that this Observable would normally run, including the code the gets run upon subscription, will now run on a different thread. This means the logic in our Callable object, including the call to getFavoriteTvShows(), will run on a different thread. But which thread will it run on?

In this case we specify that the code is run on the "IO Scheduler" (Schedulers.io()). For now we can just think of a Scheduler as a separate thread for doing work. There's actually a little more going on here but this description will suffice for our purposes.

We've hit a bit of a snag now though. Since our Observable is set to run on the IO Scheduler, this means it's going to interact with our Observer on the IO Scheduler as well. This is a problem because it means our onNext() method is going to get called on the IO Scheduler. But the code in our onNext() calls methods on some of our views. View methods can only be called on the UI thread.

There's a simple way to address this. We can tell RxJava that we want to observe this Observable on the UI thread, i.e. we want our onNext() callback to be called on the UI thread. We do this by specifying a different scheduler in the observeOn() method, namely the scheduler returned by AndroidSchedules.mainThread() (a scheduler for the UI thread).

Last but not least we call subscribe(). This is critical since the code in our Callable won't run until we actually subscribe something to it. Remember how I said Observables are primarily defined by their behavior upon subscription? This is a great example of that.

There's one last thing. What's this mTvShowSubscription thing? When an Observer subscribes to an Observable a Subscription is created. A Subscription represents a connection between an Observer and an Observable. Sometimes we need to sever this connection. Let's take a look at some code that should go in our Activity's onDestroy() method in order to see why.

if (mTvShowSubscription != null && !mTvShowSubscription.isUnsubscribed()) {
    mTvShowSubscription.unsubscribe();
}

If you've ever done threading work on Android before you know there's typically one huge problem: what happens if your thread finishes (or never finishes) it's execution after an Activity has been torn down? This can cause a whole host of problems including memory leaks and NullPointerExceptions.

Subscriptions allow us to deal with this problem. We can say "Hey, Observable, this Observer doesn't want to receive your emissions anymore. Please disconnect from the Observer." We do this by calling unsubscribe(). After calling unsubscribe(), the Observer we created above will no longer receive emissions and in doing so avoid all the problems associated with a thread completing it's work (or not completing at all) after the Activity has been destroyed.

We also wrap our unsubscribe in an if statement. We need make sure that: a) we're not trying to unsubscribe something we never properly initialized (the null check), and b) that we didn't somehow unsubscribe somewhere else (the isUnsubscribed() check). Neither of those cases are possible in our simple example, but the check is good code-hygiene regardless.

Phew. Well there we go. If you've made it this far, you're past the hard part. Let's do a quick recap:

Example 3: Using Singles

Let's make an Activity that loads a list of favorite TV Shows, but this time let's do it in a simpler fashion. Observables are great, but in many cases they're kind of overkill. For example, you'll notice in the last two examples we only ever emitted a single value and never used the onCompleted() callback.

As it turns out, there's a simpler version of an Observable called a Single. Singles work almost exactly the same as Observables. But instead of there being an onCompleted(), onNext(), and onError(), there are only two callbacks: onSuccess() and onError(). Let's redo our example from above using a Single.

First we'll create a Single:

Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() { 

    @Override
    public List<String> call() throws Exception {
        return mRestClient.getFavoriteTvShows(); 
    }
});

And then let's subscribe to it:

mTvShowSubscription = tvShowSingle
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleSubscriber<List<String>>() {
        
        @Override 
        public void onSuccess(List<String> tvShows) {
            displayTvShows(tvShows); 
        }

        @Override 
        public void onError(Throwable error) {
            displayErrorMessage(); 
        } 
    });

This should look pretty familiar. We're calling subscribeOn() to make sure that our call to getFavoriteTvShows() is run off of the UI thread and we're calling observeOn() to make sure that the result of the Single is emitted on the UI thread.

Instead of using an Observer, we're using a class called SingleSubscriber. It's very similar to an Observer except that it just has the two methods we mentioned above: onSuccess() and onError(). A SingleSubscriber is to a Single what an Observer is to an Observable. Subscribing to a Single also results in the creation of a Subscription object. This Subscription behaves the same way as in Example 2 and we should make sure to unsubscribe in our onDestroy() method as well.

One final note. We've actually added some error handling code in this example. So if our call to mRestClient throws an error, we'll get the onError() callback. I encourage you to actually build and play around with this example in particular and see what happens when you use mRestClient.getFavoriteTvShowsWithException() instead of mRestClient.getFavoriteTvShows().

Wrap Up

That's it for Part 1. I hope you've found these examples helpful. Be sure to checkout Part 2 for some advanced examples.

Further Reading

The Observer Pattern

More about Observable.fromCallable()

RxLifecycle can make things a little easier in terms of remembering to unsubscribe