Comment utiliser Redis pour le traitement de flux en temps réel

Roshan Kumar est chef de produit senior chez Redis Labs.

L'ingestion de données en continu en temps réel est une exigence courante dans de nombreux cas d'utilisation de Big Data. Dans des domaines tels que l'IoT, le commerce électronique, la sécurité, les communications, le divertissement, la finance et la vente au détail, où tout dépend d'une prise de décision rapide et précise basée sur des données, la collecte et l'analyse de données en temps réel sont en fait au cœur de l'entreprise.

Cependant, la collecte, le stockage et le traitement de données en continu en grands volumes et à grande vitesse présente des défis architecturaux. Une première étape importante dans l'analyse des données en temps réel consiste à s'assurer que des ressources réseau, de calcul, de stockage et de mémoire adéquates sont disponibles pour capturer des flux de données rapides. Mais la pile logicielle d'une entreprise doit correspondre aux performances de son infrastructure physique. Sinon, les entreprises seront confrontées à un arriéré massif de données, ou pire, à des données manquantes ou incomplètes.

Redis est devenu un choix populaire pour ces scénarios d'acquisition de données rapides. Plateforme légère de base de données en mémoire, Redis atteint un débit de millions d'opérations par seconde avec des latences inférieures à la milliseconde, tout en utilisant des ressources minimales. Il propose également des implémentations simples, activées par ses multiples structures de données et fonctions.

Dans cet article, je montrerai comment Redis Enterprise peut résoudre les défis courants associés à l'ingestion et au traitement de gros volumes de données à haute vitesse. Nous allons parcourir trois approches différentes (y compris le code) pour traiter un flux Twitter en temps réel, en utilisant respectivement Redis Pub / Sub, Redis Lists et Redis Sorted Sets. Comme nous le verrons, les trois méthodes ont un rôle à jouer dans l'ingestion rapide des données, en fonction du cas d'utilisation.

Les défis de la conception de solutions d'ingestion rapide de données

L'ingestion de données à grande vitesse implique souvent plusieurs types de complexité différents:

  • De gros volumes de données arrivant parfois par rafales. Les données en rafale nécessitent une solution capable de traiter de gros volumes de données avec une latence minimale. Idéalement, il devrait être en mesure d'effectuer des millions d'écritures par seconde avec une latence inférieure à la milliseconde, en utilisant un minimum de ressources.
  • Données provenant de plusieurs sources. Les solutions d'ingestion de données doivent être suffisamment flexibles pour gérer les données dans de nombreux formats différents, en conservant l'identité source si nécessaire et en les transformant ou en les normalisant en temps réel.
  • Données qui doivent être filtrées, analysées ou transmises. La plupart des solutions d'ingestion de données ont un ou plusieurs abonnés qui consomment les données. Il s'agit souvent d'applications différentes qui fonctionnent au même endroit ou à des endroits différents avec un ensemble d'hypothèses variées. Dans de tels cas, la base de données doit non seulement transformer les données, mais également filtrer ou agréger en fonction des exigences des applications consommatrices.
  • Données provenant de sources réparties géographiquement. Dans ce scénario, il est souvent pratique de distribuer les nœuds de collecte de données, en les plaçant à proximité des sources. Les nœuds eux-mêmes font partie de la solution d'acquisition rapide de données, pour collecter, traiter, transférer ou rediriger les données d'ingestion.

Gestion de l'ingestion rapide de données dans Redis

De nombreuses solutions prenant en charge l'ingestion rapide de données aujourd'hui sont complexes, riches en fonctionnalités et sur-conçues pour des besoins simples. Redis, en revanche, est extrêmement léger, rapide et facile à utiliser. Avec des clients disponibles dans plus de 60 langues, Redis peut être facilement intégré aux piles de logiciels populaires.

Redis propose des structures de données telles que des listes, des ensembles, des ensembles triés et des hachages qui offrent un traitement de données simple et polyvalent. Redis fournit plus d'un million d'opérations de lecture / écriture par seconde, avec une latence inférieure à la milliseconde sur une instance cloud de taille modeste, ce qui la rend extrêmement économe en ressources pour de grands volumes de données. Redis prend également en charge les services de messagerie et les bibliothèques clientes dans tous les langages de programmation populaires, ce qui le rend bien adapté pour combiner l'ingestion de données à haute vitesse et l'analyse en temps réel. Les commandes Redis Pub / Sub lui permettent de jouer le rôle de courtier de messages entre les éditeurs et les abonnés, une fonctionnalité souvent utilisée pour envoyer des notifications ou des messages entre les nœuds d'ingestion de données distribuées.

