Redis Stream : le Pub/Sub

Partie 1 : principes généraux et Pub/Sub

Introduction

Fin 2018, Redis version 5 sortait et on découvrait alors une nouvelle structure de données : les streams (flux).

Conformément à la philosophie de Redis, ce nouveau type est versatile et se plie à une gamme de problèmes variée. Ainsi vais-je présenter dans cette courte série de billets les streams sous trois facettes : en tant que Pub/Sub (ce billet), en tant que queue de tâches distribuée (messaging queue) et enfin en tant que tampon d’écriture devant une base de données (write buffer).

Avoir une connaissance préalable des patterns classiques d’un système distribué (Pub/Sub et Messaging Queues) n’est pas requis. Les concepts sont introduits au fil de la rédaction (de façon très sommaire, cependant).

Principes généraux des streams

Un stream est une séquence ordonnée d’éléments avec une particularité notable : chaque nouvel élément du stream doit toujours être apposé au bout de la séquence. Il n’est ainsi pas possible d’en ajouter un au début ou au milieu du stream. C’est une structure append-only.

Les éléments du stream sont des structures associatives clé-valeur simples (sans imbrication).

Redis ordonne les éléments d’un stream grâce à un ID unique assigné pour chacun d’entre eux. Cet ID ne peut que croître au fur et à mesure que l’on ajoute des éléments, même s’il n’y a aucun impératif quant à l’intervalle entre deux ID subséquents. Dans la pratique, on laisse Redis fixer automatiquement l’ID d’un nouvel élément. Il prend alors le timestamp Unix du moment, exprimé en millisecondes.

Ajoutons l’élément {"message": "Coucou", "auteur": "sgble"} dans le stream dénommé mon_stream. L’ID spécial * indique à Redis de choisir lui-même un ID, comme évoqué précédemment.

Exemple (CLI Redis) :

127.0.0.1:6379> XADD mon_stream * message Coucou auteur sgble
"1615046454218-0"

127.0.0.1:6379> XADD mon_stream * message "Coucou 2" auteur sgble
"1615046464715-0"

Redis répond avec l’ID qu’il a choisi, soit par défaut le timestamp en millisecondes au moment de l’insertion. Mais on note que les ID ont un suffixe de la forme -0. Ce suffixe permet de gérer le cas des multiples insertions qui auraient lieu sur la même milliseconde (ce qui est bien possible compte tenu la vélocité de Redis !).

Avec XRANGE, on lit un intervalle entier entre deux ID dans un stream. La structure de données est optimisée pour être interrogée de cette façon1 :

127.0.0.1:6379> XRANGE mon_stream 0 +  # intervalle des ID 0 à +
1) 1) "1615046454218-0"
   2) 1) "message"
      2) "Coucou"
      3) "auteur"
      4) "sgble"
2) 1) "1615046464715-0"
   2) 1) "message"
      2) "Coucou 2"
      3) "auteur"
      4) "sgble"

Le + est un ID spécial qui désigne le dernier élément du stream. Dans l’exemple ci-dessus, il est donc question d’obtenir tous les éléments du stream, soit de 0 (ID minimal possible) jusqu’à +.

Pour ne récupérer qu’un seul élément, on indique deux bornes identiques et égales à l’ID ciblé, par exemple :

127.0.0.1:6379> XRANGE mon_stream 1615046464715-0 1615046464715-0
1) 1) "1615046464715-0"
   2) 1) "message"
      2) "Coucou 2"
      3) "auteur"
      4) "sgble"

À première vue, les streams sont une sorte de simple structure pour organiser un journal d’évènements. Nous pourrions nous en tenir là et déjà imaginer quelques cas d’utilisation assimilés à de la journalisation partagée entre plusieurs systèmes, comme l’implémentation d’un système de messagerie ou d’événements, par exemple.

Mais allons plus loin encore et présentons d’autres commandes qui nous permettront de faire bien plus.

Pub/Sub : un système de surveillance

Esquissons une solution pour le problème suivant : un restaurant possède plusieurs chambres et réfrigérateurs dans lesquels des denrées doivent être entreposées dans un certain intervalle de température pour des raisons de qualité et d’hygiène. Chaque emplacement dispose d’un capteur tout-en-un qui peut transmettre à intervalle régulier diverses informations comme la température et le taux d’humidité dans un stream de Redis appelé room_temp.

Pour chaque température rapportée, on veut exécuter certaines actions :

  • vérifier que l’on est dans l’intervalle prévu sur la température et l’humidité ;
  • à défaut, prévenir immédiatement les responsables par email ;
  • enregistrer chaque mesure de façon durable dans une base de données SQL qui sert à tracer des graphiques plus tard.

Implémentons notre solution avec un modèle Pub/Sub (pour publishers et subscribers, soit éditeurs et abonnés en français). Les systèmes équipés de capteurs qui renseignent leur température dans Redis sont les éditeurs, tandis que les programmes chargés d’effectuer les diverses actions qui suivent sont les abonnés. Redis agit en qualité de broker (courtier) en mettant en lien les éditeurs et les abonnées.

