Check out our Bootcamp Schedule View Schedule

What is Functional Reactive Programming?

Josh Skeen

Functional Reactive Programming (FRP) offers a fresh perspective on solving modern programming problems. Once understood, it can greatly simplify your project, especially when it comes to code dealing with asynchronous events with nested callbacks, complex list filtering/transformation or timing concerns.

I will strive to skip academic explanations of Functional Reactive Programming (there are many of those on the internet already!) and focus instead on helping you gain a pragmatic understanding of what Functional Reactive Programming is—and how we can put it to work for us. This article will revolve around a particular implementation of Functional Reactive Programming called RxJava, which works on Java and Android.

Getting Started

Let’s start with a tangible example of how the Functional Reactive Programming ideas can improve our code’s readability. Our task is to query GitHub’s API to first get a list of users, and then request the details for each user. This will involve two web service endpoints:

https://api.github.com/users – retrieve list of users

https://api.github.com/users/{username} – retrieve details for a specific username, such as https://api.github.com/users/mutexkid

Old Style

The example below shows what you may already be familiar with: It calls to the web service, uses a callbacks interface to pass the successful result to the next web service call, define another success callback, and then moves on to the next web service request. As you can see, this results in two nested callbacks:

//The "Nested Callbacks" Way
    public void fetchUserDetails() {
        //first, request the users...
        mService.requestUsers(new Callback<GithubUsersResponse>() {
            @Override
            public void success(final GithubUsersResponse githubUsersResponse,
                                final Response response) {
                Timber.i(TAG, "Request Users request completed");
                final List<GithubUserDetail> githubUserDetails = new ArrayList<GithubUserDetail>();
                //next, loop over each item in the response
                for (GithubUserDetail githubUserDetail : githubUsersResponse) {
                    //request a detail object for that user
                    mService.requestUserDetails(githubUserDetail.mLogin,
                                                new Callback<GithubUserDetail>() {
                        @Override
                        public void success(GithubUserDetail githubUserDetail,
                                            Response response) {
                            Log.i("User Detail request completed for user : " + githubUserDetail.mLogin);
                            githubUserDetails.add(githubUserDetail);
                            if (githubUserDetails.size() == githubUsersResponse.mGithubUsers.size()) {
                                //we've downloaded'em all - notify all who are interested!
                                mBus.post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
                            }
                        }

                        @Override
                        public void failure(RetrofitError error) {
                            Log.e(TAG, "Request User Detail Failed!!!!", error);
                        }
                    });
                }
            }

            @Override
            public void failure(RetrofitError error) {
                Log.e(TAG, "Request User Failed!!!!", error);
            }
        });
    }

Though it isn’t the worst code—it is asynchronous and therefore doesn’t block while it waits for each request to complete, at least—it’s less than desirable for the reasons that it’s messy (adding more levels of callbacks exponentially increases the illegibility), and isn’t very easy to work with when we inevitably need to change it (we’re dependent upon the previous callback’s state and so it doesn’t lend itself to modularization or changing the data handed off to the next callback in the previous web service call). Affectionately, this is referred to as “callback hell.”

The RxJava Way

Now, let’s look at the same functionality written with RxJava:

public void rxFetchUserDetails() {
        //request the users
        mService.rxRequestUsers().concatMap(Observable::from)
        .concatMap((GithubUser githubUser) ->
                        //request the details for each user
                        mService.rxRequestUserDetails(githubUser.mLogin)
        )
        //accumulate them as a list
        .toList()
        //define which threads information will be passed on
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        //post them on an eventbus
        .subscribe(githubUserDetails -> {
            EventBus.getDefault().post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
        });
    }

As you can see, we lose the callbacks entirely when using the Functional Reactive Programming model, and wind up with a much smaller program. Let’s begin to unpack what just happened here by starting with a basic definition of Functional Reactive Programming, and work our way toward understanding the code above, which is available on GitHub.

Fundamentally, Functional Reactive Programming is the Observer pattern, with support for manipulating and transforming the stream of data our Observables emit. In the example above, the Observables are a pipeline our data will flow through.

To recap, the Observer Pattern involves two roles: an Observable, and one or many Observers. The Observable emits events, while the Observer subscribes and receives them. In the example above, the .subscribe() call adds an Observer to the Observable and the requests are made.

Building an Observable Pipeline

Each operation on the Observable pipeline returns a new Observable, with either the same data as its content, or a transformation of that data. By taking this approach, it allows us to decompose the work we do and modify the stream of events into smaller operations, and then plug those Observables together to build more complex behavior or reuse individual pieces in the pipeline. Every method call we do on the Observable is adding onto the overall pipeline for our data to flow through.

Let’s look at a concrete example by setting up an Observable for the first time:

Observable<String> sentenceObservable = Observable.from(this, is, a, sentence);

Here we have defined the first segment of our pipeline, an Observable. The data that flows through it is a series of strings. The first thing to realize is that this is non-blocking code that hasn’t done anything yet, it’s just a definition of what we want to accomplish. Observable will only begin to do it’s work when we “subscribe” to it – in other words register an observer with it:

Observable.subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
          	System.out.println(s);
          }
 }); 

Only now will Observable emit each chunk of data added in the from() call as individual Observables. The pipeline will continue to emit the observables until it runs out of them.

Transforming streams

Now that we’ve got a stream of strings being emitted, we can transform them as we like, and build up more complicated behavior.

Observable<String> sentenceObservable = Observable.from(this, is, a, sentence);

sentenceObservable.map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s.toUpperCase() + " ";
            }
        })
.toList()
.map(new Func1<List<String>, String>() {
            @Override
            public String call(List<String> strings) {
                Collections.reverse(strings);
                return strings.toString();
            }
        })
//subscribe to the stream of Observables
.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

Once the Observable is subscribed to, we will get “SENTENCE A IS THIS.”
The .map method we call above takes a Func1 object with two generic types: its input (the previous observable’s contents), and its output (in this case, a new string that’s capitalized, formatted and wrapped in a new Observable instance, then passed on to the next method). As you can see, we’ve composed more complicated behaviour out of reusable chunks of a pipeline.

One thing about the above example: it can be much more simply expressed using java 8 lambda syntax as:

Observable.just("this", "is", "a", "sentence").map(s -> s.toUpperCase() + " ").toList().map(strings -> {
            Collections.reverse(strings);
            return strings.toString();
        });

In the subscribe method, we pass an Action1 object as an argument with the type String for its generic parameter. This defines a Subscriber’s behavior when the last item emitted from the Observable, a String, is received. This is the simplest from of the .subscribe() method (see this documentation for more elaborate signatures).

This example shows one transformation method: .map(), and one aggregation method: .toList(). This is barely the tip of the iceberg with what’s possible in manipulating the data stream (a list of all of the available stream operators, but it shows you the underlying concept: In Functional Reactive Programming, we can transform a stream of data with discrete pieces of a pipeline of transformations/data manipulation, and we can reuse those individual pieces as needed on other pipelines composed of Observables. By plugging these Observable pieces together, we can compose more complicated features, but keep them as smaller pieces of composable logic that are easy to understand and modify.

Managing Threads with The Scheduler

In the web service example, we showed how to make a web request using RxJava. We talked about transforming, aggregating and subscribing to the stream of Observables, but we didn’t talk about how the Observable stream of web requests is made asynchronous.

This falls under that category of what the FRP model calls the Scheduler—a strategy for defining which thread Observable stream events occur on, and which thread consumes the result of the emitted Observables as a Subscriber. In the web service example, we want the requests to happen in the background, and the subscription to occur on the main thread, so we define it this way:

        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        //post them on an eventbus
        .subscribe(githubUserDetails -> {
            EventBus.getDefault().post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
        });

Observable.subscribeOn(Scheduler scheduler) specifies that the work done by the Observable should be done on a particular Scheduler (thread).

Observable.observeOn(Scheduler scheduler) specifies in which Scheduler (thread) the Observable should invoke the Subscribers'onNext( ), onCompleted( ) and onError( ) methods, and call the Observable’s observeOn( ) method, passing it the appropriate Scheduler.

Here are the possible Schedulers:

  • Schedulers.computation( ):
    meant for computational work such as event loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead)

  • Schedulers.from(executor): uses the specified Executor as a Scheduler

  • Schedulers.immediate( ): schedules work to begin immediately in the current thread

  • Schedulers.io( ): meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( )

  • Schedulers.newThread( ): creates a new thread for each unit of work

  • Schedulers.test( ): useful for testing purposes; supports advancing events to unit test behavior

  • Schedulers.trampoline( ): queues work to begin on the current thread after any already queued work

By setting the observeOn and subscribeOn schedulers, we define which threads we will use for the network requests (Schedulers.newThread( )).

Next Steps

We’ve covered a lot of ground in this article, but you should have a good idea of how Functional Reactive Programming works at this point. Examine the GitHub project we shared in the article and understand it, read the RxJava documentation and check out the rxjava-koans project for a test-driven approach to mastering the Functional Reactive Programming paradigm.

Want to learn even more about Java? Check out our new Introduction to Java for Android bootcamp. And if you’ve already mastered Java, consider our Android bootcamp.

Similar Posts:

Not Happy with Your Current App, or Digital Product?

Submit your event

Let's Discuss Your Project

Let's Discuss Your Project