Redis Enterprise améliore Redis avec une mise à l'échelle transparente, une disponibilité permanente, un déploiement automatisé et la possibilité d'utiliser une mémoire flash rentable comme extension de RAM afin que le traitement de grands ensembles de données puisse être effectué de manière rentable.

Dans les sections ci-dessous, je décrirai comment utiliser Redis Enterprise pour relever les défis courants d'ingestion de données.

Redis à la vitesse de Twitter

Pour illustrer la simplicité de Redis, nous allons explorer un exemple de solution d'ingestion rapide de données qui rassemble les messages d'un fil Twitter. Le but de cette solution est de traiter les tweets en temps réel et de les pousser dans le tube au fur et à mesure de leur traitement.

Les données Twitter ingérées par la solution sont ensuite consommées par plusieurs processeurs sur toute la ligne. Comme le montre la figure 1, cet exemple traite de deux processeurs: le processeur de Tweet anglais et le processeur d'influence. Chaque processeur filtre les tweets et les transmet par ses canaux respectifs à d'autres consommateurs. Cette chaîne peut aller aussi loin que la solution l'exige. Cependant, dans notre exemple, nous nous arrêtons au troisième niveau, où nous regroupons les discussions populaires parmi les anglophones et les principaux influenceurs.

Redis Labs

Notez que nous utilisons l'exemple du traitement des flux Twitter en raison de la vitesse d'arrivée des données et de la simplicité. Notez également que les données Twitter atteignent notre ingestion rapide de données via un seul canal. Dans de nombreux cas, tels que l'IoT, plusieurs sources de données peuvent envoyer des données au récepteur principal.

Il existe trois façons d'implémenter cette solution à l'aide de Redis: l'ingestion avec Redis Pub / Sub, l'acquisition avec la structure de données List ou l'ingestion avec la structure de données Sorted Set. Examinons chacune de ces options.

Ingérer avec Redis Pub / Sub

Il s'agit de la mise en œuvre la plus simple de l'ingestion rapide de données. Cette solution utilise la fonctionnalité Pub / Sub de Redis, qui permet aux applications de publier et de s'abonner à des messages. Comme le montre la figure 2, chaque étape traite les données et les publie sur un canal. L'étape suivante s'abonne au canal et reçoit les messages pour un traitement ou un filtrage ultérieur.

Redis Labs

Avantages

  • Facile à mettre en œuvre.
  • Fonctionne bien lorsque les sources de données et les processeurs sont répartis géographiquement.

Les inconvénients 

  • La solution exige que les éditeurs et les abonnés soient en permanence actifs. Les abonnés perdent des données lorsqu'ils sont arrêtés ou lorsque la connexion est perdue.
  • Cela nécessite plus de connexions. Un programme ne peut pas publier et s'abonner à la même connexion, de sorte que chaque processeur de données intermédiaire nécessite deux connexions - une pour s'abonner et une pour publier. Si vous exécutez Redis sur une plate-forme DBaaS, il est important de vérifier si votre package ou niveau de service a des limites quant au nombre de connexions.

Une note sur les connexions

Si plusieurs clients s'abonnent à un canal, Redis envoie les données à chaque client de manière linéaire, l'un après l'autre. De grandes charges utiles de données et de nombreuses connexions peuvent introduire une latence entre un éditeur et ses abonnés. Bien que la limite stricte par défaut du nombre maximal de connexions soit de 10 000, vous devez tester et évaluer le nombre de connexions appropriées pour votre charge utile.

Redis gère un tampon de sortie client pour chaque client. Les limites par défaut du tampon de sortie client pour Pub / Sub sont définies comme suit:

client-output-buffer-limit pubsub 32 Mo 8 Mo 60

Avec ce paramètre, Redis forcera les clients à se déconnecter dans deux conditions: si la mémoire tampon de sortie dépasse 32 Mo ou si la mémoire tampon de sortie contient 8 Mo de données de manière cohérente pendant 60 secondes.

Ce sont des indications que les clients consomment les données plus lentement qu'elles ne sont publiées. Si une telle situation se présente, essayez d'abord d'optimiser les consommateurs de sorte qu'ils n'ajoutent pas de latence lors de la consommation des données. Si vous remarquez que vos clients sont toujours déconnectés, vous pouvez augmenter les limites de la client-output-buffer-limit pubsubpropriété dans redis.conf. N'oubliez pas que toute modification des paramètres peut augmenter la latence entre l'éditeur et l'abonné. Tout changement doit être testé et vérifié minutieusement.

