L'architecture des applications Open Source The Architecture of
Open Source Applications

Amy Brown and Greg Wilson (eds.)
ISBN 978-1-257-63801-7
License / Acheter / News / Contribuer / FAQ

Chapitre 15. Riak et Erlang/OTP

Francesco Cesarini, Andy Gross, and Justin Sheehy

Riak est une base de données open source, distribuée et tolérante aux pannes qui illustre comment construire des sytèmes à forte charge en utilisant Erlang/OTP. Grâce en grande partie aux capacités de Erlang pour les systèmes massivement scalables, Riak offre des fonctionnalités peu communes aux bases de données comme la haute-disponibilité et ce avec une scabilité linéaire, à la fois en termes de capacité et de débit.

Le couple Erlang/OTP fournit une plate-forme idéale pour développer des systèmes comme Riak car il fournit nativement une communication distribuée inter-noeuds, des queues de messages, des détecteurs de panne, et une abstraction des systèmes client-server. De plus, les patterns les plus utilisés en Erlang ont été implementés en librairies, communément appelés "OTP Behaviors". Ils contiennent un code générique pour gérer la concurrence et la gestion d'erreurs, simplifiant alors la programmation concurrente et évitant ainsi au développeur de tomber dans les pièges inhérents à ce type de programmation. Les "Behaviors" sont surveillés par des superviseurs ("supervisors"), eux-mêmes des "Behaviors", regroupés au sein d'arbres de supervision. Un arbre de supervision est packagé dans une application, étant alors un bloc d'un programme Erlang.

Un système Erlang complet comme Riak est un ensemble d'applications faiblement couplées qui interagissent entre elles. Certaines applications sont écrites par le développeur, certaines sont fournies en standard dans la distribution Erlang/OTP, et d'autres peuvent être des composants open source. Elles sont chargées séquentiellement et démarrées par un script de démarrage généré à partir de la liste des applications et de leurs versions.

Ce qui diffère parmi les systêms sont les applications intégrées à la release qui sont démarrées. Dans la distribution standard de Erlang, les fichiers de démarrage démarrent les applications Kernel and StdLib (Standard Library). Suivant l'installation, l'application SASL (Systems Architecture Support Library) est également démarrée. SASL contient des outils de release et de mise à jour, ainsi que des fonctionnalités de logging. Riak se comporte ainsi, en démarrant les applications spécifiques à Riak ainsi que leurs dépendances, qui incluent Kernel, StdLib et SASL. Un build complet et prêt à l'emploi de Riak embarque ces éléments standards de la distribution Erlang/OTP et les démarre à l'unisson quand la commande riak start est invoqué en ligne de commande. Riak consiste en plusieurs applications complexes, et ce chapitre ne doit pas être considéré comme un guide complet. Mais comme une introduction à OTP où des exemples du code source de Riak sont utilisés. Les figures et exemples ont été simplifés pour les besoins de la démonstration.

15.1. Une introduction rapide à Erlang

Erlang est un langage de programmation fonctionnelle et condurrente qui compile en byte code et s'exécute dans une machine virtuelle. Les programmes consistent en des fonctions qui s'appellent les unes les autres, résultant souvent dans des effets de bord comme des transmissions de messages inter-processus, entrées/sorties ou opérations sur une base de données. Les variables en Erlang ne sont assignables qu'une fois, i.e., une fois leur valeur affectée, elles ne peuvent plus être modifiées. Le langage utilise intensivement le "pattern matching", comme dans l'exemple suivant de calcul de nombre factoriel :

-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
Prev = fac(N-1),
N*Prev.

Dans cet exemple, la première clause définit le factoriel de 0, la seconde le factoriel des nombres positifs. Le corps de chaque clause est une séquence d'expressions, et la dernière expression est le résultat de cette clause. Appeler cette fonction avec un nombre négatif provoquera une runtime error, étant donné qu'aucune clause ne correspond. Ne pas gérer ce cas est un exemple de programmation "non-défensive", pratique encouragée en Erlang.

Dans le module, les fonctions sont appelées de manière habituelle; en dehors, le nom du module doit être mis en préfixe, comme factorial:fac(3). Il est possible de définir des fonctions avec le même nom mais avec un nombre d'arguments différents—appelé arité. Dans la directive d'export du module factorial, la fonction fac d'arité 1 est déclarée par fac/1.

Erlang supporte les tuples (aussi appelés "product types") et les listes. Les tuples sont entourés d'accolades, ainsi {ok,37}. Les éléments des tuples sont accédés par leur position. Les records sont un autre type de données; ils nous permettent de stocker un nombre d'éléments figé qui sont accédés et manipulés par nom. On définit un record de cette manière : -record(state, {id, msg_list=[]}). Pour en créer une instance, on utilise l'expression Var = #state{id=1}, et l'on récupère son contenu en utilisant : Var#state.id. Pour un nombre variable d'éléments, on utilise des listes définies entre crochets : {[}23,34{]}. La notation {[}X|Xs{]} indique une liste non vide avec un début ("head") X et une fin ("tail") tail Xs. Les identifiants commençant par une lettre minuscule indique des atoms, qui n'ont pas de valeur; l'élément ok dans le tuple {ok,37} est un exemple d'un atom. Les atoms sont souvent utilisés pour différencier les retours d'une fonction : comme les résultats ok, il pourrait y avoir des résultats sous la forme {error, "Error String"}.

Les processus dans les systèmes Erlang s'exécutent de manière concurrente dans un espace mémoire dédié, et communiquent entre eux par transmission de messages. Les processus peuvent être utilisés pour une grande variété d'applications, de passerelles à des bases de données, ou comme gestionnaires de protocoles, ou encore pour gérer les traces de logs d'autres processus. Quoique ces processus gère différents types de requêtes, il existe des similitudes dans la manière de traicer ces requêtes.

