Qu'est ce que RxJS

Rxjs (ReactiveX for Javascript) est une librairie javascript qui décrit le concept de programation asynchrone avec des courants observables. C'est à dire des sources de données capables d'émettre de manière aléatoire, continue, synchrone ou asynchrone. Avec RxJS vous pouvez étudier par exemple la position du curseur sur l'écran, le passage d'une voiture à un carrefour ou la position d'une action à la bourse. Rxjs nous offre une API pour créer ces sources, les observer et de manipuler leurs valeurs. Dans cet article je vais vous expliquer les notions de Observable, Observer, Subscription et Operator definies dans RxJS.

Le concept de Pull et Push System

Avant d'aborder les concepts de Observer Pattern, de Iterator Pattern et de la programmation fonctionnelle avec les collections qui sont au coeur de RxJS, je vais vous montrer d'abord les deux manières de communication entre un producteur de données (Producer) et un consommateur de données (Consumer).
Prenons l'exemple de la fonction suivante:

function getFive() {return 5;}

Cette fonction ne fait rien tant qu'on ne l'appelle pas. Elle (Producer) produit la valeur 5 à chaque appel. Elle ne sait même pas quand est ce qu' elle sera appelée. Seul l'appelant (Consumer) décide de l'appeler. Dans ce cas on dit qu'on est dans un Pull System. C'est à dire c'est le consumer qui décide quand recevoir les données. En plus dans le cas d'une fonction, chaque appel produit une donnée. Dans un Push System au contraire c'est le Producer qui décide d'envoyer les données au Consumer. Le Consumer se met à l'écoute et attend les données. Il ne sait pas quand celles-ci viendront et n'a aucun contrôle sur leur arrivée. C'est le cas par exemple d'un promise et d'un Observable. Un Promise return une seule valeur avec qu'on Observable peut émettre plusieurs valeurs.

var promise1 = new Promise(function(resolve, reject) {
  setTimeout(function() {
    resolve('foo');
  }, 300);
});

promise1.then(function(value) {
  console.log(value);
  // expected output: "foo"
});

Le tableau ci-dessous montre le système d'une function, d'un promise, d'un Iterator et d'un Observable

Qu'est ce qu'un Observable

Un Observable est un Push System dans lequel le Producteur produit plusieurs valeurs que le consumer qu'on appelle Observer peut recevoir en souscrivant. Un Observable nofitie des évènements à son Observer. Considérons l'Observable ci-dessous:

var myObservable = Rx.Observable.create(function (observer) {
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300);
  }, 1000);
});

Cet Observable notifie l'arrivée des données 42, 100, 200 de manière synchrone et 300 de manière asynchrone. Un Observable peut produire 3 types de notifications:

  1. next: Pour notifier l'arrivée d'une valeur
  2. complete: Pour notifier la fin
  3. error: Pour notifier une erreur

Un Observable peut notifier autant de nextqu'il veut mais seulement un seul completeou error. L'Observable ci-dessous produits 3 notifications next et 1 notification complete. En cas d'erreur , il va produit 1 notification error

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err);
  }
});

Pour recevoir les notifications de données, nous devons nous souscrire à la méthode subscribede l'Observable. Après la souscription, nous allons commencer par recevoir les valeurs émises par notre Observable.

myObservable.subscribe(function (x) {
  console.log(x);
});
1
2
3

Normalement la méthode subscribeprend en paramètre un Observer qui représente le consumer.

Qu'est ce qu'un Observer

Un Observer est un objet capable d'écouter les notifications d'un Observable. C'est un objet contenant 3 callbacks next, error et complete.

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

Pour souscrire à un Observable on écrit:

observable.subscribe(observer);

En géréral on passe un callback à la méthode subscribe comme ci-dessous:

observable.subscribe(x => console.log(x));

Ce callback représente la notification next par défaut. Vous pouvez aussi passer les 3 callbacks comme paramètre à la méthode subscribe

observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

Qu'est ce qu'un Operator

Un Operator est une methode qui permet de manipuler les valeurs d'un Observable et qui retourne un autre Observable sans modifier l'Observable de départ.

var inputObservable = Rx.Observable.from([1, 2, 3, 4]);
inputObservable.map(x=> 10*x).susbcribe(x=> console.log(x));

Dans le code ci-dessus, nous avons utilisé 2 operators: fromet map.
L'opérateur from permet de créer un Observable à partir d'un tableau. L'opérateur map permet d'effectuer une action sur chaque valeur de l'Observable. Dans notre example, l'opérateur map multiplie chaque valeur par 10 sans modifier l'Observable de départ.
.
RxJS définit plusieurs catégories d'operateur : creation, transformation, filtering, combination, multicasting, error handling, utility, etc.
Visitez rxjs pour plus détails sur l'ensemble des opérateur.

Qu'est ce qu'une Subscription

Une Subscriptionest un objet qui représente la resource d'exécution d'un observable. Une Subscription permet de se désinscrire d'un Observable qu'on écoute.

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();

Conclusion

RxJS est une librairie Javascript qui met à disposition une API pour la création et la gestion des Observables. Elle définit les notions de Observable, Observer, Operator et Subscription que j'espère avoir bien expliquées.