Comment créer des applications de streaming avec état avec Apache Flink

Fabian Hueske est un committer et membre PMC du projet Apache Flink et co-fondateur de Data Artisans.

Apache Flink est un framework permettant d'implémenter des applications de traitement de flux avec état et de les exécuter à grande échelle sur un cluster de calcul. Dans un article précédent, nous avons examiné ce qu'est le traitement de flux avec état, les cas d'utilisation auxquels il répond et pourquoi vous devez implémenter et exécuter vos applications de streaming avec Apache Flink.

Dans cet article, je vais présenter des exemples pour deux cas d'utilisation courants de traitement de flux avec état et discuter de la façon dont ils peuvent être implémentés avec Flink. Le premier cas d'utilisation concerne les applications basées sur les événements, c'est-à-dire les applications qui ingèrent des flux continus d'événements et appliquent une certaine logique métier à ces événements. Le second est le cas d'utilisation de l'analyse en continu, où je présenterai deux requêtes analytiques implémentées avec l'API SQL de Flink, qui agrège les données de streaming en temps réel. Chez Data Artisans, nous fournissons le code source de tous nos exemples dans un référentiel GitHub public.

Avant de plonger dans les détails des exemples, je vais présenter le flux d'événements qui est ingéré par les exemples d'applications et expliquer comment vous pouvez exécuter le code que nous fournissons.

Un flux d'événements de taxi

Nos exemples d'applications sont basés sur un ensemble de données publiques sur les trajets en taxi qui se sont déroulés à New York en 2013. Les organisateurs du Grand Challenge DEBS (ACM International Conference on Distributed Event-Based Systems) 2015 ont réorganisé l'ensemble de données d'origine et l'ont converti en un seul fichier CSV à partir duquel nous lisons les neuf champs suivants.

  • Medallion: identifiant de somme MD5 du taxi
  • Hack_license: un ID de somme MD5 de la licence de taxi
  • Pickup_datetime: heure à laquelle les passagers ont été pris en charge
  • Dropoff_datetime: heure à laquelle les passagers ont été déposés
  • Pickup_longitude: la longitude du lieu de prise en charge
  • Pickup_latitude: la latitude du lieu de prise en charge
  • Dropoff_longitude: la longitude du lieu de dépôt
  • Dropoff_latitude: la latitude du lieu de dépôt
  • Total_amount: total payé en dollars

Le fichier CSV stocke les enregistrements dans l'ordre croissant de leur attribut d'heure de dépôt. Par conséquent, le fichier peut être traité comme un journal ordonné des événements qui ont été publiés à la fin d'un voyage. Afin d'exécuter les exemples que nous fournissons sur GitHub, vous devez télécharger l'ensemble de données du défi DEBS à partir de Google Drive.

Tous les exemples d'applications lisent séquentiellement le fichier CSV et l'ingèrent en tant que flux d'événements de trajet en taxi. À partir de là, les applications traitent les événements comme n'importe quel autre flux, c'est-à-dire comme un flux ingéré depuis un système de publication-abonnement basé sur des journaux, tel qu'Apache Kafka ou Kinesis. En fait, lire un fichier (ou tout autre type de données persistantes) et le traiter comme un flux est la pierre angulaire de l'approche de Flink pour unifier le traitement par lots et par flux.

Exécuter les exemples Flink

Comme mentionné précédemment, nous avons publié le code source de nos exemples d'applications dans un référentiel GitHub. Nous vous encourageons à bifurquer et cloner le référentiel. Les exemples peuvent être facilement exécutés à partir de votre IDE de choix; vous n'avez pas besoin d'installer et de configurer un cluster Flink pour les exécuter. Tout d'abord, importez le code source des exemples en tant que projet Maven. Ensuite, exécutez la classe principale d'une application et indiquez l'emplacement de stockage du fichier de données (voir ci-dessus pour le lien pour télécharger les données) comme paramètre de programme.

Une fois que vous avez lancé une application, elle démarre une instance Flink locale intégrée dans le processus JVM de l'application et soumet l'application pour l'exécuter. Vous verrez un tas d'instructions de journal pendant le démarrage de Flink et la planification des tâches du travail. Une fois l'application en cours d'exécution, sa sortie sera écrite dans la sortie standard.

Créer une application événementielle dans Flink