Comme les processus n'existent que dans la machine virtuelle (VM), une seule VM peut exécuter simultanément des millions de processus, une capacité que Riak exploite intensivement. Par exemple, chaque requête à la base de données —lectures, écritures, et suppressions— est modèlisée comme un processus dédié, approche qui n'aurait pas été possible avec la plupart des implémentations de threading au niveau OS.

Les processus sont identifiés par des identifiants de processus, appelés PIDs, mais ils peuvent également être enregistrés sous un alias; les alias ne doivent être utilisés que pour des processus "statiques" à longue durée de vie. Enregistrer un processus sous son propre alias permet aux autres processus de lui transmettre des messages sans même connaître son PID. Les processus sont créés en utilisant la fonction native —built-in function (BIF)— spawn(Module, Function, Arguments). Les BIFs sont des fonctions intégrées à la VM afin de réaliser ce qui serait impossible ou trop lent à exécuter en "pur Erlang". La BIF spawn/3 prend comme paramètres un Module, une Function et une liste d'Arguments. Cet appel retourne le PID du processus fraichement créé ("spawned") et comme effet de bord, crée un nouveau processus qui démarre en exécutant cette fonction du module avec les arguments décrits ci-dessus.

Un message Msg est envoyé à un processus de PID : Pid en utilisant Pid ! Msg. Un processus peut déterminer son propre PID en appelant la BIF self, et peut alors le transmettre aux autres processus afin qu'ils puissent communiquer avec lui. Supposons qu'un processus s'attende à recevoir des messages de la forme {ok, N} et {error, Reason}. Pour les traiter il utilise le code suivant :

receive
{ok, N} ->
N+1;
{error, _} ->
0
end

Le résultat est un nombre déterminé par la clause de pattern-matching. Quand la valeur d'une variable n'est pas nécessaire au pattern matching, le caractère "underscore" peut être utilisé comme joker, ainsi que montré ci-dessus.

La transmission de message entre les processus est asynchrone, et les messages reçus par un processus sont placés dans la mailbox du processus et dans leur ordre d'arrivée. Supposons à présent que l'expression receive ci-dessus est exécutée: si le premier élément dans la mailbox est soit {ok, N} soit {error, Reason}, le résultat correspondant sera retourné. Si le premier message dans la mailbox n'est pas sous cette forme, celui-ci sera conservé dans la mailbox et le second sera traité de la même manière. Si aucun message ne correspond, le bloc receive attendra jusqu'à la réception d'un message adéquat.

Les processus se terminent pour 2 raisons. Si il n'y a plus de code à exécuter, on dit qu'ils se terminent pour une raison normale. Si un processus rencontre une erreur au run-time, on considère qu'il se termine pour une raison anormale. Un processus se terminant n'affectera pas les autres processus à moins qu'ils ne soient liés ensemble. Les processus peuvent se lier entre eux par l'appel de la fonction BIF link(Pid) ou en appelant spawn_link(Module, Function, Arguments). Si un processus se termine, il envoie un signal EXIT aux processus qui lui sont liés. Si la cause de l'arrêt est anormale, le processus s'arrête lui-même, propageant le signal EXIT. En appelant la fonction BIF process_flag(trap_exit, true), les processus peuvent recevoir les signaux EXIT comme des messages Erlang dans leur mailbox et les traiter au lieu de se terminer.

Riak utilise les signaux EXIT pour monitorer la santé des processus réalisant un travail non-critique, initiés par les machines à état fini gérant les requêtes. Quand ces processus se terminent anormalement, le signal EXIT permet de soit ignorer l'erreur, soit de redémarrer le processus.

15.2. Squelettes de Processus

Nous avons introduit précédemment la notion que les processus suivent un pattern commun indépendamment du but pour lequel les processus sont créés. Pour démarrer, un processus doit être "spawned" et, optionnellement, avoir son alias enregistré. La première action du processus fraîchement créé est d'initialiser les données de la boucle du processus. Ces données résultent souvent des arguments fournis à la fonction BIF spawn lors de l'initialisation du processus. Ses données sont stockés dans une variable que l'on considère comme l'état du processus. L'état, souvent stocké dans un record, est passé à une fonction receive-evaluate, exécutant la boucle qui reçoit un message, le traite, met à jour l'état, et le retourne comme argument à un appel "tail-recursive". Si un des messages qu'il traite est un 'stop' message, le processus effectuera le nettoyage adéquat et se terminera.

C'est un thème récurrent des processus et ce, indépendamment de la tâche qui leur est assignée. Ayant cela en tête, regardons les différences entre les processus qui correspondent à ce pattern:

Ainsi, même si un squelette des actions génériques existent, ces actions sont complétées par celles spécifique directement liés aux tâches assignées à chaque processus. En utilisant ce squelette comme un template, les programmeurs peuvent créer des processus Erlang qui agissent en tant que serveurs, machines à état fini , gestionnaires d'évènements et superviseurs. Mais au lieu de réimplémenter ces patterns chaque fois, ceux-ci ont été développés en tant que modules et dénommés "behaviors". Ils font partie de la plate-forme middleware OTP.

15.3. Comportements OTP : "Behaviors"

La core team de développeurs de Riak est réparti sur à peu près une douzaine de zones géographiques. Sans une coordination très fine et des templates à partir desquels travailler, le résultat serait constitué de diverses implémentations client/server différentes ne gérant pas les cas spéciaux marginaux et les erreurs liées à la concurrence. Il n'y aurait probablement pas de manière uniforme de gérer les incidents client et serveur or de garantir que la réponse à une requête soit bien une réponse, et non seulement un message se conformant au protocole des messages internes.