Conception de code pour la solution Redis Pub / Sub

Redis Labs

C'est la plus simple des trois solutions décrites dans cet article. Voici les classes Java importantes implémentées pour cette solution. Téléchargez le code source avec une implémentation complète ici: //github.com/redislabsdemo/IngestPubSub.

La Subscriberclasse est la classe principale de cette conception. Chaque Subscriberobjet maintient une nouvelle connexion avec Redis.

Class Subscriber étend JedisPubSub implémente Runnable {

       nom de chaîne privé;

       private RedisConnection conn = null;

       private Jedis jedis = null;

       private String abonnéChannel;

       Public Subscriber (String subscriptionName, String channelName) lève une exception {

              nom = nom_abonné;

              SubscriberChannel = channelName;

              Fil t = nouveau fil (ce);

              t.start ();

       }

       @Passer outre

       public void run () {

              essayer{

                      conn = RedisConnection.getRedisConnection ();

                      jedis = conn.getJedis ();

                      while (vrai) {

                             jedis.subscribe (this, this.subscriberChannel);

                      }

              } catch (Exception e) {

                      e.printStackTrace ();

              }

       }

       @Passer outre

       public void onMessage (chaîne de chaîne, message de chaîne) {

              super.onMessage (canal, message);

       }

}

La Publisherclasse maintient une connexion distincte à Redis pour publier des messages sur un canal.

Public class Publisher {

       RedisConnection conn = null;

       Jedis jedis = null;

       chaîne de chaîne privée;

       public Publisher (String channelName) lève une exception {

              channel = channelName;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();

       }

       public void publish (String msg) lève une exception {

              jedis.publish (chaîne, msg);

       }

}

Les EnglishTweetFilter, InfluencerTweetFilter, HashTagCollectoret les InfluencerCollectorfiltres étendent Subscriber, ce qui leur permet d'écouter les canaux entrants. Étant donné que vous avez besoin de connexions Redis distinctes pour vous abonner et publier, chaque classe de filtre a son propre RedisConnectionobjet. Les filtres écoutent les nouveaux messages de leurs canaux en boucle. Voici l'exemple de code de la EnglishTweetFilterclasse:

public class EnglishTweetFilter étend l'abonné

{

       private RedisConnection conn = null;

       private Jedis jedis = null; 

       private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriptionChannel, String publisherChannel) lève une exception {

              super (nom, SubscriberChannel);

              this.publisherChannel = publisherChannel;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();           

       }

       @Passer outre

       public void onMessage (String subscriptionChannel, String message) {

              JsonParser jsonParser = nouveau JsonParser ();

              JsonElement jsonElement = jsonParser.parse (message);

              JsonObject jsonObject = jsonElement.getAsJsonObject ();

              // filtrer les messages: publier uniquement les tweets en anglais           

if (jsonObject.get ("lang")! = null &&

       jsonObject.get ("lang"). getAsString (). equals ("en")) {

                      jedis.publish (publisherChannel, message);

              }

       }

}

La Publisherclasse a une méthode de publication qui publie les messages sur le canal requis.

Public class Publisher {

.

.     

       public void publish (String msg) lève une exception {

              jedis.publish (chaîne, msg);

       }

.

}

La classe principale lit les données du flux d'ingestion et les publie sur le AllDatacanal. La méthode principale de cette classe démarre tous les objets de filtre.

classe publique IngestPubSub