L’architecture illustrée :

Pub/Sub Redis : à gauche, les capteurs (Publishers) émettent en continu leur relevé ; à droite, deux programmes (Subscribers) sont chargés respectivement d'enregistrer les données en BDD et de prévenir les responsables en cas d'anomalie
Pub/Sub Redis : à gauche, les capteurs (Publishers) émettent en continu leur relevé ; à droite, deux programmes (Subscribers) sont chargés respectivement d’enregistrer les données en BDD et de prévenir les responsables en cas d’anomalie

Du côté d’un publisher, une commande XADD ajoute un élément pour rapporter les mesures. Voici un script en Python qui simule les mesures prises toute les 3 secondes en fixant aléatoirement la température et l’humidité pour l’exemple :

import time
import random
import redis

ROOM_ID = 'room-1'  # À changer pour chaque instance du script
STREAM = 'room_state'

r = redis.Redis()

while True:
    temp = random.normalvariate(mu=5, sigma=0.5)
    humid = random.normalvariate(mu=30, sigma=2)
    
    # XADD
    r.xadd(STREAM, {
        'room_id': ROOM_ID,
        'temp': temp,
        'humid': humid
    })
    time.sleep(3)

Afin de suivre les exemples en temps réel, on peut d’ores et déjà laisser une ou plusieurs instances de ce programme tourner en fond. Pour mieux suivre, on veillera à attribuer un ROOM_ID unique à chaque instance.

Enfin, du côté des subscribers, c’est la commande XREAD qui permet de se tenir informé en continu de ce qui survient, elle diffère quelque peu de la commande XRANGE vue précédemment. XREAD permet d’écouter en continu un stream (optionnellement de façon bloquante) en indiquant un ID de début.

L’ID spécial $ désigne le dernier élément exclu, ce qui revient à indiquer que seuls les prochains éléments à survenir après l’instant où l’on commence l’écoute nous intéressent. Il est probable de commencer par écouter à partir de l’ID $ s’il n’existe de dernier ID connu au préalable. Mais autrement, on fixe généralement cet ID avec celui du dernier élément vu, comme nous le voyons !

import time
import redis

STREAM = 'room_state'

r = redis.Redis()
last_timestamp = '$'  # Pas d'ID préalable connu, on commence par $

while True:
    # Ici notre XREAD
    events = r.xread({STREAM: last_timestamp}, block=0)
    
    for ev in events:
        _, data = ev
        timestamp, info = data[0]
        room_id = info[b'room_id']
        temp = float(info[b'temp'])
        humid = float(info[b'humid'])

        if  not 4 <= temp <= 6:
            print(f'ALERT TEMP! {room_id}: {temp}%')

        if not 24 <= humid <= 36:
            print(f'ALERT HUMID! {room_id}: {humid}%')

        last_timestamp = timestamp  # On refixe l'ID à partir duquel on veut écouter

Et bien-sûr, n’oublions pas un autre programme qui lira lui aussi les données de façon concomitante :

import time
import redis

STREAM = 'room_state'

r = redis.Redis()
last_timestamp = '$'

sql = 'INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid);'

while True:
    # Ici notre XREAD
    events = r.xread({STREAM: last_timestamp}, block=0)
    
    for ev in events:
        _, data = ev
        timestamp, info = data[0]
        room_id = info[b'room_id']
        temp = float(info[b'temp'])
        humid = float(info[b'humid'])
        # Simulation d'une exécution SQL
        print('Req:', sql, [timestamp, room_id, temp, humid])
        last_timestamp = timestamp

En faisant tourner ces deux programmes, on pourra bien observer qu’ils reçoivent en temps réel tous les deux les informations.

$ python abonné_alerte.py
%
b'1649107932476-0'
b'1649107933326-0'
b'1649107935481-0'
b'1649107936329-0'
ALERT TEMP! b'room-2': 6.12191050500779%
b'1649107938484-0'
b'1649107939334-0'
...
...
$ python abonné_sql.py
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107932476-0', b'room-2', 4.77177761522936, 30.35734886658456]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107933326-0', b'room-1', 5.708066609483226, 30.516484411754252]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107935481-0', b'room-2', 4.2003788955207435, 28.884695064157125]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107936329-0', b'room-1', 4.9264638938932865, 31.445630632770374]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107938484-0', b'room-2', 6.12191050500779, 31.110434314778836]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107939334-0', b'room-1', 5.403059986341991, 27.360490108746863]
...
...

Si nous coupons le ou les programmes qui alimentent le flux de données, les deux programmes qui écoutent le stream se contentent de rester à l’écoute. Inversement, si rien n’écoute le stream, il continue malgré tout de s’alimenter des données reçues par les éditeurs, Redis enregistre donc bien les données qui pourront être interrogées ultérieurement.