OTP est un ensemble de librairies Erlang et de principes de design fournissant des outils prêt à l'emploi avec lequels développer des systèmes robustes. La plupart de ces patterns et librairies sont fournis sous la forme de "behaviors."

Les "behaviors" OTP adressent ces problèmes en fournissant des modules qui implémentent les patterns de concurrence les plus communs. Sous la couverture, sans que le programmeur ait à en être conscient, les modules assurent que les erreurs et les cas spéciaux soit gérés d'une manière fiable. En résulte que les "behaviors" OTP fournissent un ensemble de bloc standardisés pour le design et la construction de systèmes de qualité industriel.

15.3.1. Introduction

Les behaviors OTP sont founis comme des modules dans l'application stdlib qui fait partie de la distribution Erlang/OTP. Le code spécifique, écrit par le programmeur, est placé dans un module séparé et invoqué à travers un ensemble de fonctions de callback prédéfinies pour chacun des behaviors. Ce "callback module" contiendra tout le code spécifique requis pour réaliser la fonctionnalité désirée.

Les behaviors OTP incluent les processus "workers", qui font réellement les calculs, et les superviseurs, dont la tâche est de monitorer les workers et les autres superviseurs. Les behaviors "worker", souvent représenté schématiquement comme des cercles, incluent les serveurs, les gestionnaires d'évènements et les machines à état fini. Les superviseurs, représentés comme des rectangles, surveillent leurs enfants, à la fois des workers et des superviseurs, créant ainsi un arbre de supervision.

[OTP Riak Supervision Tree]

Figure 15.1: OTP Riak Supervision Tree

Les arbres de supervision sont regroupés dans un behavior appelé une application. Les applications OTP ne sont pas seulement les blocs fondateurs  des systèmes Erlang, mais sont aussi une manière de packager des composants réutilisables. Des systèmes de niveau industriel tel que Riak sont composés d'un ensemble d'applications faiblement couplées et potentiellement distribuées. Certaines de ces applications font partie de la distribution standard de Erlang et d'autres sont des parties réalisant spécifiquement les fonctionnalités de Riak.

Des exemples d'applications OTP incluent Corba ORB ou l'agent SNMP (Simple Network Management Protocol). Une application OTP est un composant réutilisable qui regroupent des modules avec des superviseur et des processus workers. A compter d'ici, quand le terme application sera utilisé, celui-ci signifiera une application OTP.

Les modules behavior contiennent tout le code générique pour chacun des type de behavior. Bien qu'il soit possible d'implémenter son propre module behavior, il est très rare d'avoir à le faire, étant donné que ceux fournis par la distribution Erlang/OTP vont correspondre à la plupart des patterns que vous utiliserez dans votre code. La fonctionnalité générique fournie dans un module behavior inclut les opérations suivantes :

Les données de la boucle sont une variable qui contiendra les données dont le behavior aura besoin entre les invocations. Après l'invocation, une version mise à jour des données est retournée. Ces données mises à jour, souvent mentionnées comme les nouvelles données de la boucle, seront passées comme argument de la prochaine invocation. Les données de la boucle sont souvent appelées l'état du behavior.

La fonctionnalité à inclure dans le module d'une application de type serveur générique pour remplir le behavior spécifique inclue :

15.3.2. Serveurs génériques

Les serveurs génériques qui  implémentent les behaviors client/server sont définis dans le behavior gen_server qui fait partie de l'application stdlib (Standard Library). Pour expliquer les serveurs génériques, nous utiliserons le module riak_core_node_watcher.erl de l'application riak_core. C'est un serveur qui traque et rapporte la disponibilité des sous-services et noeuds dans un cluster Riak. Les en-têtes et directives du module sont les suivantes :

-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
bcast_mod={gen_server, abcast}}).

On peut facilement reconnaître un serveur générique par l'utilisation de la directive -behavior(gen_server). Cette directive est utilisé par le compilateur pour s'assurer que toutes les fonctions de callback sont correctement exportées. Le record "state" sera utilisé pour les données de boucle du serveur.

15.3.3. Démarrage du serveur

Avec un behavior gen_server, au lieu d'utiliser les fonctions BIFs spawn et spawn_link, vous utiliserez les fonctions gen_server:start et gen_server:start_link. La différence principale entre spawn et start est la nature synchrone de leur invocation. Utiliser start au lieu de spawn rend le démarrage du processus worker plus déterministe et évite des problèmes de concurrence ("race conditions") imprévus, étant que l'invocation ne retournera pas le PID du processus worker jusqu'à ce qu'il ait été initialisé. Vous pouvez appeler cette fonction selon l'une des signatures suivantes :

gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)

ServerName est un tuple sous la forme {local, Name} ou {global, Name}, précisant alors que Name est un alias global ou local du processus s'il doit être enregistré. Les noms globaux permettent aux serveurs d'être accédés de manière transparent à travers un cluster noeuds Erlang distribués. Si vous ne voulez pas pas enregistrer le processus et plutôt le référencer en utilisant son PID, vous devez omettre l'argument correspond en utilisant à la place la fonction start_link/3 ou start/3. CallbackModule est le nom du module dans lequel les fonctions de rappel spécifiques sont placées, Arguments est un "term" Erlang valide qui est passé à la fonction de rappel init/1, tandis que Options est une liste qui vous permet de spécifiquer des options de gestion de la maémoire tels que fullsweep_after et heapsize, mais également des options de tracing et debug.

Dans notre exemple, nous appelons start_link/4, enregistrant le processus avec le même nom que le module de callback, par l'utilisation de la macro ?MODULE. Cette macro est traduite par le nom du module par le pré-processeur lors de la compilation du code. Nommer votre behavior avec un alias qui est le même que le module où il est implémenté est toujours une bonne pratique. Nous ne passons pas d'arguments en fournissant simplement une liste vide. La liste d'option est également vide :

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

