
LiveDataReactiveStreams: Where RxJava meets LiveData

Bryan Lindsey

When choosing a library to make your Android application reactive, which do you choose: the trusty, ever-popular RxJava 2, or the newer, first-party LiveData? While it may be a subject of debate, the good news is that these two can work together using a tool called LiveDataReactiveStreams.

In this post, we’ll explore how LiveDataReactiveStreams works, as well as why (or why not) you might want to use it to bring RxJava and LiveData together. Note that it does assume some familiarity with RxJava and LiveData.

Syntax

LiveDataReactiveStreams is a class provided as part of Google’s Jetpack components. To use it, you need to add the ReactiveStreams dependency to your project. In your build.gradle file, add the following to your dependencies block (replacing $lifecycle_version with the latest dependency version, which is 2.0.0 as of this writing):

implementation "androidx.lifecycle:lifecycle-reactivestreams:$lifecycle_version"

To convert from an RxJava stream to a LiveData object, use the fromPublisher() method provided by LiveDataReactiveStreams, like so:

// Note, I've added explicit types for clarity. You can omit these in your code.
val myRxStream: Flowable<MyDataType> = Flowable.fromCallable { fetchSomeDataViaNetworkCall() }

val myLiveData: LiveData<MyDataType> = LiveDataReactiveStreams.fromPublisher(myRxStream)

The fromPublisher() method takes an instance of the Publisher interface, which is implemented by RxJava Flowables, and returns a LiveData object. Also note that the type emitted by the RxJava stream (in this case, MyDataType) will be the type of the data held by the LiveData.

Importantly, this creates a LiveData object that updates its value any time the source RxJava stream emits a new item. This means that anything that observes the LiveData (such as a Fragment that updates the UI when the LiveData changes) will be notified when the RxJava stream emits.
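To make the observing side concrete, here is a minimal sketch of a Fragment that observes such a LiveData. GreetingFragment and greetingLiveData are hypothetical names, the stream is a stand-in for real data, and the example assumes the AndroidX Fragment library is available alongside the lifecycle dependency above.

```kotlin
import android.os.Bundle
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import android.widget.TextView
import androidx.fragment.app.Fragment
import androidx.lifecycle.LiveData
import androidx.lifecycle.LiveDataReactiveStreams
import androidx.lifecycle.Observer
import io.reactivex.Flowable

// Hypothetical Fragment used only for illustration.
class GreetingFragment : Fragment() {

    // A stand-in stream; in a real app this would usually live in a ViewModel.
    private val greetingLiveData: LiveData<String> =
        LiveDataReactiveStreams.fromPublisher(Flowable.just("Hello from RxJava"))

    // Keep the layout trivial: the whole fragment is a single TextView.
    override fun onCreateView(
        inflater: LayoutInflater,
        container: ViewGroup?,
        savedInstanceState: Bundle?
    ): View = TextView(requireContext())

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        val label = view as TextView

        // The observer is only invoked while the view's lifecycle is active,
        // so there is no manual unsubscribe step here.
        greetingLiveData.observe(viewLifecycleOwner, Observer<String> { greeting ->
            label.text = greeting
        })
    }
}
```

Nothing in the Fragment refers to RxJava directly apart from building the stand-in stream; from the observer's point of view it is an ordinary LiveData.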

But… why?

You might be wondering about the benefit of converting like this as opposed to using your RxJava stream or your LiveData object all the way through. The answer is that doing so lets us use the strengths of both in a fluid way.

In terms of strengths, RxJava is a robust toolset. There are tons of operators to allow you to manipulate your data stream in a multitude of ways, including built-in tools for specifying which threads operators execute on. This is great for complex operations on data, such as might be found in the business logic of applications.

While LiveData has a smaller set of operators, it offers the benefit of built-in lifecycle awareness. This means that it can safely and easily update UI elements even through the complex lifecycles of Activities and Fragments, which is not built in to RxJava (though there are tools like AutoDispose that can make this somewhat more automatic).

Also unlike RxJava, LiveData works with data binding out of the box.

What all of this means is that LiveData is great for pushing data to the UI but not as much for handling business logic, and RxJava is great for business logic but not as much for UI. The LiveDataReactiveStreams fromPublisher() method offers a simple way for us to push updates from our data/business-logic layers up to the UI while utilizing these strengths.

When used in this way, a cool feature of LiveDataReactiveStreams is that it automatically posts values to the main thread. If you’re using LiveData for updating UI elements, this saves you from having to manually call postValue() on a LiveData or use RxAndroid for returning to the main thread on the RxJava stream.
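As a rough sketch of what that looks like in practice, the ViewModel below does its work on RxJava's io() scheduler and never calls observeOn(); the LiveData returned by fromPublisher() takes care of delivering the value on the main thread. ProfileViewModel and loadProfileName() are hypothetical names introduced for illustration.

```kotlin
import androidx.lifecycle.LiveData
import androidx.lifecycle.LiveDataReactiveStreams
import androidx.lifecycle.ViewModel
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Hypothetical ViewModel; loadProfileName() stands in for a slow, blocking call.
class ProfileViewModel : ViewModel() {

    private fun loadProfileName(): String {
        Thread.sleep(500) // simulate network or disk latency
        return "Ada"
    }

    val profileName: LiveData<String> = LiveDataReactiveStreams.fromPublisher(
        Flowable.fromCallable { loadProfileName() }
            // Run the blocking call off the main thread.
            .subscribeOn(Schedulers.io())
            // No observeOn(AndroidSchedulers.mainThread()) here:
            // the resulting LiveData posts each value to the main thread.
    )
}
```

The stream is not subscribed to when the ViewModel is created; the work only starts once something actively observes profileName.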

Finally, a key selling point of LiveDataReactiveStreams is that it is a more direct way of connecting RxJava and LiveData. Without it, you need to subscribe to your RxJava chain and post values to the LiveData in the subscriber:

// Private, mutable backing field - only update values internally
private val _myLiveData = MutableLiveData<String>()

// Observers will subscribe to this since it is immutable to them
val myLiveData: LiveData<String>
    get() = _myLiveData

// Keep a reference to the disposable, and make sure to clear it appropriately somewhere
val disposable: Disposable = Flowable.fromCallable { methodThatReturnsString() }
        .subscribe(
            { myString -> _myLiveData.postValue(myString) },
            // Make sure to handle errors
            { error -> Log.e(TAG, "Oops, hit an error", error) }
        )

While this certainly works, calls to subscribe() feel like you’re ending your RxJava chain and leaving the reactive world, even though you’re still trying to do reactive things with LiveData. LiveDataReactiveStreams bridges that gap so that you don’t have to leave the reactive world.

Caveats

While this tool offers simplicity in some ways, it also comes with a handful of limitations that you will want to consider. You can find a few major ones discussed below.

Error Handling

To start with, since LiveData does not have the option to add an onError handler like RxJava does, any errors that make it through LiveDataReactiveStreams from your RxJava stream will crash your application. That being said, you can use RxJava’s error handling operators in your RxJava chain to prevent this:

// Note, I've added explicit types for clarity. You can omit these in your code.
val myRxStream: Flowable<Int> = Flowable.fromCallable { methodThatThrowsAnError() }
        .onErrorReturnItem(-1) // Use -1 as an error case

val myLiveData: LiveData<Int> = LiveDataReactiveStreams.fromPublisher(myRxStream)
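If a single sentinel value such as -1 is too coarse, another option is to wrap each emission in a small result type so that the UI can tell success from failure. The sketch below is one way to do that, not the only one; LoadResult and asLoadResults() are hypothetical names, and the resulting Flowable can be handed to fromPublisher() as before.

```kotlin
import io.reactivex.Flowable

// Hypothetical wrapper type: either a value or the error that replaced it.
sealed class LoadResult<out T> {
    data class Success<T>(val value: T) : LoadResult<T>()
    data class Failure(val error: Throwable) : LoadResult<Nothing>()
}

// Converts items to Success and a terminal error to a single Failure item,
// so the stream completes normally instead of calling onError.
fun <T : Any> Flowable<T>.asLoadResults(): Flowable<LoadResult<T>> =
    this.map<LoadResult<T>> { LoadResult.Success(it) }
        .onErrorReturn { LoadResult.Failure(it) }

fun main() {
    val failing: Flowable<Int> =
        Flowable.fromCallable<Int> { throw IllegalStateException("boom") }

    // Blocks until the first item arrives; fine for a demo, not for the main thread.
    println(failing.asLoadResults().blockingFirst())
}
```

An observer of the resulting LiveData can then branch on Success versus Failure, which keeps the error details available instead of collapsing them into a magic number.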

Subscriptions and Lifecycles

Another major caveat lies in how the LiveData subscribes to the RxJava stream behind the scenes. When the LiveData becomes inactive, meaning it no longer has any active observers (for example, because the lifecycle observing it has been stopped or destroyed), it clears its subscription to the RxJava stream. It will then re-subscribe when the LiveData becomes active again. This has different implications depending on whether you are using hot or cold Flowables.

For cold Flowables, such as the one in the example above, the RxJava stream will start all over again when the LiveData re-subscribes. If this RxJava stream makes a network request, this means that the request will be made again when reattaching (unless you have some kind of caching mechanism in place).

For hot Flowables, which can emit even with no subscribers, any emissions sent out while the LiveData is inactive will not be picked up by the LiveData.

In both cases, LiveData will still cache the last value emitted by the RxJava stream and send it to the Observer when it becomes active again, but won’t get an updated value until the stream emits another item.

Whether this behavior is problematic depends on your use case. In the end, with mindful use of RxJava features, you can still accomplish your desired flow. The key is being aware of how the lifecycle affects your subscription.
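For the cold-stream case, one RxJava feature that can help is the cache() operator, which runs the source once and replays its items to later subscribers. The sketch below uses a counter to show the difference; fetchCount and fakeNetworkCall() are hypothetical stand-ins, and plain blocking subscriptions stand in for the LiveData re-subscribing.

```kotlin
import io.reactivex.Flowable
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the (fake) network call actually runs.
val fetchCount = AtomicInteger(0)

// Hypothetical stand-in for an expensive request.
fun fakeNetworkCall(): String {
    fetchCount.incrementAndGet()
    return "payload"
}

fun main() {
    // Cold stream: every new subscription runs the callable again.
    val cold: Flowable<String> = Flowable.fromCallable { fakeNetworkCall() }
    cold.blockingFirst()
    cold.blockingFirst()
    println("cold fetches: ${fetchCount.get()}") // prints "cold fetches: 2"

    fetchCount.set(0)

    // cache() subscribes to the source once and replays to later subscribers.
    val cached: Flowable<String> = Flowable.fromCallable { fakeNetworkCall() }.cache()
    cached.blockingFirst()
    cached.blockingFirst()
    println("cached fetches: ${fetchCount.get()}") // prints "cached fetches: 1"
}
```

Keep in mind that cache() holds on to every item it has seen for as long as the Flowable is referenced and never refreshes, so it suits short, finite streams such as a single request better than long-running ones.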

What about toPublisher()?

Astute readers may have noticed another method provided by LiveDataReactiveStreams called toPublisher(). Since we’re focusing on using LiveData to push data to the UI, fromPublisher() is sufficient to get the benefits of LiveData and RxJava together. However, I encourage you to learn more about toPublisher() if it seems like it fits your use-case.
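For reference, toPublisher() goes the other way: it takes a LifecycleOwner and a LiveData and returns a Reactive Streams Publisher, which RxJava can wrap with Flowable.fromPublisher(). The following is only a minimal sketch; asFlowable() is a hypothetical helper name.

```kotlin
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.LiveData
import androidx.lifecycle.LiveDataReactiveStreams
import io.reactivex.Flowable

// Hypothetical extension: exposes a LiveData as an RxJava Flowable.
// The Publisher observes the LiveData using the given owner's lifecycle.
fun <T> LiveData<T>.asFlowable(owner: LifecycleOwner): Flowable<T> =
    Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(owner, this))
```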

Reactive Choices

When it comes to choosing whether to use RxJava or LiveData in your next project, keep in mind that they can work well together in the same codebase. As we’ve discussed, they have their individual strengths, and can work really well with the style of MVVM laid out by Google.

LiveDataReactiveStreams offers a simple interface that allows you to convert RxJava streams to LiveData in a direct way. However, keep in mind the slew of caveats that come with it. These caveats can bring their own complexity, so consider them when deciding if LiveDataReactiveStreams will work best for your situation.