Voyons maintenant notre premier cas d'utilisation, qui est une application événementielle. Les applications événementielles ingèrent des flux d'événements, effectuent des calculs à mesure que les événements sont reçus et peuvent émettre de nouveaux événements ou déclencher des actions externes. Plusieurs applications événementielles peuvent être composées en les connectant ensemble via des systèmes de journaux d'événements, de la même manière que de grands systèmes peuvent être composés à partir de microservices. Les applications basées sur les événements, les journaux d'événements et les instantanés de l'état des applications (appelés points de sauvegarde dans Flink) constituent un modèle de conception très puissant, car vous pouvez réinitialiser leur état et rejouer leur entrée pour récupérer après un échec, corriger un bogue ou migrer un application à un autre cluster.

Dans cet article, nous examinerons une application événementielle qui soutient un service, qui surveille les heures de travail des chauffeurs de taxi. En 2016, la NYC Taxi and Limousine Commission a décidé de limiter les heures de travail des chauffeurs de taxi à 12 heures de travail et d'exiger une pause d'au moins huit heures avant le début du prochain quart de travail. Un quart de travail commence avec le début de la première course. Dès lors, un conducteur peut commencer de nouveaux trajets dans un délai de 12 heures. Notre application suit les trajets des conducteurs, marque l'heure de fin de leur fenêtre de 12 heures (c'est-à-dire l'heure à laquelle ils peuvent commencer le dernier trajet) et signale les trajets qui ont enfreint le règlement. Vous pouvez trouver le code source complet de cet exemple dans notre référentiel GitHub.

Notre application est implémentée avec l'API DataStream de Flink et un fichier KeyedProcessFunction. L'API DataStream est une API fonctionnelle et basée sur le concept de flux de données typés. A DataStreamest la représentation logique d'un flux d'événements de type T. Un flux est traité en lui appliquant une fonction qui produit un autre flux de données, éventuellement d'un type différent. Flink traite les flux en parallèle en distribuant des événements pour diffuser des partitions et en appliquant différentes instances de fonctions à chaque partition.

L'extrait de code suivant montre le flux de haut niveau de notre application de surveillance.

// ingérer un flux de courses en taxi.

DataStream rides = TaxiRides.getRides (env, inputPath);

Flux de données notifications = manèges

   // partitionner le flux par l'identifiant du permis de conduire

   .keyBy (r -> r.licenseId)

   // Surveillez les événements de parcours et générez des notifications

   .process (nouveau MonitorWorkTime ());

// imprimer les notifications

notifications.print ();

L'application commence à ingérer un flux d'événements de trajet en taxi. Dans notre exemple, les événements sont lus à partir d'un fichier texte, analysés et stockés dans des TaxiRideobjets POJO. Une application réelle ingérerait généralement les événements d'une file d'attente de messages ou d'un journal d'événements, comme Apache Kafka ou Pravega. L'étape suivante consiste à saisir les TaxiRideévénements par licenseIdle pilote. L' keyByopération partitionne le flux sur le champ déclaré, de sorte que tous les événements avec la même clé sont traités par la même instance parallèle de la fonction suivante. Dans notre cas, nous partitionnons sur le licenseIdterrain car nous voulons surveiller le temps de travail de chaque conducteur individuel.

Ensuite, nous appliquons la MonitorWorkTimefonction sur les TaxiRideévénements partitionnés . La fonction suit les trajets par conducteur et surveille leurs quarts de travail et les temps de pause. Il émet des événements de type Tuple2, où chaque tuple représente une notification composée de l'ID de licence du pilote et d'un message. Enfin, notre application émet les messages en les imprimant sur la sortie standard. Une application du monde réel écrirait les notifications sur un message externe ou un système de stockage, comme Apache Kafka, HDFS ou un système de base de données, ou déclencherait un appel externe pour les expulser immédiatement.

Maintenant que nous avons discuté du flux global de l'application, examinons la MonitorWorkTimefonction, qui contient la plupart de la logique métier réelle de l'application. La MonitorWorkTimefonction est un état KeyedProcessFunctionqui ingère des TaxiRideévénements et émet des Tuple2enregistrements. L' KeyedProcessFunctioninterface propose deux méthodes pour traiter les données: processElement()et onTimer(). La processElement()méthode est appelée pour chaque événement arrivant. La onTimer()méthode est appelée lorsqu'un temporisateur précédemment enregistré se déclenche. L'extrait de code suivant montre le squelette de la MonitorWorkTimefonction et tout ce qui est déclaré en dehors des méthodes de traitement.

