Streaming Redux state as an Observable with RxJS

November 25, 2017

Redux is a great library for managing state in your applications. Combining it with React.js also gives your application a clear structure and lets you benefit from the many other tools built and supported by the community.

I also enjoy RxJS a lot. And the journey of building FrintJS has helped me embrace reactive programming even further. In this post, I will explore how we can stream the state from a Redux store using Observables.

Let’s say we have a simple Redux store that increments and decrements a counter value.

We can start by creating our reducer first:

const INITIAL_STATE = { value: 0 };

function counterReducer(state = INITIAL_STATE, action) {
  switch (action.type) {
    case 'INCREMENT_COUNTER':
      return Object.assign({}, {
        value: state.value + 1
      });

    case 'DECREMENT_COUNTER':
      return Object.assign({}, {
        value: state.value - 1
      });

    default:
      return state;
  }
}

Now we can create a store out of it:

import { createStore } from 'redux';

const store = createStore(counterReducer);

Now that your store is ready, you can start dispatching actions to it:

store.dispatch({ type: 'INCREMENT_COUNTER' }); // 1 (+1)
store.dispatch({ type: 'INCREMENT_COUNTER' }); // 2 (+1)
store.dispatch({ type: 'DECREMENT_COUNTER' }); // 1 (-1)
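Because a reducer is a plain function, the dispatch sequence above can also be checked without a store. The sketch below folds the same three actions through the counter reducer with Array.prototype.reduce; the reducer is repeated so the snippet runs on its own, and the names actions and finalState are only illustrative.

```javascript
// The counter reducer from this post, repeated so the snippet is self-contained.
const INITIAL_STATE = { value: 0 };

function counterReducer(state = INITIAL_STATE, action) {
  switch (action.type) {
    case 'INCREMENT_COUNTER':
      return Object.assign({}, { value: state.value + 1 });
    case 'DECREMENT_COUNTER':
      return Object.assign({}, { value: state.value - 1 });
    default:
      return state;
  }
}

// The same three actions that were dispatched above.
const actions = [
  { type: 'INCREMENT_COUNTER' },
  { type: 'INCREMENT_COUNTER' },
  { type: 'DECREMENT_COUNTER' },
];

// Starting from `undefined` makes the reducer fall back to INITIAL_STATE
// for the first action, which mirrors how a store initializes itself.
const finalState = actions.reduce(counterReducer, undefined);

console.log(finalState); // { value: 1 }
```

The reducer never mutates INITIAL_STATE; every action produces a new object, which is what lets subscribers compare or stream states safely later on.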

You can start listening to state changes with a simple callback:

const unsubscribe = store.subscribe(function () {
  const currentState = store.getState(); // { value: 1 }
});

// cancel the listener when you no longer need it
unsubscribe();
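To make the callback contract concrete, here is a rough mental model of the three store methods this post relies on. createMiniStore is a hypothetical stand-in written for illustration, not Redux's actual implementation, and it skips everything else Redux does (middleware, enhancers, validation).

```javascript
// Hypothetical stand-in for a store: only getState, dispatch and subscribe.
function createMiniStore(reducer) {
  // Ask the reducer for its initial state by passing `undefined`.
  let state = reducer(undefined, { type: '@@mini/INIT' });
  let listeners = [];

  return {
    getState: function () {
      return state;
    },
    dispatch: function (action) {
      state = reducer(state, action);
      // Iterate over a copy so a listener may unsubscribe while being notified.
      listeners.slice().forEach(function (listener) {
        listener();
      });
    },
    subscribe: function (listener) {
      listeners.push(listener);
      // The returned function removes this listener again.
      return function unsubscribe() {
        listeners = listeners.filter(function (l) {
          return l !== listener;
        });
      };
    },
  };
}

function counterReducer(state = { value: 0 }, action) {
  switch (action.type) {
    case 'INCREMENT_COUNTER':
      return { value: state.value + 1 };
    case 'DECREMENT_COUNTER':
      return { value: state.value - 1 };
    default:
      return state;
  }
}

const miniStore = createMiniStore(counterReducer);
const seen = [];

const stopListening = miniStore.subscribe(function () {
  seen.push(miniStore.getState().value);
});

miniStore.dispatch({ type: 'INCREMENT_COUNTER' });
miniStore.dispatch({ type: 'INCREMENT_COUNTER' });
stopListening();
miniStore.dispatch({ type: 'DECREMENT_COUNTER' }); // listener is no longer called

console.log(seen); // [ 1, 2 ]
console.log(miniStore.getState()); // { value: 1 }
```

Note that the listener receives no arguments and is not called at subscription time; it has to call getState() itself, and only after the next dispatch.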

Listening to state changes with a simple callback fits the needs of most applications. But if you are already working with Observables, it is easier to access the Redux state as a stream, which you can then combine with other Observables as you see fit.

But how can we convert the store to a state$ stream?

It is a common convention in the community to end a variable or function name with a $ sign if it holds or returns an Observable.

Let’s create a function that accepts a Redux store and returns an Observable of its state.

import { Observable } from 'rxjs/Observable';

function getState$(store) {
  return new Observable(function (observer) {
    // more to follow...
  });
}

const state$ = getState$(store);

const subscription = state$.subscribe(function (state) {
  console.log(state);
});

We want state$ to emit new values as the Redux store changes over time. So let’s add that logic to the function:

function getState$(store) {
  return new Observable(function (observer) {
    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });
  });
}

What we did above is start listening to the Redux store for changes. Whenever there is a change, we emit a new next event with the current state of the store.

But we cannot stop here. Irrespective of when a state change occurs (via dispatching of actions), we want our state$ subscribers to receive an initial value right after they subscribe:

function getState$(store) {
  return new Observable(function (observer) {
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });
  });
}

Now the subscribers will get an initial value right away, and as more state changes happen, they will keep receiving the new values over time.

We just need to make one more addition to our function. We have to make sure that as soon as a subscriber unsubscribes from the Observable, the store listener is cancelled as well.

We can do this by returning a function, which will be treated as an unsubscribe callback:

function getState$(store) {
  return new Observable(function (observer) {
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });

    return unsubscribe;
  });
}

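Unsubscribing in RxJS looks like this. Calling subscribe() returns a Subscription, and calling its unsubscribe() method runs the function we returned above:

const subscription = getState$(store).subscribe(state => console.log(state));
subscription.unsubscribe();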

Here’s the complete function with comments. It receives the Redux store as an argument and returns the state as an Observable:

import { Observable } from 'rxjs/Observable';

function getState$(store) {
  return new Observable(function (observer) {
    // emit the current state as the first value
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      // emit on every state change
      observer.next(store.getState());
    });

    // return the function that will be called
    // when the Observable is unsubscribed
    return unsubscribe;
  });
}

While we did everything manually in this post to walk through the process of creating an Observable out of a Redux store, you can also just use Observable.from(). This works because a Redux store exposes the Observable interop point (Symbol.observable), which RxJS knows how to consume:

import { from } from 'rxjs/observable/from';

const state$ = from(store);

Thanks to Sebastian Sebald and Marcel Miranda for pointing it out in the comments section!

Operators in RxJS allow you to process your state further with ease.

You could get only the counter value (an integer) out of your state as a stream:

import { map } from 'rxjs/operators/map';

const state$ = getState$(store);

const counter$ = state$.pipe(
  map(state => state.value)
);

The pipe method was introduced in RxJS v5.5, and you can read more about lettable operators here.

You can then subscribe to counter$ only:

counter$.subscribe(n => console.log(n));

You can decide which values are emitted with filter. Let’s say you only want to emit values when the number is even:

import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';

const state$ = getState$(store);

const evenCounter$ = state$.pipe(
  map(state => state.value),
  filter(n => n % 2 === 0)
);

You can learn about more operators in their documentation here.

Redux is awesome, and has helped a big chunk of the JavaScript community to think in a functional way. RxJS is great, and is continuously evolving and helping developers embrace reactive programming with ease.

The two make a great pair, and I hope you can benefit from both!

You may also want to check out redux-observable, whose concept of “Epics” lets you access actions as a stream.

This post focused only on state as a stream.

Find me on Twitter if you have any questions.