{

.

       public void start () lève une exception {

       .

       .

              éditeur = nouvel éditeur («AllData»);

              englishFilter = new EnglishTweetFilter ("English Filter", "AllData",

                                           «EnglishTweets»);

              influencerFilter = new InfluencerTweetFilter ("Influencer Filter",

                                           «AllData», «InfluencerTweets»);

              hashtagCollector = new HashTagCollector («Hashtag Collector», 

                                           «EnglishTweets»);

              influencerCollector = new InfluencerCollector («Influencer Collector»,

                                           «InfluencerTweets»);

       .

       .

}

Ingérer avec les listes Redis

La structure de données List de Redis facilite et simplifie la mise en œuvre d'une solution de mise en file d'attente. Dans cette solution, le producteur pousse chaque message à l'arrière de la file d'attente, et l'abonné interroge la file d'attente et extrait les nouveaux messages de l'autre extrémité.

Redis Labs

Avantages

  • Cette méthode est fiable en cas de perte de connexion. Une fois que les données sont poussées dans les listes, elles y sont conservées jusqu'à ce que les abonnés les lisent. Cela est vrai même si les abonnés sont arrêtés ou perdent leur connexion avec le serveur Redis.
  • Les producteurs et les consommateurs n'ont besoin d'aucun lien entre eux.

Les inconvénients

  • Une fois les données extraites de la liste, elles sont supprimées et ne peuvent plus être récupérées. À moins que les consommateurs ne conservent les données, elles sont perdues dès qu'elles sont consommées.
  • Chaque consommateur a besoin d'une file d'attente distincte, ce qui nécessite le stockage de plusieurs copies des données.

Conception de code pour la solution Redis Lists

Redis Labs

Vous pouvez télécharger le code source de la solution Redis Lists ici: //github.com/redislabsdemo/IngestList. Les principales classes de cette solution sont expliquées ci-dessous.

MessageListincorpore la structure de données de la liste Redis. La push()méthode pousse le nouveau message vers la gauche de la file d'attente et pop()attend un nouveau message depuis la droite si la file d'attente est vide.

public class MessageList {

       nom de chaîne protégé = «MaListe»; // Nom

.

.     

       public void push (String msg) lève une exception {

              jedis.lpush (nom, msg); // Poussée à gauche

       }

       public String pop () lève l'exception {

              return jedis.brpop (0, nom) .toString ();

       }

.

.

}

MessageListenerest une classe abstraite qui implémente la logique d'écouteur et d'éditeur. Un MessageListenerobjet n'écoute qu'une seule liste, mais peut publier sur plusieurs canaux ( MessageFilterobjets). Cette solution nécessite un MessageFilterobjet distinct pour chaque abonné dans le canal.

La classe MessageListener implémente Runnable {

       nom de chaîne privé = null;

       Private MessageList inboundList = null;

       Map outBoundMsgFilters = new HashMap ();

.

.     

       public void registerOutBoundMessageList (MessageFilter msgFilter) {

              if (msgFilter! = null) {

                      if (outBoundMsgFilters.get (msgFilter.name) == null) {

                             outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

       @Passer outre

       public void run () {

.

                      while (vrai) {

                             Chaîne msg = inboundList.pop ();

                             processMessage (msg);

                      }                                  

.

       }

.

       protected void pushMessage (String msg) lève l'exception {

              Set outBoundMsgNames = outBoundMsgFilters.keySet ();

              for (Nom de la chaîne: outBoundMsgNames) {

                      MessageFilter msgList = outBoundMsgFilters.get (nom);

                      msgList.filterAndPush (msg);

              }

       }

}

MessageFilterest une classe parent facilitant la filterAndPush()méthode. À mesure que les données transitent par le système d'acquisition, elles sont souvent filtrées ou transformées avant d'être envoyées à l'étape suivante. Les classes qui étendent la MessageFilterclasse remplacent la filterAndPush()méthode et implémentent leur propre logique pour pousser le message filtré vers la liste suivante.

Classe publique MessageFilter {

       MessageList messageList = null;

.

.

       public void filterAndPush (String msg) lève l'exception {

              messageList.push (msg);

       }

.

.     

}

AllTweetsListenerest un exemple d'implémentation d'une MessageListenerclasse. Cela écoute tous les tweets de la AllDatachaîne et publie les données sur EnglishTweetsFilteret InfluencerFilter.

public class AllTweetsListener étend MessageListener {

.

.     

       public static void main (String [] args) lève l'exception {

              MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (nouveau

              EnglishTweetsFilter («EnglishTweetsFilter», «EnglishTweets»));

              allTweetsProcessor.registerOutBoundMessageList (nouveau

                             InfluencerFilter («InfluencerFilter», «Influencers»));

              allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilters'étend MessageFilter. Cette classe implémente une logique pour sélectionner uniquement les tweets marqués comme tweets anglais. Le filtre rejette les tweets non anglais et pousse les tweets anglais à la liste suivante.

public class EnglishTweetsFilter étend MessageFilter {

       public EnglishTweetsFilter (String name, String listName) lève l'exception {

              super (nom, nom de liste);

       }

       @Passer outre

       public void filterAndPush (String message) lève l'exception {

              JsonParser jsonParser = nouveau JsonParser ();

              JsonElement jsonElement = jsonParser.parse (message);

              JsonArray jsonArray = jsonElement.getAsJsonArray ();

              JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

              if (jsonObject.get ("lang")! = null &&

jsonObject.get ("lang"). getAsString (). equals ("en")) {

                             Jedis jedis = super.getJedisInstance ();

                             if (jedis! = null) {

                                    jedis.lpush (super.nom, jsonObject.toString ());

                             }

              }

       }

}