La différence évidente entre les fonctions start_link et start est que start_link lie le processus à son parent, le plus souvent un superviseur, tandis que start ne le fait pas. Cela mérite une indication car c'est de la responsabilité du behavior OTP behavior que se lier au superviseur. Les fonctions start sont souvent utilisées lors de tests de behavior depuis le shell, et ainsi une erreur de typage provoquant le crash du processus du shell n'affectera pas le behavior. Toutes les formes de fonctions start et start_link retournent {ok, Pid}.

Les fonctions start et start_link vont créer un nouveau processus qui appelera la fonction de callback init(Arguments) du CallbackModule, avec les Arguments fournis. La fonction init doit initialiser les LoopData du serveur et doit retourner un tuple sous la forme {ok, LoopData}. LoopData contient la première instance des données de boucle  qui seront transmises entre les fonctions de callback. Si vous voulez stocker certains des arguments que vous avez transmis à la fonction init, vous pouvez le faire dans la variable LoopData. LoopData dans le serveur surveillant les noeuds Riak est le résultat de l'appel à la fonction schedule_broadcast/1 avec un record de type state dont les champs portent les valeurs par défaut :

init([]) ->

%% Watch for node up/down events
net_kernel:monitor_nodes(true),

%% Setup ETS table to track node status
ets:new(?MODULE, [protected, named_table]),