classe statique publique MonitorWorkTime

    étend KeyedProcessFunction {

  // constantes de temps en millisecondes

  privé statique final long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 heures

  privé statique final long REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 heures

  privé statique final long CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 heures

 formateur de dateTimeFormatter transitoire privé;

  // handle d'état pour stocker l'heure de début d'un quart de travail

  ValueState shiftStart;

  @Passer outre

  public void open (configuration conf) {

    // enregistrer le handle d'état

    shiftStart = getRuntimeContext (). getState (

      nouveau ValueStateDescriptor («shiftStart», Types.LONG));

    // initialise le formateur d'heure

    this.formatter = DateTimeFormat.forPattern ("aaaa-MM-jj HH: mm: ss");

  }

  // processElement () et onTimer () sont décrits en détail ci-dessous.

}

La fonction déclare quelques constantes pour les intervalles de temps en millisecondes, un formateur d'heure et un handle d'état pour l'état à clé qui est géré par Flink. L'état géré est périodiquement contrôlé et automatiquement restauré en cas d'échec. L'état de clé est organisé par clé, ce qui signifie qu'une fonction conservera une valeur par poignée et clé. Dans notre cas, la MonitorWorkTimefonction maintient une Longvaleur pour chaque clé, c'est-à-dire pour chacune licenseId. L' shiftStartétat stocke l'heure de début du quart de travail d'un conducteur. Le descripteur d'état est initialisé dans la open()méthode, qui est appelée une fois avant le traitement du premier événement.

Maintenant, regardons la processElement()méthode.

@Passer outre

public void processElement (

    Balade en taxi,

    Contexte ctx,

    Collectionneur out) jette une exception {

  // recherche l'heure de début du dernier quart

  Long startTs = shiftStart.value ();

  if (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // c'est le premier tour d'un nouveau quart de travail.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      "Vous êtes autorisé à accepter de nouveaux passagers jusqu'à" + formatter.print (endTs)));

    // enregistrer la minuterie pour nettoyer l'état en 24h

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // ce trajet a commencé après la fin du temps de travail autorisé.

    // c'est une violation de la réglementation!

    out.collect (Tuple2.of (ride.licenseId,

      «Ce trajet a enfreint la réglementation sur le temps de travail.»));

  }

}

La processElement()méthode est appelée pour chaque TaxiRideévénement. Tout d'abord, la méthode récupère l'heure de début du décalage du pilote à partir de la poignée d'état. Si l'état ne contient pas d'heure de début ( startTs == null) ou si la dernière équipe a commencé plus de 20 heures ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) avant la course actuelle, la course actuelle est la première course d'une nouvelle équipe. Dans les deux cas, la fonction commence un nouveau quart de travail en mettant à jour l'heure de début du quart à l'heure de début du trajet en cours, émet un message au conducteur avec l'heure de fin du nouveau quart de travail et enregistre une minuterie pour nettoyer le état dans 24 heures.

Si le trajet en cours n'est pas le premier trajet d'un nouveau poste, la fonction vérifie s'il enfreint la réglementation du temps de travail, c'est-à-dire s'il a commencé plus de 12 heures après le début du poste actuel du conducteur. Si tel est le cas, la fonction émet un message pour informer le conducteur de la violation.

La processElement()méthode de la MonitorWorkTimefonction enregistre une minuterie pour nettoyer l'état 24 heures après le début d'un quart de travail. Il est important de supprimer un état qui n'est plus nécessaire pour empêcher l'augmentation de la taille des états en raison d'une fuite d'état. Un minuteur se déclenche lorsque l'heure de l'application dépasse l'horodatage du minuteur. À ce stade, la onTimer()méthode est appelée. Comme pour l'état, les minuteries sont conservées par clé et la fonction est placée dans le contexte de la clé associée avant que la onTimer()méthode ne soit appelée. Par conséquent, tous les accès d'état sont dirigés vers la clé qui était active lorsque la minuterie a été enregistrée.

Jetons un coup d'œil à la onTimer()méthode de MonitorWorkTime.

@Passer outre

public void onTimer (

    longue minuterie,

    OnTimerContext ctx,

    Collectionneur out) jette une exception {

  // supprime l'état de décalage si aucun nouveau décalage n'a déjà été commencé.

  Long startTs = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

La processElement()méthode enregistre les minuteries pendant 24 heures après le début d'un quart de travail pour nettoyer un état qui n'est plus nécessaire. Le nettoyage de l'état est la seule logique onTimer()implémentée par la méthode. Lorsqu'un chronomètre se déclenche, nous vérifions si le conducteur a entamé une nouvelle équipe entre-temps, c'est-à-dire si l'heure de début de l'équipe a changé. Si ce n'est pas le cas, nous effaçons l'état de décalage pour le conducteur.