This is the second article in a series about Async Programming and Reactive Programming and their use in JavaScript/TypeScript. The introductory article covered Async Programming, callbacks, Promises and async/await.

In the previous article, we saw several ways of writing async code in JavaScript, along with some of their shortcomings. Thankfully, there is one more way to write async code elegantly: Reactive Programming.

A different paradigm: Reactive Programming

Reactive Programming is a paradigm that lets us express data changes in terms of data streams. Those data streams can be treated separately or combined to create a single stream. They can also be transformed using a pipeline of operations, each of which takes a value from upstream and produces a new value downstream. In that sense, Reactive Programming lets us express declaratively how to shape the data we want, given an input stream of data.

The declarative aspect of Reactive Programming is crucial in making code more readable, more understandable, more testable and ultimately more robust.

Reactive Programming also has a foundation in Functional Programming, in that it favours composing streams through “factory” operators and processing data through a pipeline of pure functions (functions with no side effects).

The suite of observable constructors and pipeline operators available also helps address the performance issues that a high rate of data can cause. We’ll see examples of some of those operators in the following code listings.

Discovering the power of the Reactive paradigm through examples

Let’s dive into Reactive Programming through examples. In the following code listings, we’ll see how to use Reactive Programming to evolve some typical old-fashioned JavaScript code.

Note: old-fashioned code often works fine, and it is not always necessary to update your code to follow the latest trends. However, you’ll find that as your application gets more complex, Reactive Programming is a useful paradigm to have in your toolset.

In JavaScript, we can execute code when clicking on a button like so:

const myButton = document.getElementById('buttonId');
myButton.addEventListener('click', (event) => console.log('Button has been clicked!'));

With RxJS, we can write it like this:

import { fromEvent } from 'rxjs';
const myButton = document.getElementById('buttonId');
fromEvent(myButton, 'click')
  .subscribe((event) => console.log('Button has been clicked!'));

This doesn’t look like much of an improvement: same amount of code, not much difference in code readability. However the power of this code is hidden. Why? Because we directly subscribed to the stream and didn’t apply any transformation to it. As we add requirements, the benefits of Reactive Programming become evident.

Combining streams

Let’s start with something simple: adding a second button and performing the same action.

Non-reactive JS:

const myButton1 = document.getElementById('buttonId1');
const myButton2 = document.getElementById('buttonId2');
const action = (event) => console.log('Button has been clicked!');
myButton1.addEventListener('click', action);
myButton2.addEventListener('click', action);

With RxJS, we can write it like this:

import { fromEvent, merge } from 'rxjs';
const myButton1 = document.getElementById('buttonId1');
const myButton2 = document.getElementById('buttonId2');
const button1Clicked$ = fromEvent(myButton1, 'click');
const button2Clicked$ = fromEvent(myButton2, 'click');
merge(button1Clicked$, button2Clicked$)
  .subscribe((event) => console.log('Button has been clicked!'));

The merge function combines several streams of data into one by merging their emissions. The subscribe method takes a callback function that gets executed every time a new value is available in the stream. By using merge, you’ll get the value from whichever source stream emitted. This is great: we can now perform a single action whenever any of several streams emits. The code is cleaner: the action performed on data emission is only written once, with no need to extract it into a separate function (as long as the function is simple enough, of course).

Using timing operators

Now something a bit more difficult: limiting the rate of execution of the action. For example, if the user clicks repeatedly on the button, we want to avoid executing the action more than once every second.

Non-reactive JS:

const myButton1 = document.getElementById('buttonId1');
const action = (event) => console.log(`${event.timeStamp} - Button has been clicked!`);
let timeout;

myButton1.addEventListener('click', (event) => {
  if (!timeout) {
    action(event);
    timeout = setTimeout(() => {
      clearTimeout(timeout);
      timeout = undefined;
    }, 1000);
  }
  // else don't do anything
});

With RxJS, we can write it like this:

import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
const myButton1 = document.getElementById('buttonId1');
const action = () => console.log('Button has been clicked!');
fromEvent(myButton1, 'click')
  .pipe(throttleTime(1000))
  .subscribe((event) => action());

In the first code listing, we couldn’t see the advantage of using Reactive Programming. In the second, it becomes clearer. Now in this one we clearly start to see the power:

  • it is short: it took a single line, .pipe(throttleTime(1000)), instead of the multiple lines of the non-reactive version
  • it is declarative: the intent of the line is easy to understand, whereas in the non-RX version the code is mixed: the callback contains both the throttling logic and the action itself
  • it is pure: there is no side effect and the pipeline only relies on its input, whereas in the non-RX version we need to define a variable outside the event listener callback’s scope and modify it inside the callback.

The pipe method initiates the ‘pipeline’ on the source observable. Each operator used inside the pipeline applies an (ideally) pure transformation to its upstream and creates a new stream as output. This output stream is the input stream of the following operator, and so on. In the above example the pipeline consists of a single operator, but in real-world scenarios there are often several.

The throttleTime operator is one of the timing operators offered by RxJS: it emits the value of the input observable and starts a timer (1000 milliseconds in our example). Any value emitted by the input observable until the timer completes will not be emitted downstream. When the timer completes, the operator is ready again to emit a value. In our example, it emits the ‘leading’ value (the emitted value that started the timer), but it can also be configured, through its config argument ({ leading: false, trailing: true }), to emit the ‘trailing’ value instead (when the timer completes, it emits the last value received during the timer).

There are other timing operators which are equally useful depending on the kind of timing operation you want to perform.

Let’s look at another example: a search input field that sends an HTTP query to retrieve results based on this input. This is often used to perform auto-complete operations or to show the results of a query as the user types. Those operations are “expensive” because they involve a query to a server. Depending on the network’s bandwidth and latency and the cost of the operation on the server side, those queries can take up to several seconds. If we were to make a query for every character typed by the user, the user experience would be bad because the user would be typing faster than the results could be queried and displayed. And while the user types at a continuous rate, we would be making a lot of useless queries and wasting server-side resources. The typical UX for this use case is to wait for the user to stop typing and then, after a short time (300 to 500 milliseconds), perform the query.

In the next example, let’s concentrate only on reading the user’s input. We’ll use a debounce delay of 1000 milliseconds for the sake of being consistent with the other examples.

const input = document.getElementById('myInputId');
const action = (text) => console.log(`the user typed: ${text}`);
let timeout;

input.addEventListener('input', (event) => {
  if (timeout) {
    // if there is a timeout, it means the callback has been executed less than 1 second ago
    // cancel the action to be performed
    clearTimeout(timeout);
    timeout = undefined;
  }
  // set the timer
  timeout = setTimeout(() => action(event.target.value), 1000);
});

With RxJS, we can write it like this:

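import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
// action is the function (text) => ... defined in the previous listing
const input = document.getElementById('myInputId');
fromEvent(input, 'input')
  .pipe(debounceTime(1000))
  .subscribe((event) => action(event.target.value));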

As with the previous listing, the reactive version is cleaner, pure (the non-RX version uses a variable defined outside the callback’s scope), more understandable and much less prone to coding errors.

The debounceTime operator is another timing operator offered by RxJS: it starts a timer (1000 milliseconds in our example) when the input observable emits a value. Any value emitted by the input observable while the timer is running will restart the timer. When the timer completes (no new value has been emitted since the value that last restarted the timer), the operator emits the last value emitted by the input observable.

We can see the difference with the throttleTime operator: with throttleTime, a value is emitted at most once every second (the time specified); with debounceTime, a value is emitted only after the input observable has been silent for a full second.
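
To make the difference concrete, here is a minimal sketch in plain JavaScript that models both behaviours on a list of timestamped events. It is a simplified model of the semantics, not the RxJS implementation: the helper names simulateThrottle and simulateDebounce and the { time, value } event shape are assumptions made for illustration, the source is assumed never to complete, and events landing exactly on a timer boundary are not modelled faithfully.

```javascript
// Hypothetical helpers: a simplified model of the operators' semantics,
// NOT the RxJS implementation. Events are { time (ms), value }, sorted by time.

// Leading-edge throttle: emit an event, then ignore everything for `duration` ms.
function simulateThrottle(events, duration) {
  const emitted = [];
  let silentUntil = -Infinity;
  for (const event of events) {
    if (event.time >= silentUntil) {
      emitted.push({ value: event.value, emittedAt: event.time });
      silentUntil = event.time + duration;
    }
  }
  return emitted;
}

// Debounce: emit an event only if no other event follows within `duration` ms.
function simulateDebounce(events, duration) {
  const emitted = [];
  events.forEach((event, index) => {
    const next = events[index + 1];
    if (next === undefined || next.time - event.time >= duration) {
      emitted.push({ value: event.value, emittedAt: event.time + duration });
    }
  });
  return emitted;
}

// The user types "abc" quickly, pauses, then types "de".
const keystrokes = [
  { time: 0, value: 'a' },
  { time: 200, value: 'ab' },
  { time: 400, value: 'abc' },
  { time: 2000, value: 'abcd' },
  { time: 2100, value: 'abcde' },
];

const throttled = simulateThrottle(keystrokes, 1000);
const debounced = simulateDebounce(keystrokes, 1000);
console.log(throttled); // 'a' at 0 ms, 'abcd' at 2000 ms
console.log(debounced); // 'abc' at 1400 ms, 'abcde' at 3100 ms
```

In this model, throttling reacts immediately but drops the text typed during the silent period, while debouncing reports the complete text but only once the typing has paused.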

When using timing operators, one has to be aware of their behavior. As we saw above, throttleTime and debounceTime don’t behave the same way. In fact, if the user types at a constant rate and never pauses for a full second, debounceTime will not emit anything! It is OK to use it for user input because we know the user will stop typing or clicking rather quickly (unless they are a maniac :)).

Now let’s say we want to take into account the case of the maniac :) The user types continuously, so we want to display intermediate results even if they don’t stop typing for a while. So we take the last value entered every second.

const input = document.getElementById('myInputId');
const action = (event) => console.log(`the user typed: ${event.target.value}`);
let timeout;
let lastEvent;

input.addEventListener('input', (event) => {
  lastEvent = event;
  
  if (!timeout) {
    timeout = setTimeout(() => {
      action(lastEvent);
      clearTimeout(timeout);
      timeout = undefined;
    }, 1000);
  }
});

With RxJS, we can write it like this:

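import { fromEvent } from 'rxjs';
import { auditTime } from 'rxjs/operators';
// action is the function (event) => ... defined in the previous listing
const input = document.getElementById('myInputId');
fromEvent(input, 'input')
  .pipe(auditTime(1000))
  .subscribe(action);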

That was easy: we just had to change one operator! Once again, this shows the power of Reactive Programming being declarative.

The auditTime operator is like throttleTime, but it emits the ‘trailing’ value (the last value emitted during the timer), whereas throttleTime emits the ‘leading’ value (the value that triggered the timer).

The fourth very useful timing operator we won’t demonstrate is sampleTime.

sampleTime is similar to auditTime in that it emits the last value received during a timer. But for sampleTime, the timer is an interval that starts, completes and restarts regardless of whether any values were emitted: it checks on a fixed schedule, continuously.
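
The distinction between the two can be sketched with a small plain JavaScript model working on timestamped events. This is only an illustration of the semantics, not the RxJS implementation: simulateAudit, simulateSample and the { time, value } event shape are hypothetical, the sampling clock is assumed to start at time 0, and the source is assumed never to complete.

```javascript
// Hypothetical helpers: a simplified model of the operators' semantics,
// NOT the RxJS implementation. Events are { time (ms), value }, sorted by time.

// Audit: the first event starts a timer; when it ends, emit the latest event seen.
function simulateAudit(events, duration) {
  const emitted = [];
  let timerEnd = null;
  let latest = null;
  for (const event of events) {
    if (timerEnd !== null && event.time >= timerEnd) {
      emitted.push({ value: latest.value, emittedAt: timerEnd });
      timerEnd = null;
    }
    if (timerEnd === null) {
      timerEnd = event.time + duration;
    }
    latest = event;
  }
  if (timerEnd !== null) {
    emitted.push({ value: latest.value, emittedAt: timerEnd });
  }
  return emitted;
}

// Sample: a clock ticks every `period` ms starting from time 0; each tick
// emits the latest event received since the previous tick, if there is one.
function simulateSample(events, period, until) {
  const emitted = [];
  for (let tick = period; tick <= until; tick += period) {
    const sinceLastTick = events.filter(
      (event) => event.time >= tick - period && event.time < tick
    );
    if (sinceLastTick.length > 0) {
      const latest = sinceLastTick[sinceLastTick.length - 1];
      emitted.push({ value: latest.value, emittedAt: tick });
    }
  }
  return emitted;
}

const typedEvents = [
  { time: 500, value: 'a' },
  { time: 700, value: 'ab' },
  { time: 2300, value: 'abc' },
];

const audited = simulateAudit(typedEvents, 1000);
const sampled = simulateSample(typedEvents, 1000, 4000);
console.log(audited); // 'ab' at 1500 ms, 'abc' at 3300 ms
console.log(sampled); // 'ab' at 1000 ms, 'abc' at 3000 ms
```

Both models emit the same values here, but at different moments: the audit timer is started by the first value it sees, whereas the sampling clock ticks on its own schedule.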

Advantages of Reactive Programming

Throughout the previous examples, we have been able to see some of the benefits of Reactive Programming.

Declarative

In the timing examples above, we have seen one major benefit of Reactive Programming: it is declarative rather than imperative. We describe what we want to achieve rather than how we want to achieve it. This makes the code less error-prone and more readable. The pipeline operators have dedicated functionalities, so just by looking at an operator’s name we understand what it will do. Compare this with a regular callback (or the callback passed to a Promise’s then() method), where all the logic is mixed together: the timing management and the business logic. With RxJS operators, one operator takes care of the timing management and the next operator deals with the data: the business logic is no longer mixed with the rest, and you can concentrate on the code that adds real value.

Composability

In Functional Programming, composability refers to the capability of creating a complex function by composing several simpler functions. For example, if we want a mathematical function such as f(x)=2*x+3, we could write a single function:

const f = (x) => 2*x + 3;

or we could decompose it into simpler functions that we then combine:

const mult = (a) => (b) => a * b;
const add = (a) => (b) => a + b;
const multBy2 = mult(2);
const add3 = add(3);
const f = (x) => add3(multBy2(x));
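
The manual nesting add3(multBy2(x)) can also be handled by a small generic helper. The following is a minimal sketch: composeLeftToRight is a hypothetical name introduced for illustration, not a function from any library.

```javascript
const mult = (a) => (b) => a * b;
const add = (a) => (b) => a + b;

// Hypothetical helper: chains unary functions left to right,
// feeding each function's output into the next one.
const composeLeftToRight = (...fns) => (input) =>
  fns.reduce((value, fn) => fn(value), input);

const f = composeLeftToRight(mult(2), add(3));

console.log(f(5)); // 13
```

Reading the arguments from left to right gives the order in which the transformations are applied, which is the same shape as a pipeline of operators.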

In the case of a stream of data, we could do everything in the subscribe() method, but it’s best to compose operators to transform the stream one operation at a time in the pipeline. In the end, the output stream is a composition of all the pipeline’s operations applied to the input stream:

import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// not using composition
from(dataArray)
  .subscribe(x => {
    if (filteringCondition(x)) {
      const result1 = transformation1(x);
      if (filteringCondition1(result1)) {
        const result = transformation2(result1);
        // do something with result
        // ...
      }
    }
  });

// using composition
from(dataArray)
  .pipe(
    filter(filteringCondition),
    map(transformation1),
    filter(filteringCondition1),
    map(transformation2),
  )
  .subscribe(result => {
    // do something with result
  });

Referential transparency

Referential transparency is enabled by pure functions. A pure function is a function that has no externally visible side effect: its output depends solely on its input and it doesn’t modify the outside world. Therefore, given an input, you can take out the function call and replace it with its output without affecting the program: this is Referential Transparency. If the function had side effects, you couldn’t do that, because your program would depend on the side effects performed within the function, and it would fail or misbehave if you took out the function call and replaced it with its output.
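
As a small illustration, here is a sketch contrasting a pure function with an impure one. The names double, doubleAndCount and callCount are made up for this example.

```javascript
// Pure: the result depends only on the argument.
const double = (x) => x * 2;

// Impure: also mutates a variable that lives outside the function.
let callCount = 0;
const doubleAndCount = (x) => {
  callCount += 1;
  return x * 2;
};

// Replacing each double(4) call with 8 changes nothing observable.
const pureTotal = double(4) + double(4);
console.log(pureTotal); // 16

// Replacing each doubleAndCount(4) call with 8 would give the same total,
// but callCount would stay at 0 instead of reaching 2.
const impureTotal = doubleAndCount(4) + doubleAndCount(4);
console.log(impureTotal, callCount); // 16 2
```

Any code that later reads callCount depends on those calls having actually happened, which is why the impure version is not referentially transparent.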

Reactive Programming focuses on applying pure functions (with no side effects) throughout the pipeline (to be exact, it doesn’t enforce this: you are free to perform side effects, but it’s best not to, or at least to limit them as much as possible). The side effects are concentrated in the subscribe() method and in the pipeline operator tap(), which is dedicated to performing side effects in the pipeline (it’s not always possible to have an entirely pure pipeline, and tap() is also useful for debugging). This makes it easier to reason about complex stream transformation steps, and to test and isolate side effects.

Immutability

The pipeline operators create a new stream. They apply a transformation on the upstream’s values they receive and push this new value downstream, but they don’t modify the upstream: one subscriber cannot modify the upstream data and affect another subscriber.
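
The idea can be sketched with a toy model in plain JavaScript. This is deliberately not RxJS: here a "stream" is assumed to be just a function that pushes values to an observer callback, and fromArray, mapValues and collect are hypothetical helpers written for this illustration.

```javascript
// Toy model, NOT RxJS: a "stream" is a function that pushes each value to an observer.
const fromArray = (values) => (observer) => values.forEach((v) => observer(v));

// A toy operator: takes a source stream and returns a NEW stream.
// The source itself is never modified.
const mapValues = (transform) => (source) => (observer) =>
  source((value) => observer(transform(value)));

// Subscribes to a stream and collects everything it emits into an array.
const collect = (stream) => {
  const received = [];
  stream((value) => received.push(value));
  return received;
};

const numbers = fromArray([1, 2, 3]);
const doubledStream = mapValues((x) => x * 2)(numbers);
const incrementedStream = mapValues((x) => x + 1)(numbers);

console.log(collect(doubledStream)); // [2, 4, 6]
console.log(collect(incrementedStream)); // [2, 3, 4]
console.log(collect(numbers)); // [1, 2, 3]
```

Each derived stream sees the original values, whatever the other one does with them, and subscribing to the source directly still yields the untouched data.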

Big collection of operators

The Reactive libraries (RxJS in JavaScript) come with a big collection of operators already implemented for you. Pretty much all your needs are already catered for:

  • stream creation from different sources such as data, DOM events, Promises: of, from, fromEvent, interval
  • combining streams by different strategies: merge, zip, combineLatest, withLatestFrom, forkJoin
  • filtering: filter
  • transforming data: map, pluck
  • timing operations: debounce, throttle, audit, sample, delay
  • dealing with inner streams: switchMap, mergeMap, concatMap
  • and much more.

And if you find something missing, you can easily implement your own operator: it’s just a function. (Pipeable operators and the pipe() method were introduced in RxJS 5.5 and became the standard style in RxJS 6. Earlier versions used a chaining style, where operators were methods on the Observable prototype; RxJS 6 was a big refactoring towards a more functional approach.)