
The RxJava Repository Pattern

Josh Skeen

What happens if a network request is made using RxJava & Retrofit, a user rotates the phone, and the request hasn’t completed yet? By default, the Observable will be recreated, the request re-requested, and the previous data lost!

In this article we’ll take a look at a solution to this common problem when using RxJava with Android. I call this the Repository pattern, which is a pragmatic way of fitting RxJava into the Android lifecycle so that the UI and data layer of your app can stay in sync with one another. We’ll solve the problem of how to cache data previously processed by an Observable and replay it when the activity that hosts it is recreated. We’ll learn it by example via a small sandbox application called Stockwatcher.

Setup

To follow along, you can clone the repository here. One small gotcha—once you open Stockwatcher in Android Studio, make sure you’ve installed the Lombok plugin. Stockwatcher uses Lombok to remove a lot of the boilerplate our plain old Java would otherwise require. The readme for Stockwatcher will guide you through how to install the Lombok plugin if you have never done it before.

Introducing Stockwatcher

Stockwatcher allows wolves of Wall Street to request current stock information for valid symbols, so that they may then make informed stock market decisions. Here, we’re using the Markit On Demand Market APIs REST service as a back-end.

Here’s how it will work:

Handling Rotation

Now the central focus of the article: how will we handle a rotation in the middle of an in-progress request for stock data? Notice how Stockwatcher handles this common RxJava/Android problem with ease?

How did Stockwatcher accomplish this? To understand, we first need to take a step back and examine a couple of key classes the project includes. We’ll start by taking a look at how the dependencies have been defined for Stockwatcher. Up first, the AppModule class.

Understanding the Repository Pattern: Dagger 2 Setup

Stockwatcher wires up its dependencies via the Dagger 2 DI framework to keep things easy to manage and test. Dagger 2 introduces two unique concepts, Modules and Components. Modules define how specifically we should construct the objects in our program, and Components will define which classes make use of the injected objects.

Check out the AppModule.java class:

@Module
public class AppModule {

    private static final String STOCK_SERVICE_ENDPOINT = "http://dev.markitondemand.com/MODApis/Api/v2/";

    private final Application application;
    private final ServiceConfig serviceConfig;

    AppModule(Application application) {
        this.application = application;
        serviceConfig = new ServiceConfig(STOCK_SERVICE_ENDPOINT);
    }

    @Provides
    @Singleton
    StockDataRepository provideStockDataRepository() {
        StockService stockService = new StockService(serviceConfig);
        return new StockDataRepository(stockService);
    }
}

StockDataRepository

Notice the provideStockDataRepository method? The StockDataRepository class wraps our StockService class, which is a standard Retrofit service definition and the class that actually makes the API requests. Note that we mark the repository as a @Singleton so that it can hold onto the results of each request. Because of that annotation, whenever we request a Repository object in our app, we’ll get back the same instance if it has already been created.

Understanding the Repository Pattern

Now that we understand how we’re using Dagger 2 to create a Repository object as a Singleton, look at what the Repository object does as it relates to Stockwatcher in the diagram below:

As you can see, StockDataRepository serves as a bridge between Service and UI layers. We will use it to manage caching Observables, and the events individual Observable instances have emitted.

Let’s dig into StockDataRepository to understand how the UI state and data are kept in sync with one another and how the caching works.

A Closer look at StockDataRepository

StockDataRepository’s job is to manage caching results from service requests made by StockService and to hand them back to the Fragment it will be used within. Fragments will request data from the repository (by subscribing to the Observables it manages), and the repository will save the Observable instances so that they can be subscribed to and played back as Android UI changes take place in the Fragment/Activity layer.

Let’s see what the Repository object contains:

public class StockDataRepository extends BaseRepository {

    private static final String CACHE_PREFIX_GET_STOCK_INFO = "stockInfo";
    private static final String CACHE_PREFIX_GET_STOCK_INFO_FOR_SYMBOL = "getStockInfoForSymbol";
    private static final String CACHE_PREFIX_GET_STOCK_SYMBOLS = "lookupStockSymbols";

    private final StockService service;

    public StockDataRepository(StockService service) {
        this.service = service;
    }

    public Observable<StockInfoForSymbol> getStockInfoForSymbol(String symbol) {
        Timber.i("method: %s, symbol: %s", CACHE_PREFIX_GET_STOCK_INFO_FOR_SYMBOL, symbol);
        Observable<StockInfoForSymbol> stockInfoForSymbolObservable = Observable.combineLatest(
                lookupStockSymbol(symbol),
                fetchStockInfoFromSymbol(symbol),
                StockInfoForSymbol::new);
        return cacheObservable(CACHE_PREFIX_GET_STOCK_INFO_FOR_SYMBOL + symbol, stockInfoForSymbolObservable);
    }

    //stock info request, which depends on the first result from lookup stock request
    private Observable<StockInfoResponse> fetchStockInfoFromSymbol(String symbol) {
        return lookupStockSymbol(symbol)
                .map(StockSymbol::getSymbol)
                .flatMap(this::getStockInfo);
    }

    //return a single symbol from the list of symbols, or an error to catch if not.
    private Observable<StockSymbol> lookupStockSymbol(String symbol) {
        return lookupStockSymbols(symbol)
                .doOnNext(stockSymbols -> {
                    if (stockSymbols.isEmpty()) {
                        throw new StockSymbolError(symbol);
                    }
                }).flatMap(Observable::fromIterable).take(1);
    }

    private Observable<List<StockSymbol>> lookupStockSymbols(String symbol) {
        Timber.i("%s, symbol: %s", CACHE_PREFIX_GET_STOCK_SYMBOLS, symbol);
        return cacheObservable(CACHE_PREFIX_GET_STOCK_SYMBOLS + symbol, service.lookupStock(symbol).cache());
    }

    private Observable<StockInfoResponse> getStockInfo(String symbol) {
        Timber.i("method: %s, symbol: %s", CACHE_PREFIX_GET_STOCK_INFO, symbol);
        Observable<StockInfoResponse> observableToCache = service
                .stockInfo(symbol).delay(3, TimeUnit.SECONDS).cache();
        return cacheObservable(CACHE_PREFIX_GET_STOCK_INFO + symbol, observableToCache);
    }

}

There are a few keys to understand what’s going on in the code above. First, notice there’s only one public method here:

getStockInfoForSymbol(String symbol)

StockInfoFragment will call this method, which in turn kicks off two requests: lookupStockSymbol and fetchStockInfoFromSymbol.
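The two requests are not independent: the stock info request needs the symbol that the lookup resolves first. As a rough plain-Java analogy (this is not the Stockwatcher code and not RxJava), the same shape can be sketched with CompletableFuture, where thenCompose plays the role of flatMap and thenCombine stands in for combineLatest on single-valued sources. Every name below is a hypothetical stand-in, and the "endpoints" just return canned values.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy of getStockInfoForSymbol; all names here are hypothetical.
final class CombinedLookupSketch {

    // Stand-in for the symbol lookup endpoint: user input -> matching symbols.
    static CompletableFuture<List<String>> lookupSymbols(String input) {
        Map<String, List<String>> known = Map.of("goog", List.of("GOOG", "GOOGL"));
        return CompletableFuture.completedFuture(
                known.getOrDefault(input.toLowerCase(), List.of()));
    }

    // Stand-in for the stock info endpoint.
    static CompletableFuture<String> fetchInfo(String symbol) {
        return CompletableFuture.completedFuture("info for " + symbol);
    }

    // Take the first match, or fail (the role StockSymbolError plays).
    static CompletableFuture<String> lookupSymbol(String input) {
        return lookupSymbols(input).thenApply(symbols -> {
            if (symbols.isEmpty()) {
                throw new IllegalArgumentException("unknown symbol: " + input);
            }
            return symbols.get(0);
        });
    }

    // The second request depends on the first (flatMap ~ thenCompose); the two
    // results are then merged into one value (combineLatest ~ thenCombine).
    static CompletableFuture<String> stockInfoForSymbol(String input) {
        CompletableFuture<String> symbol = lookupSymbol(input);
        CompletableFuture<String> info = symbol.thenCompose(CombinedLookupSketch::fetchInfo);
        return symbol.thenCombine(info, (s, i) -> s + ": " + i);
    }
}
```

If the lookup finds nothing, the failure propagates through both stages, which is roughly how an error raised in lookupStockSymbol reaches the subscriber’s error handler.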

With a bit of RxJava magic, we’re able to combine the multiple requests (combineLatest), and handle the case where the user’s input (the symbol they typed) does not resolve to an actual stock symbol the Stock API knows about. To understand Repository’s primary concern, caching, let’s trace one of the two requests the Repository wraps:

private Observable<List<StockSymbol>> lookupStockSymbols(String symbol) {
    Timber.i("%s, symbol: %s", CACHE_PREFIX_GET_STOCK_SYMBOLS, symbol);
    return cacheObservable(CACHE_PREFIX_GET_STOCK_SYMBOLS + symbol, service.lookupStock(symbol).cache());
}

BaseRepository

Note that we are returning the result of a call to a method named cacheObservable. The definition of cacheObservable lives in the BaseRepository class. Let’s take a look:

abstract class BaseRepository {

    private LruCache<String, Observable<?>> apiObservables = createLruCache();

    @NonNull
    private LruCache<String, Observable<?>> createLruCache() {
        return new LruCache<>(50);
    }

    @SuppressWarnings("unchecked")
    <T> Observable<T> cacheObservable(String symbol, Observable<T> observable) {
        Observable<T> cachedObservable = (Observable<T>) apiObservables.get(symbol);
        if (cachedObservable != null) {
            return cachedObservable;
        }
        cachedObservable = observable;
        updateCache(symbol, cachedObservable);
        return cachedObservable;
    }

    private <T> void updateCache(String stockSymbol, Observable<T> observable) {
        apiObservables.put(stockSymbol, observable);
    }
}

The cacheObservable method is our main interface into the functionality the StockDataRepository is responsible for: keeping an instance of an Observable in a cache and returning it when we ask for it. Instead of beginning anew with a brand new request, we’ll cache the Observable in an LruCache, and hand that back so we can update the UI with the cached Observable instead.
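To make the "same key, same instance" contract concrete, here is a minimal plain-Java sketch. It is an assumption-laden stand-in rather than the project’s code: android.util.LruCache is swapped for an access-ordered LinkedHashMap so the example runs off-device, and the names KeyedCacheSketch, getOrStore and contains are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for BaseRepository's cache; android.util.LruCache is
// replaced by an access-ordered LinkedHashMap so this runs without Android.
final class KeyedCacheSketch<V> {

    private final Map<String, V> entries;

    KeyedCacheSketch(int maxSize) {
        // accessOrder = true keeps the least recently used entry first.
        this.entries = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Same contract as cacheObservable: return the stored value for the key if
    // there is one, otherwise store the candidate and return it.
    synchronized V getOrStore(String key, V candidate) {
        V cached = entries.get(key);
        if (cached != null) {
            return cached;
        }
        entries.put(key, candidate);
        return candidate;
    }

    // containsKey does not count as an access, so it leaves the LRU order alone.
    synchronized boolean contains(String key) {
        return entries.containsKey(key);
    }
}
```

The second call with the same key ignores the freshly built candidate and hands back the stored one, which is what lets a recreated fragment reach the Observable that was created before rotation.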

return cacheObservable(CACHE_PREFIX_GET_STOCK_SYMBOLS + symbol, service.lookupStock(symbol).cache());

Notice that in the above excerpt from StockDataRepository there are actually two levels of caching going on? One is cacheObservable, which returns a cached Observable instance from the LruCache that was initialized in BaseRepository. The second is the .cache() operator, which instructs that Observable instance to record and then play back events it has previously emitted. Without the .cache() operator, we would still get the same Observable instance back after rotation, but resubscribing to it would not replay any of the events emitted during the previous subscription; a cold Observable would simply run its work again.
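The difference between the two levels can be sketched without RxJava at all. In the plain-Java analogy below, a Supplier plays the role of a cold Observable (the work runs every time a value is requested) and a hypothetical memoize helper plays the role of .cache(). This is only an analogy under those assumptions; the real operator also replays errors and completion, which a Supplier cannot model.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy: a Supplier stands in for a cold Observable, and
// memoize() stands in for the .cache() operator. Names are hypothetical.
final class ReplaySketch {

    // Wraps a supplier so the underlying work runs at most once; later calls
    // get the recorded value back instead of triggering the work again.
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private boolean done;
            private T value;

            @Override
            public synchronized T get() {
                if (!done) {
                    value = source.get();
                    done = true;
                }
                return value;
            }
        };
    }

    // Hypothetical "request" that counts how often it actually runs.
    static Supplier<String> countingRequest(AtomicInteger calls) {
        return () -> "response #" + calls.incrementAndGet();
    }
}
```

Holding on to the plain supplier (the first level of caching alone) still reruns the request on every call, while the memoized one answers the second call from its recorded value.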

Wiring it up to the UI

Up next we’ll take a look at StockInfoFragment itself, where the request is triggered when the user provides the symbol they would like information for.

public class StockInfoFragment extends RxFragment {

    @Inject
    StockDataRepository stockDataRepository;
    private FragmentStockInfoBinding binding;

    @Override
    public void onCreate(@Nullable Bundle savedInstanceState) {
        StockWatcherApplication.getAppComponent(getActivity()).inject(this);
        super.onCreate(savedInstanceState);
    }

    @Nullable
    @Override
    public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
        super.onCreateView(inflater, container, savedInstanceState);
        binding = DataBindingUtil.inflate(inflater, R.layout.fragment_stock_info, container, false);
        binding.fetchDataButton.setOnClickListener(v -> {
            binding.errorMessage.setVisibility(View.GONE);
            loadRxData();
        });
        return binding.getRoot();
    }

    @Override
    public void loadRxData() {
        Observable.just(binding.tickerSymbol.getText().toString())
                .filter(symbolText -> symbolText.length() > 0)
                .singleOrError().toObservable()
                .flatMap(symbol -> stockDataRepository.getStockInfoForSymbol(symbol))
                .compose(RxUtil.applyUIDefaults(StockInfoFragment.this))
                .subscribe(this::displayStockResults, this::displayErrors);
    }

    private void displayStockResults(StockInfoForSymbol stockInfoForSymbol) {
        binding.stockValue.setText(stockInfoForSymbol.toString());
    }
}

Here, we hand the user input to the Repository object, which then makes the request when the user clicks the button. Notice that all of the requests for data occur within loadRxData()? Whenever resubscription is required, if we follow this rule, then we’ll be able to simply call loadRxData().

RxFragment

Now, we’ll look at RxFragment, the superclass for StockInfoFragment. We will use this abstract class as a superclass any time a fragment should work with Observable data from the repository.

public abstract class RxFragment extends Fragment {

    private static final String EXTRA_RX_REQUEST_IN_PROGRESS = "EXTRA_RX_REQUEST_IN_PROGRESS";

    @Getter @Setter //Lombok getter/setter generation
    private boolean requestInProgress;

    @Getter @Setter
    private CompositeDisposable compositeDisposable;

    public abstract void loadRxData();

    @Override
    public void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        compositeDisposable = new CompositeDisposable();
        if (savedInstanceState != null) {
            requestInProgress = savedInstanceState.getBoolean(EXTRA_RX_REQUEST_IN_PROGRESS, false);
        }
    }

    @Override
    public void onSaveInstanceState(Bundle outState) {
        super.onSaveInstanceState(outState);
        outState.putBoolean(EXTRA_RX_REQUEST_IN_PROGRESS, requestInProgress);
    }

    @Override
    public void onResume() {
        super.onResume();
        if (isRequestInProgress()) {
            loadRxData();
        }
    }

    @Override
    public void onPause() {
        super.onPause();
        compositeDisposable.clear();
    }
}

Note that we’re persisting the state of the “requestInProgress” boolean via onSaveInstanceState:

@Override
public void onSaveInstanceState(Bundle outState) {
    super.onSaveInstanceState(outState);
    outState.putBoolean(EXTRA_RX_REQUEST_IN_PROGRESS, requestInProgress);
}

This is the key to allowing the Repository object to play back its cached results in the case where a user rotates the device while we’re making a request with RxJava and Retrofit. If isRequestInProgress returns true, loadRxData() is called. loadRxData() will then subsequently fetch the data from the Repository cache, and will re-register to update the UI upon completion.

Understanding the RxUtil class

Now, for the last piece of the puzzle: how did requestInProgress on RxFragment actually get set? Take another look at StockInfoFragment’s loadRxData() method:

@Override
public void loadRxData() {
    Observable.just(binding.tickerSymbol.getText().toString())
            .filter(symbolText -> symbolText.length() > 0)
            .singleOrError().toObservable()
            .flatMap(s -> stockDataRepository.getStockInfoForSymbol(s))
            .compose(RxUtil.applyUIDefaults(StockInfoFragment.this))
            .subscribe(this::displayStockResults, this::displayErrors);
}

Notice the line:

.compose(RxUtil.applyUIDefaults(StockInfoFragment.this))

This is what sets requestInProgress to true when the subscription begins and back to false upon completion. If you have not discovered Transformers (of the non-Autobot variety) yet, they are a great way to apply a uniform set of changes to Observables in a generic way, so we’ll use them. If you’re new to Transformers and the compose operator, a good starting point is Dan Lew’s article on what they offer and why you will want to use them: Don’t Break the Chain.
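If the idea of a transformer is new, it may help to see it stripped of RxJava. In the plain-Java sketch below, a "transformer" is just a function that takes a task and returns a decorated task, and several of them are bundled behind one name, much as a single compose call bundles several behaviors. The class and method names are hypothetical, and the log list exists only to make the ordering visible.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java analogy for compose(): a "transformer" takes a task and returns
// a decorated task. All names here are hypothetical.
final class TransformerSketch {

    // Records the start, and records the end whether the task finishes
    // normally or throws.
    static UnaryOperator<Runnable> trackStatus(List<String> log) {
        return task -> () -> {
            log.add("inProgress=true");
            try {
                task.run();
            } finally {
                log.add("inProgress=false");
            }
        };
    }

    // Shows a loading indicator before the task and hides it afterwards.
    static UnaryOperator<Runnable> showLoading(List<String> log) {
        return task -> () -> {
            log.add("show dialog");
            try {
                task.run();
            } finally {
                log.add("hide dialog");
            }
        };
    }

    // Bundles several transformers behind one reusable name.
    static UnaryOperator<Runnable> uiDefaults(List<String> log) {
        return task -> showLoading(log).apply(trackStatus(log).apply(task));
    }
}
```

Any task passed through uiDefaults picks up both behaviors without the call site spelling them out, which is the appeal of composing transformers onto an Observable chain.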

Let’s take a look at the RxUtil class that loadRxData() uses:

public class RxUtil {

    private static final String LOADING_MESSAGE = "Loading";

    public static <T> ObservableTransformer<T, T> applyUIDefaults(RxFragment rxFragment) {
        return upstream -> upstream
                .compose(RxUtil.addToCompositeDisposable(rxFragment))
                .compose(RxUtil.applySchedulers())
                .compose(RxUtil.applyRequestStatus(rxFragment))
                .compose(RxUtil.showLoadingDialog(rxFragment));
    }

    private static <T> ObservableTransformer<T, T> applyRequestStatus(RxFragment rxFragment) {
        return upstream -> upstream.doOnSubscribe(disposable -> rxFragment.setRequestInProgress(true))
                .doOnTerminate(() -> rxFragment.setRequestInProgress(false));
    }

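    // Assumed definition (not shown in the original excerpt): do the work on an
    // I/O thread and deliver results on the Android main thread.
    private static final ObservableTransformer<Object, Object> schedulersTransformer =
            upstream -> upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

    @SuppressWarnings("unchecked")
    private static <T> ObservableTransformer<T, T> applySchedulers() {
        return (ObservableTransformer<T, T>) schedulersTransformer;
    }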

    private static <T> ObservableTransformer<T, T> addToCompositeDisposable(RxFragment rxFragment) {
        return upstream -> upstream.doOnSubscribe(disposable -> rxFragment.getCompositeDisposable().add(disposable));
    }

    private static <T> ObservableTransformer<T, T> showLoadingDialog(RxFragment rxFragment) {
        return observable -> observable
                .doOnSubscribe(disposable -> DialogUtils.showProgressDialog(rxFragment.getFragmentManager(), LOADING_MESSAGE))
                .doOnTerminate(() -> DialogUtils.hideProgressDialog(rxFragment.getFragmentManager()));
    }
}

Triggering loadRxData()

Notice the applyRequestStatus method? We composed an RxJava transformer onto the Observable to manage the requestInProgress boolean over the lifetime of the request. Upon subscription, any Observable with applyRequestStatus composed onto it will call setRequestInProgress(true) on the RxFragment it was passed, and upon termination (when the Observable completes or emits an error) it will call setRequestInProgress(false). Disposing the subscription in onPause does not count as termination, so the flag is still true when the fragment’s state is saved mid-request. When the RxFragment resumes after being recreated, it uses this value to determine whether loadRxData() should be called again to resubscribe to the Observable.

@Override
public void onResume() {
    super.onResume();
    if (isRequestInProgress()) {
        loadRxData();
    }
}

Since onResume will be called in the normal Android lifecycle for the fragment, Observable subscriptions will be resubscribed if they are required. This means rotation will be correctly supported with the Observables we created and added to the Repository cache, and will play back their events.
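The whole round trip can be simulated in plain Java. The sketch below is not Android code: a Map stands in for the Bundle, FakeFragment is a hypothetical stand-in for RxFragment, and loadRxData only counts calls and sets the flag that doOnSubscribe would set.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the RxFragment bookkeeping; a Map stands in for the
// Android Bundle, and no real Android or RxJava classes are involved.
final class RotationSketch {

    static final String KEY_IN_PROGRESS = "EXTRA_RX_REQUEST_IN_PROGRESS";

    static final class FakeFragment {
        boolean requestInProgress;
        int loadCalls;

        // Mirrors onCreate: restore the flag if there is saved state.
        void onCreate(Map<String, Boolean> savedState) {
            if (savedState != null) {
                requestInProgress = savedState.getOrDefault(KEY_IN_PROGRESS, false);
            }
        }

        // Mirrors onSaveInstanceState: write the flag out before destruction.
        void onSaveInstanceState(Map<String, Boolean> outState) {
            outState.put(KEY_IN_PROGRESS, requestInProgress);
        }

        // Mirrors onResume: resubscribe only if a request was still running.
        void onResume() {
            if (requestInProgress) {
                loadRxData();
            }
        }

        void loadRxData() {
            loadCalls++;
            requestInProgress = true; // what doOnSubscribe would set
        }
    }

    // Simulates a rotation: save state from the old instance, build a new one,
    // then run the new instance through onCreate and onResume.
    static FakeFragment rotate(FakeFragment old) {
        Map<String, Boolean> state = new HashMap<>();
        old.onSaveInstanceState(state);
        FakeFragment recreated = new FakeFragment();
        recreated.onCreate(state);
        recreated.onResume();
        return recreated;
    }
}
```

Rotating an idle fragment triggers no load, while rotating one with a request in flight triggers exactly one call to loadRxData on the new instance. In the real app that call goes back through the Repository and picks up the cached Observable.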

The RxJava Repository Pattern, Understood

If you’ve followed the example and understood the Stockwatcher codebase, you have now seen an approach for allowing RxJava to work with device rotation support and data caching on Android. Now you should be free to worry less about manually dealing with the edge cases of whether a subscription has been completed or not when the fragment or activity is destroyed and recreated. By caching the Observable in the model layer and fitting the Observable subscriptions into loadRxData, we have a general-purpose solution that will fit Observables into the Android lifecycle.

In the next article, I will be showing a solution to another often needed yet strangely elusive pattern: how can I test the RxJava and Retrofit based service layer of my Android app, with mocked API responses? If you’d like to test the whole networking stack but provide canned responses from the server API instead, check back soon!

And, as always, please share your comments, insights, and thoughts about the RxJava Repository pattern. Submit pull requests and get in touch with your questions, code refinements or ideas!