Les publishers et les subscribers sont découplés grâce à l’intermédiaire qu’est Redis : les éditeurs et les abonnés n’ont aucun besoin de se connaître entre eux, ni de savoir ce qui existe dans le système.

À noter : la commande XREAD permet d’écouter sur plus d’un stream à la fois, permettant ainsi de souscrire à plusieurs topics, en jargon Pub/Sub. Ici nous n’écoutons qu’un seul stream pour nos besoins, cependant.

Nous laissons à Redis le soin d’horodater les enregistrements lui-même par le biais de l’attribution automatique des ID en timestamp. Cependant, il convient de retenir que l’horodatage de Redis ne correspond pas à la date de l’évènement, mais plutôt à la date d’enregistrement de l’évènement dans Redis. La différence est subtile mais importante dans un système réel où on ne peut faire le pari d’un système globalement sans latence. Dans notre présentation, nous confondons les deux pour simplifier les exemples.


  1. L’implémentation de Redis utilise des arbres Radix qui permettent d’obtenir rapidement un intervalle complet d’éléments, vous trouverez plus d’informations dans cette présentation donnée par Salvatore Sanfilippo, le créateur de Redis : https://www.youtube.com/watch?v=Ty1rQuRJijk


Redis permet d’implémenter un Pub/Sub minimaliste et de surcroît une architecture découplée entre plusieurs éditeurs et plusieurs abonnés. Les subscribers peuvent recevoir le même message (broadcast), comme cela a été illustré.

Mais nous pourrions envisager et souhaiter un comportement essentiellement différent : plusieurs consommateurs (nouvelle terminologie pour subscriber dans ce contexte) qui consultent le même flux de messages, mais sans que l’un d’eux ne reçoive un même message qu’un autre. Cela est fort utile pour distribuer des tâches (souvent coûteuses en temps) parmi plusieurs workers parallèles qui font tous la même chose.

Toujours avec les streams, Redis a bien d’autres commandes en stock qui nous permettront d’implémenter un tel dispositif. Ce sera ainsi l’objet de notre prochain billet : les queues partagées.

5 commentaires

Saurais-tu en dire plus sur les avantages qu’offre Redis par rapport à d’autres systèmes tels que RabbitMQ ou Kafka ? Ou même sans rentrer dans les détails de comparaison, dans quels cas d’utilisation dirais-tu que Redis Pub/Sub semble plus approprié ?

D’après ce que j’ai lu dans le passé, un désavantage d’utiliser Redis est qu’il peut arriver que des messages soient perdus, sans être délivrés au subscribers. Pour l’exemple que tu donnes, ce n’est sans doute pas un drame mais pour d’autres cas, ce n’est pas acceptable. Mais alors je me demande pourquoi utiliser Redis plutôt que ces systèmes plus évolués qui garantissent que rien ne sera perdu.

Saurais-tu en dire plus sur les avantages qu’offre Redis par rapport à d’autres systèmes tels que RabbitMQ ou Kafka ? Ou même sans rentrer dans les détails de comparaison, dans quels cas d’utilisation dirais-tu que Redis Pub/Sub semble plus approprié ?

D’après ce que j’ai lu dans le passé, un désavantage d’utiliser Redis est qu’il peut arriver que des messages soient perdus, sans être délivrés au subscribers. Pour l’exemple que tu donnes, ce n’est sans doute pas un drame mais pour d’autres cas, ce n’est pas acceptable. Mais alors je me demande pourquoi utiliser Redis plutôt que ces systèmes plus évolués qui garantissent que rien ne sera perdu.

Migwel

Attention, il faut bien s’assurer que l’on ne confond pas les streams (qui sont décrits ici) avec les canaux PUB/SUB de Redis qui sont beaucoup plus anciens, beaucoup plus légers et effectivement ne donnent aucune garantie que les messages publiés seront lus.

À te lire je ne suis pas tout à fait sûr que tu parles des premiers et qu’il s’agit plutôt des seconds que tu mentionnes, parce qu’ici, à moins de tomber sur un cas vraiment très, très dégradé (où les événements ne sont jamais consommés jusqu’à saturer la mémoire), les streams peuvent garantir que les messages seront reçus "au moins une fois".

+0 -0

Attention, il ne faut bien s’assurer que l’on ne confond pas les streams (qui sont décrits ici) avec les canaux PUB/SUB de Redis qui sont beaucoup plus anciens, beaucoup plus légers et effectivement ne donnent aucune garantie que les messages publiés seront lus.

À te lire je ne suis pas tout à fait sûr que tu parles des premiers et qu’il s’agit plutôt des seconds que tu mentionnes, parce qu’ici, à moins de tomber sur un cas vraiment très, très dégradé (où les événements ne sont jamais consommés jusqu’à saturer la mémoire), les streams peuvent garantir que les messages seront reçus "au moins une fois".

nohar

Aaaah. En effet, tu as raison, j’ignorais totalement l’existence d’un système Pub/Sub plus ancien donc merci pour les clarifications !

+0 -0
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte