Observables, and Callbacks and Threading, Oh My!

So you want to react-ify your Android app? Excellent! Welcome to the club! If you’re like me you’ll start by using several of the RxBindings libraries to turn those ugly boiler plate Android UI callbacks into sexy new Observables. And if you’re even more like me, after a while you’ll notice that RxBindings is missing a few callbacks that you really really need.

So you decide to write one yourself! Mazel Tov! But, wait, you have no idea how to even go about doing that… Let’s walk through some code to see how you can create your own Observables from existing callbacks and some “gotchas” to look out for.

Listener vs. RxJava

Let’s start with everyone’s favorite callback, View.OnClickListener.

Here is the original non-Rx version:

view.setOnClickListener(new OnClickListener() {
  @Override
  public void onClick(View v) {
    doSomething();
  }
});

And here it is using RxJava & RxBindings:

RxView.clicks(view).subscribe(new Action1<Void>() {
  @Override
  public void call(Void aVoid) {
    doSomething();
  }
});

RxView.clicks: The What and the How

Let’s walk through the RxView.clicks() method to see what’s going on under the hood.

@CheckResult @NonNull
public static Observable<Void> clicks(@NonNull View view) {
  checkNotNull(view, "view == null");
  return Observable.create(new ViewClickOnSubscribe(view));
}

First, we check that the View we passed in isn’t null. Even RxJava can’t spare us from having to guard against null pointer exceptions.

Next, we create a new Observable via the create method. Unlike the Observable.just() or Observable.from() methods, which allow you to create an Observable from just about any object, the create method takes in an object that implements the OnSubscribe interface.

Tell me more about this OnSubscribe

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>

The documentation for OnSubscribe is short and to the point. It reads:

Invoked when Observable.subscribe is called.

Seems simple enough. To create an Observable using Observable.create() you need an object that implements the OnSubscribe interface. With this object, you can write whatever sort of magic you want to be executed when subscribe is called on your Observable. While that is all technically true, it’s not that easy.

If you look at the documentation for create(), you will see that the authors had a good deal to say about how to implement the OnSubscribe interface.

Write the function you pass to {@code create} so that it behaves as an Observable.

Which means that the function you create should accept a Subscriber<T> and invoke the Subscriber<T>’s onNext, onError, and onCompleted methods appropriately.

Now that we have the requirements for implementing OnSubcribe, let’s go back to our View.OnClickListener example and take a look at how RxBindings implements it in the ViewClickOnSubscribe class.

ViewClickOnSubscribe: An OnSubscribe Implementation

final class ViewClickOnSubscribe implements Observable.OnSubscribe<Void> {
  final View view;

  ViewClickOnSubscribe(View view) {
    this.view = view;
  }

  @Override
  public void call(final Subscriber<? super Void> subscriber) {
    checkUiThread();

    View.OnClickListener listener = new View.OnClickListener() {
      @Override
      public void onClick(View v) {
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(null);
        }
      }
    };

    view.setOnClickListener(listener);

    subscriber.add(new MainThreadSubscription() {
      @Override
      protected void onUnsubscribe() {
        view.setOnClickListener(null);
      }
    });
  }
}

See anything you recognize? Our old friend View.OnClickListener! Here is where we finally see the transformation from listener (callback) to Observable. We can also see all the requirements for OnSubscribe we learned about earlier in action!

But there are a few others things going on here. You might notice that this class holds a strong reference to the View that is being observed. So for memory management it is important to unsubscribe to the Observable to free the reference.

But there’s something else going on here: there’s a good deal of talk about the main thread.

Threading And Observables

Observables can be “observed on” and “subscribed on” particular threads using Schedulers. Sometimes you can manually set these threads via a Scheduler and sometimes it’s set for you. There is a metric ton of material out there explaining all about threading and Observables in much greater detail than I will go into. For now, let’s focus on how threads are used in ViewClickOnSubscribe.

First up, let’s look into the checkUiThread() method. As stated in the View class:

The entire view tree is single threaded. You must always be on the UI thread when calling any method on any view.

So we need to ensure the Observable we create is subscribed to from the main thread. Just as it is important to call subscribe() on the Observable on the main thread, it is equally important that we call unSubscribe() on the Observable on the main thread.

At the bottom of the class we see that ViewClickOnSubscribe takes care of this for us via a MainThreadSubscription. This Subscription ensures that the unSubscribe() call is executed on the main thread.

Lather, Rinse, Repeat

Voila! We have now gone from ugly callback to sexy Observable! Before we go forward let’s recap the basic steps towards creating an Observable from a listener.

  1. Create a class that implements Observable.OnSubscribe<T> where T is the type of object that will be passed in the subscriber.onNext() call.

  2. In your implementation of call(final Subscriber<? super T> subscriber), create an instance of the listener you are converting.

  3. Make sure to use the Subscriber to call onNext, onError or onCompleted in the call() method so your function “behaves like an Observable.”

Easy peasy! What can go wrong?

As we saw in the ViewClickOnSubscribe class, when you’re dealing with UI related callbacks, we know for sure that all our work will happen on the main thread. But what about when you’re not working with UI callbacks?

In the ViewClickOnSubscribe class, when we set our internal listener to listen for click events from the View, there is something happening in the background that we need to pay close attention to. What thread is the callback being triggered from? In this case, it is the main thread. But that might not always be so obvious.

The thread that the callback is triggered from is very important. It affects how our OnSubscribe implementation will work. The Subscriber passed in our call() method will receive the Observable’s events (i.e. onNext(), onError(), and onCompleted()) on the thread from which the callback was fired.

The object implementing the OnSubscribe interface emits events on the thread from which the callback is fired.

So, why should I care?

I know I said I wouldn’t get into threading too much, but a little background is necessary in understanding why you need to care about what thread callbacks are triggered from.

subscribeOn and observeOn

As stated in Michael Parkers’ Effective-RxJava:

By default, Observable instances specify an execution policy of “immediate.”

While in most cases that may be fine, it’s not so great for larger, more intensive operations that may block the main thread. Luckily, RxJava has solved this problem by giving us subscribeOn(Scheduler scheduler) and observeOn(Scheduler scheduler).

subscribeOn(Scheduler scheduler) applies to upstream Observable instances and its scheduler specifies the thread on which the upstream subscribe method is invoked. In English, every Observable and call above the subscribeOn(Scheduler scheduler) call will happen on the thread specified by subscribeOn().

Conversely, the observeOn(Scheduler scheduler) method applies to downstream Observable instances. Its Scheduler parameter specifies the thread on which events, such as the next emitted value or the stream terminating normally or with an error, are observed downstream.

Okay, but seriously, why should I care?

Using observeOn and subscribeOn means you can reasonably assume what is being done on what thread. However if you are using those methods in conjunction with an Observable that has an internal listener that is being triggered from a callback on a different thread, you lose that guarantee. Because now your custom Observable is internally switching threads.

That means that your calls to subscribeOn or observeOn on your Observable will be ignored once you hit the custom Observable because it will switch threads without you realizing.

To quote the great Porky Pig, "That’s All Folks!”