{ok, schedule_broadcast(#state{})}.

Bien que le processus superviseur puisse appeler la fonction start_link/4, un processus différent appelle la fonction init/1 : celui qui vient juste d'être créé. Comme le but de ce serveur est de relever, enregistrer, et communiquer la disponibilité des sous-services de Riak, l'initialisation demande au runtime Erlang de le notifier de tels évènements, et consistue une table pour y stocker ces informations. Ceci nécessite d'être réaliser dès l'initialisation, étant donné que des appels au serveur échoueront si cette structure n'existe pas encore. Ne faites que le nécessaire et minimiser les opérations dans votre fonction init, puisque l'invocation d'init est effectué en synchrone ce qui empêche alors tous les autres processus de démarrer jusqu'au retour de la fonction.

15.3.4. Transmission de messages

Si vous voulez envoyer un message synchrone message à votre serveur, vous utiliserez la fonction gen_server:call/2. Les appels asynchrones sont réalisés en utilisant la fonction gen_server:cast/2. Prenons deux fonctions de l'API service de Riak; nous founirons le reste du code plus tard. Elles sont appelées par des processus clients et résultent en l'envoi d'un message synchrone au processus serveur enregistré avec le même nom que le module. Notez que la validation des données envoyées au serveur devrait être effectuée côté client. Si le client envoie des informations incorrectes, le serveur s'arrêtera.

service_up(Id, Pid) ->
gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
gen_server:call(?MODULE, {service_down, Id}).

Dès la réception des messages, le processus gen_server appelle la fonction de callback handle_call/3 traitant les messages dans le même ordre qu'ils ont été envoyés :

handle_call({service_up, Id, Pid}, _From, State) ->
%% Update the set of active services locally
Services = ordsets:add_element(Id, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(Id),

%% Setup a monitor for the Pid representing this service
Mref = erlang:monitor(process, Pid),
erlang:put(Mref, Id),
erlang:put(Id, Mref),

%% Update our local ETS table and broadcast
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
%% Update the set of active services locally
Services = ordsets:del_element(Id, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(Id),

%% Update local ETS table and broadcast
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};

Notez la valeur de retour de la fonction de callback. Le tuple contient l'atom de contrôle reply, indiquant au code générique du gen_server que le second élément du tuple (qui dans les deux cas est l'atom ok) est la réponse revnoyée au client. Le troisième élément du tuple est le nouveau State, qui, dans une nouvelle itération du serveur, est passée comme troisième argument à la fonction handle_call/3; dans les deux cas présents l'état est mis à jour pour refléter le nouvel ensemble de services disponibles. L'argument _From est un tuple contenant un message unique de référence et le PID du processus client. Le tuple en entier est utilisé dans les fonctions de la librairie dont nous ne discuterons pas dans ce chapitre. Dans la majorité des cas, vous n'en aurez pas besoin.

Le module gen_server contient des mécanismes et des gardes-fous natifs qui s'exécutent sous la surface. Si votre client envoie un message synchrone à votre serveur et que vous n'obtenez pas de réponse dans les cinq secondes, le processus exécutant la fonction call/2 est terminé. Vous pouvez surcharger cela en utilisant gen_server:call(Name, Message, Timeout)Timeout est une valeur en millisecondes ou l'atom infinity.

A l'origine, le mécanisme de timeout fut mis en place pour se prémunir des deadlocks, en s'assurant que des serveurs qui s'appelleraient accidentellement les uns les autres, soient terminés après un timeout par défaut. Le rapport de crash est alors tracé, ce qui permet de debugger et fixer cette erreur. La plupart des applications fonctionneront correctement avec un timeout de cinq secondes, mais sous de très fortes charges, vous pourriez avoir à tuner finement cette valeur, et peut-être même à utiliser infinity; ce choix est dépendant de l'application. Tout le code critique dans Erlang/OTP utilise infinity. Dans le code de Riak, selon les endroits, des valeurs différentes sont utilisés pour le timeout : infinity est fréquemment utilisé entre des composants internes couplés, tandis que le Timeout, fixé par un paramètre utilisateur, est utilisé dans des cas où l'application cliente communiquant avec Riak a spécifié que l'opération est autorisé à expirer.

Lors de l'utilisation de la fonction gen_server:call/2, d'autres gardes-fous gèrent le cas où un message est envoyé à un serveur inexistant et le cas où un serveur crashe avant d'avoir renvoyé sa réponse. Dans les 2 cas, le processus appelant s'arrêtera. En Erlang "pur", envoyer un message qui n'est jamais traité (pattern-matching) dans la clause receive est un bug qui peut provoquer une fuite mémoire. Deux stratégies distinctes sont utilisées par Riak pour pallier à cela, chacune  emploie les clauses de matching "catchall". Aux endroits où le message peut être envoyé par un utilisateur, un message non reconnu pourra être silencieusement oublié. Là où un tel message ne peut venir que des composants internes à Riak, ce cas est considéré comme bug, ce qui déclenchera alors un rapport de crash interne pour alerter de l'erreur, et un redémarrage du processus worker qui l'avait reçu.

Transmettre des messages asynchrones fonctionne de la même manière. Les messages sont envoyés de manière asynchrone au serveur générique et gérés par la fonction de callback handle_cast/2. La fonction doit retourner un tuple de la forme {reply, NewState}. Les appels asynchrones sont utilisés lorsque la réponse du serveur n'est pas nécessaire et qu'il n'y a que peu de risque à produire plus de messages que le serveur ne peut consommer. Dans les cas où la réponse n'est pas nécessaire mais que nous souhaitons attendre jusqu'à ce que le message ait été géré avant de retransmettre une nouvelle requête, on utilisera alors la fonction gen_server:call/2, qui renvoie l'atom ok en retour. Imaginez un processus qui génère des entrées de base de données à un taux plus élevé que celui que Riak peut consommer. En utilisant des appels asynchrones, nous risquons de remplir la mailbox du processus et donc de consommer toute la mémoire du noeud Erlang. Afin de réguler la charge, Riak utilise les propriétés de sérialisation de messages des appels synchrones des gen_server, qui ne traitent la prochaine requête que lorsque la précédente a été complètement traitée. Cette approche évite d'ajouter un code de gestion de congestion : an supplément pour pour permettre la concurrence, les processus gen_server peuvent être utiliser pour introduire des points de sérialisation.

15.3.5. Arrêt du serveur

Comment arrêter le serveur ? Dans les fonctions de callback handle_call/3 et handle_cast/2, au lieu de retourner {reply, Reply, NewState} ou {noreply, NewState}, vous pouvez retourner respectivement {stop, Reason, Reply, NewState} ou {stop, Reason, NewState}. Quelque chose doit déclencher cette valeur de retour, souvent un message d'arrêt envoyé au serveur. Dès la réception du tuple de stop contenant Reason et State, le code générique exécute la fonctiond de callback terminate(Reason, State).

La fonction terminate est le meilleur endroit où insérer du code de nettoyage du State du server et toute autre données persistantes utilisées par le système. Dans notre exemple, nous envoyons un dernier message à nos pairs afin qu'ils sachent que ce moniteur de noeuds est désormais arrêté. Dans cet exemple, la variable State contient un record avec les champs status et peers :

terminate(_Reason, State) ->
%% Let our peers know that we are shutting down
broadcast(State#state.peers, State#state { status = down }).

L'utilisation des fonctions de callback d'un behavior comme des fonctions de la librairie et les invoquer depuis d'autres endroits du programme est une très mauvaise pratique. Par exemple,  vous ne devez jamais appeler riak_core_node_watcher:init(Args) depuis un autre module afin de récupérer l'état initial des données de boucle. Ceci doit être effectué via un appel synchrone au serveur. Des appels aux fonction de callback ne doivent provenir que des modules de la librairie du behavior, appels résultant d'un évènement dans le système, et jamais directement de l'utilisateur.

15.4. D'autres "Worker Behavior"

Un large nombre d'autres "worker behavior" peuvent être et ont été implémentés avec les mêmes idées.

15.4.1. Les machines à état fini

Les machines à état fini (FSMs), implémentés dans le module behavior gen_fsm, sont un composant critique lors de l'implémentation de piles de protocoles dans les systèmes de télécommunications (domaine pour lequel Erlang a été inventé). Les états sont définis comme des fonctions de callback nommées d'après l'état, et qui retournent un tuple contenant le prochain State et les données de boucle mises à jour. Vous pouvez envoyer des évènement à ces états de manière synchrone ou asynchrone. Le callback module d'une machine à état fini doit également exporter les fonctions standards de callback que sont init, terminate et handle_info.

Evidemment, les machines à états finis ne sont pas spécifiques aux télécoms. Dans Riak, elles sont utilisées dans les gestionnaires de requêtes. Lorsqu'un client envoie une requête comme get, put, or delete, le processus écoutant cette requête va créer un nouveau processus implémentant le behavior gen_fsm correspondant. Par exemple, riak_kv_get_fsm est responsable de la gestion d'une requête get, en récupérant les données et en les renvoyant vers le processus client. Le processus FSM va passer à travers diférents états selon qu'il détermine auprès de quels noeuds demander les données, qu'il envoie les messages à ces noeuds, et qu'il reçoit les données, des erreurs, ou des timeouts en réponse.

15.4.2. Les gestionnaires d'évènements

Les gestionnaires d'évènements sont un autre behavior implémenté dans le module gen_event. L'idée est de créer un point central qui reçoit les évènements d'un certain type. L'envoi des évènements peut être synchrone ou asynchrone et un ensemble d'actionsprédéfini est appliqué lors de leur réception . A la réception d'un évènement, on peut, par exemple, le tracer dans un fichier, envoyer une alarme sous forme d'un SMS, ou le consolider en stastistiques. Chacune de ces actions est définie dans un module de callback distinct avec ses propres données de boucle, conservées entre les appels. Des Handlers peuvent être ajoutés, supprimés, ou mis à jour pour chaque gestionnaire d'évènements. Ainsi, en pratique, pour chaque gestionnaire d'évènements il peut y avoir plusieurs modules de callback, et différentes instances de ces modules peuvent exister dans différents gestionnaires. Les Handlers incluent les processus recevant les alarmes, les traces de données en live, les évènements liés aux évènement ou de simples logs.

Dans Riak, une des utilisations du behavior gen_event est la gestion des souscriptions aux "ring events", i.e., les changements d'adhésion des noeuds ou l'assignation de partition dans un cluster Riak. Les processus sur un noeud Riak peuvent enregistrer une fonction dans une instance duriak_core_ring_events, qui implémente le behavior gen_event. Quand le processus central gérant le ring de ce noeud change le record d'adhésion pour le cluster tout entier, cela déclenche un évènement qui fait que chaque module de callback appelera la fonction enregistrée. De cette manière, il est plus simple pour les différentes parties de Riak de répondre à des changements dans l'une des structures de données centrales de Riak sans avoir à ajouter une complexité au gestionnaire central de cette structure.

Les patterns les plus communs de concurrence et de communication sont gérés par l'un de ces 3 behavior primaires que nous venons de présenter : gen_server, gen_fsm, and gen_event. Cependant, dans de grands systèmes, il est possible que des patterns spécifiques à l'application apparaissent au fur et à mesure du temps et nécessite la création de nouveaux behaviors. Riak inclue un tel behavior, riak_core_vnode, qui formalise la manière d'implémenter un noeud virtuel. Les noeuds virtuels sont une abstraction de stockage primaire dans Riak, exposant une interface uniforme du stockage clé-valeur aux FSMs correspondants. L'interface du module de callback est spécifiée par l'utilisation de la fonction behavior_info/1,  de cette manière :

behavior_info(callbacks) ->
[{init,1},
{handle_command,3},
{handoff_starting,2},
{handoff_cancelled,1},
{handoff_finished,2},
{handle_handoff_command,3},
{handle_handoff_data,2},
{encode_handoff_item,2},
{is_empty,1},
{terminate,2},
{delete,1}];

Cet exemple montre la fonction behavior_info/1 du riak_core_vnode. La liste de tuples {CallbackFunction, Arity} définit le contrat que le module de callback doit respecter. Des implémentations concrètes de noeud virtuel doivent exporter ces fonctions, ou le compilateur émettra un avertissement. Implémenter vos propres behavior OTP est relativement relatively simple. En plus de définir vos fonctions de callback, en utilisant les modules proc_lib et sys modules, vous devez les démarrer avec des fonctions particulilères, gérer les messages du système et monitorer le parent du processus dans le cas où il se terminerait.

15.5. Les Superviseurs

La tâche du behavior superviseur est de monitorer ses processus enfants et, selon des règles préconfigurées, d'effectuer les actions nécessaires lorsque ceux-ci se terminent. Les enfants peuvent être à la fois d'autres supeviseurs et processus worker. Ceci permet au code de Riak de se concentrer sur le cas correct, qui permet au superviseur de gérer les bugs logiciels, les données corrompues ou les erreurs du système d'une manière cohérente à tout le système. Dans le monde Erlang, cette approche de programmation non défensive est souvent mentionnée comme la stratégie "let it crash". Les processus enfants qui constitue l'arbre de supervision peuvent inclure à la fois des superviseurs et des processus workers. Les processus Worker sont des behaviors OTP incluant gen_fsm, gen_server et gen_event. L'équipe de Riak, n'ayant pas à gérer les cas à la marge, peut travailler avec une base de code plus petite. Cette base de code, de par son utilisation des behaviors, est plus simple appréhender, étant donné qu'elle ne traite que de code spécifique. Riak possède un superviseur de haut niveau comme dans la plupart des applications Erlang, et a également des superviseurs de plus bas niveau pour gérer des groupes de processus par fonctionnalité, comme par exemple, les noeuds virtuels de Riak, les socket listenersTCP, et les gestionnaires de requête-réponse.

15.5.1. Les fonctions de callback des Superviseurs

Pour démontrer comment le behavior superviseur est implémenté, nous utiliserons le module riak_core_sup.erl. Ce core superviseur est le superviseur de plus haut niveau de l'application core de Riak. Il démarre un ensemble statique de workers et superviseurs, ainsi qu'un nombre dynamique de workers gérant les liaisons HTTP et HTTPS de l'API RESTful du noeud, nombre défini dans des fichiers configuration spécifiques à l'application. De la même manière que pour les gen_servers, tous les modules de callback de superviseur doivent inclure la directive  -behavior(supervisor). . Ils sont démarrés par l'utilisation des fonctions start ou start_link qui prennent en argument optionnel le ServerName, le CallBackModule, et un Argument qui sera passé à la fonction de callback init/1.

En regardant les premières lignes de code du module riak_core_sup.erl, au delà de la directive behavior et d'une macro que l'on décrira plus tard, on peut apercevoir la fonction start_link/3 :

-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

Démarrer un superviseur va délencher la création ("spawning") d'un nouveau processus, et l'appel de la fonction de callback init/1 du module de callback riak_core_sup.erl. Le ServerName est un tuple sous la forme {local, Name} ou {global, Name}, où Name est le nom enregistré du superviseur. Dans notre exemple, à la fois le nom enregistré et le module sont l'atom riak_core_sup, provenant de la macro ?MODULE. Nous passons une liste vide comme argument de init/1, qui traitera comme une valeur nulle. La fonction init est la seule fonction de callback d'un superviseur. Elle doit retourner un tuple au format :

{ok,  {SupervisorSpecification, ChildSpecificationList}}

SupervisorSpecification est un 3-tuple {RestartStrategy, AllowedRestarts, MaxSeconds} contenant les informations sur la manière de gérer les crashes de processus et leur redémarrages. RestartStrategy est une des trois options de configurations indiquant comment les processus "frères" seront affectés en cas d'arrêt anormal :

AllowedRestarts précise combien de fois les enfants du superviseur peuvent se terminer dans l'intervalle MaxSeconds avant que le superviseur ne s'arrête lui-même (et ses enfants). Lorsq'un processus se termine, il envoie un signal EXIT à son superviseur qui, selon la stratégie de redémarrage, gère l'arrêt en conséquence. Le superviseur se termine après avoir attient le nombre de maximum de redémarrages autorisés s'assure que les redémarrages cycliques et d'autres problèmes non résolus à son niveau sont remontés au niveau supérieur. Il y a des chances que le problème se situe dans un processus d'un autre sous-arbre, ce qui permet alors au superviseur du niveau supérieur d'arrêter le sous-arbre de supervision et de le redémarrer.

En examinant la dernière ligne de la fonction de callback init/1 du module riak_core_sup.erl, on peut remarquer que ce supervisuer particulier a une stratégie de one-for-one, signifiant que les processus sont indépendants les uns des autres. Le superviseur acceptera un maximum de dix redémarrages afin de se redémarrer lui-même.

ChildSpecificationList spécifie quels enfants le superviseur doit démarrer et surveiller, ainsi que les informations sur la manière de l'arrêter et de le redémarrer. Elle est représentée par une liste de tuples sous la forme suivante :

{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}

Id est un identifiant unique de ce superviseur. Le tuple Module, Function, et Arguments représente une fonction exportée qui déclenche l'appel de la fonction start_link du behavior, qui retournera un tuple sous la forme {ok, Pid}. La stratégie Restart décrit ce qui doit arriver selon le type d'arrêt du processus, qui peut être :

Shutdown est une valeur en millisecondes indiquant le temps au bout duquel le behavior est autorisé à exécuter la fonction terminate lorsqu'il s'arrête en raison d'un redémarrage ou d'une extinction. L'atom infinity peut également être utilisé, mais pour les behaviors autres que les superviseurs, il est fortement déconseillé. Type est soit l'atom worker, en référence aux serveurs génériques, gestionnaires d'évènement et machines à état fini, soit l'atom supervisor. Associé avec ModuleList, une liste de modules implémentant le behavior, ils sont utilisé pour contrôler et suspendre les processus durant les procédures de mise à jour à chaud du logiciel. Seuls les behavior existant ou implémentés par l'utilisateur peuvent faire partie de cette liste et ainsi être inclus dans un arbre de supervision.

A présent, nous devrions être capable de décrire une stratégie de redémarrage définissant les dépendances inter-processus, les seuils de tolérance aux parnnes et les procédures d'escalade sur la base d'une architecture commune. Nous devrions également être capable de comprendre ce qui se passe dans la fonction init/1 de l'exemple du module riak_core_sup.erl. Premièrement, étudions la macro CHILD. Elle crée une spécification pour un seul processus fils, en utilisant le nom du module de callback comme Id, le rendant permanent et indiquant un temps d'arrêt de 5 secondes. Les fils  peuvent être des workers ou des superviseurs. Regardons l'exemple, et voyons ce que nous pouvons en extraire :

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) ->
RiakWebs = case lists:flatten(riak_core_web:bindings(http),
riak_core_web:bindings(https)) of
[] ->
%% check for old settings, in case app.config
%% was not updated
riak_core_web:old_binding();
Binding ->
Binding
end,

Children =
[?CHILD(riak_core_vnode_sup, supervisor),
?CHILD(riak_core_handoff_manager, worker),
?CHILD(riak_core_handoff_listener, worker),
?CHILD(riak_core_ring_events, worker),
?CHILD(riak_core_ring_manager, worker),
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
?CHILD(riak_core_gossip, worker) |
RiakWebs
],
{ok, {{one_for_one, 10, 10}, Children}}.

La plupart des Children démarrés par ce superviseur sont des workers définis statiquement (ou dans le cas du vnode_sup, un superviseur). L'exception est la partie RiakWebs, dont la définition est dynamique et dépendante de la partie HTTP du fichier de configuration de Riak.

A l'exception des applications de type librairie, toutes les applications OTP, celles de Riak y comprises, auront leur propre arbre de supervision. Dans Riak, plusieurs applications de haut-niveau sont exécutés dans un noeud Erlang, telles que riak_core pour les algorithmes de systèmes distribués riak_kv pour la sémantique de stockage clé/valeur, webmachine pour l'HTTP, et bien d'autres. Nous avons vu l'arbre étendu sous riak_core pour présenter la supervision à plusieurs niveaux qui est opérée par Riak. Une des principaux bénéfices de cette structure est qu'un sous-système peut s'interrompre  (à cause d'un bug, d'un problème environnemental, ou d'une action intentionnelle) et alors, seul ce sous-arbre sera arrêté, dans un premier temps.

Le superviseur redémarrera les processus nécessaire et le système global n'en sera pas affecté. En pratique nous avons constaté que ce fonctionnement  est correct pour Riak. Un utilisateur pourrait trouver comment interrompre un noeud virtuel, mais celui-ci sera redémarré  par le superviseur riak_core_vnode_sup. S'ils réussissent à interrompre ce dernier, le superviseur riak_core le redémarrera, propageant l'arrêt au superviseur de plus haut niveau. Cette isolation des incidents et le mécanisme de reprise permettent aux développeurs de Riak (et Erlang) de construire assez simplement des systèmes résilients.

La valeur de ce modèle de supervision a été démontré lorsqu'un utilisateur industriel créa un environnement extrême pour déterminer où échoueraient plusieurs systèmes de bases de données . Cet environnement générait des pics de charge gigantesques et des conditions favorables aux incidents. Ils furent confus de constater que Riak ne s'arrêterait simplement pas, même sous les pires conditions. Plus précisément, bien sûr, ils étaient en capacité d'interrompre les processus ou les sous-systèmes individuellement et de différentes manières—mais les superviseurs nettoyaient et redémarraient le nécessaire pour remettre le système global en état de fonctionnement.

15.5.2. Applications

Le behavior application que nous introduit plus tôt est utilisé pour empaqueter des modules ou ressources Erlang en composants réutilisables. Dans OTP, il existe deux genres d'applications. La forme la plus commune, appelée applications normales, démarre un arbre de supervision et tout les processus workers statiques nécessaires. Les applications "Library" comme la "Standard Library", intégrée à la distribution Erlang, contient des modules "library" mais ne démarrant pas d'arbre de supervision. Cela ne veut pas dire que le code ne contient pas de processus ou d'arbres de supervision. Cela signifie juste qu'ils seront démarrés comme une partie de l'arbre appartenant à une autre application.

Un système Erlang consistera en un ensemble d'applications faiblement couplées. Certaines sont écrites par les développeurs, d'autres sont mises à disposition en opensource, et d'autres font partie intégrante de la distribution Erlang/OTP. Le système d'exécution Erlang et ses outils traitent toutes les applications de manière similaire, indépendamment du fait qu'elles fassent partie ou non de la distribution Erlang.

15.6. Replication and Communication in Riak

Riak was designed for extreme reliability and availability at a massive scale, and was inspired by Amazon's Dynamo storage system [DHJ+07]. Dynamo and Riak's architectures combine aspects of both Distributed Hash Tables (DHTs) and traditional databases. Two key techniques that both Riak and Dynamo use are consistent hashing for replica placement and a gossip protocol for sharing common state.

Consistent hashing requires that all nodes in the system know about each other, and know what partitions each node owns. This assignment data could be maintained in a centrally managed configuration file, but in large configurations, this becomes extremely difficult. Another alternative is to use a central configuration server, but this introduces a single point of failure in the system. Instead, Riak uses a gossip protocol to propagate cluster membership and partition ownership data throughout the system.

Gossip protocols, also called epidemic protocols, work exactly as they sound. When a node in the system wishes to change a piece of shared data, it makes the change to its local copy of the data and gossips the updated data to a random peer. Upon receiving an update, a node merges the received changes with its local state and gossips again to another random peer.

When a Riak cluster is started, all nodes must be configured with the same partition count. The consistent hashing ring is then divided by the partition count and each interval is stored locally as a {HashRange, Owner} pair. The first node in a cluster simply claims all the partitions. When a new node joins the cluster, it contacts an existing node for its list of {HashRange, Owner} pairs. It then claims (partition count)/(number of nodes) pairs, updating its local state to reflect its new ownership. The updated ownership information is then gossiped to a peer. This updated state then spread throughout the entire cluster using the above algorithm.

By using a gossip protocol, Riak avoids introducing a single point of failure in the form of a centralized configuration server, relieving system operators from having to maintain critical cluster configuration data. Any node can then use the gossiped partition assignment data in the system to route requests. When used together, the gossip protocol and consistent hashing enable Riak to function as a truly decentralized system, which has important consequences for deploying and operating large-scale systems.

15.7. Conclusions and Lessons Learned

Most programmers believe that smaller and simpler codebases are not only easier to maintain, they often have fewer bugs. By using Erlang's basic distribution primitives for communication in a cluster, Riak can start out with a fundamentally sound asynchronous messaging layer and build its own protocols without having to worry about that underlying implementation. As Riak grew into a mature system, some aspects of its networked communication moved away from use of Erlang's built-in distribution (and toward direct manipulation of TCP sockets) while others remained a good fit for the included primitives. By starting out with Erlang's native message passing for everything, the Riak team was able to build out the whole system very quickly. These primitives are clean and clear enough that it was still easy later to replace the few places where they turned out to not be the best fit in production.

Also, due to the nature of Erlang messaging and the lightweight core of the Erlang VM, a user can just as easily run 12 nodes on 1 machine or 12 nodes on 12 machines. This makes development and testing much easier when compared to more heavyweight messaging and clustering mechanisms. This has been especially valuable due to Riak's fundamentally distributed nature. Historically, most distributed systems are very difficult to operate in a "development mode" on a single developer's laptop. As a result, developers often end up testing their code in an environment that is a subset of their full system, with very different behavior. Since a many-node Riak cluster can be trivially run on a single laptop without excessive resource consumption or tricky configuration, the development process can more easily produce code that is ready for production deployment.

The use of Erlang/OTP supervisors makes Riak much more resilient in the face of subcomponent crashes. Riak takes this further; inspired by such behaviors, a Riak cluster is also able to easily keep functioning even when whole nodes crash and disappear from the system. This can lead to a sometimes-surprising level of resilience. One example of this was when a large enterprise was stress-testing various databases and intentionally crashing them to observe their edge conditions. When they got to Riak, they became confused. Each time they would find a way (through OS-level manipulation, bad IPC, etc) to crash a subsystem of Riak, they would see a very brief dip in performance and then the system returned to normal behavior. This is a direct result of a thoughtful "let it crash" approach. Riak was cleanly restarting each of these subsystems on demand, and the overall system simply continued to function. That experience shows exactly the sort of resilience enabled by Erlang/OTP's approach to building programs.

15.7.1. Acknowledgments

This chapter is based on Francesco Cesarini and Simon Thompson's 2009 lecture notes from the central European Functional Programming School held in Budapest and Komárno. Major contributions were made by Simon Thompson of the University of Kent in Canterbury, UK. A special thank you goes to all of the reviewers, who at different stages in the writing of this chapter provided valuable feedback.