Conçu pour le temps réel: messagerie Big Data avec Apache Kafka, partie 1

Lorsque le mouvement du Big Data a commencé, il était principalement axé sur le traitement par lots. Les outils de stockage de données et d'interrogation distribués tels que MapReduce, Hive et Pig ont tous été conçus pour traiter les données par lots plutôt qu'en continu. Les entreprises exécutaient plusieurs tâches chaque nuit pour extraire les données d'une base de données, puis analyser, transformer et éventuellement stocker les données. Plus récemment, les entreprises ont découvert la puissance de l'analyse et du traitement des données et des événements au fur et à mesure qu'ils se produisent , pas seulement une fois toutes les quelques heures. Cependant, la plupart des systèmes de messagerie traditionnels ne s'adaptent pas à la gestion du Big Data en temps réel. Les ingénieurs de LinkedIn ont donc créé et open-source Apache Kafka: un cadre de messagerie distribué qui répond aux exigences du Big Data en évoluant sur du matériel de base.

Au cours des dernières années, Apache Kafka a émergé pour résoudre une variété de cas d'utilisation. Dans le cas le plus simple, il peut s'agir d'un simple tampon pour stocker les journaux d'application. Combiné à une technologie telle que Spark Streaming, il peut être utilisé pour suivre les modifications de données et agir sur ces données avant de les enregistrer dans une destination finale. Le mode prédictif de Kafka en fait un outil puissant pour détecter les fraudes, comme vérifier la validité d'une transaction par carte de crédit lorsqu'elle se produit, et ne pas attendre le traitement par lots des heures plus tard.

Ce didacticiel en deux parties présente Kafka, en commençant par l'installer et l'exécuter dans votre environnement de développement. Vous aurez un aperçu de l'architecture de Kafka, suivi d'une introduction au développement d'un système de messagerie Apache Kafka prêt à l'emploi. Enfin, vous allez créer une application de producteur / consommateur personnalisée qui envoie et consomme des messages via un serveur Kafka. Dans la seconde moitié du didacticiel, vous apprendrez à partitionner et à grouper les messages et à contrôler les messages qu'un consommateur Kafka consommera.

Qu'est-ce qu'Apache Kafka?

Apache Kafka est un système de messagerie conçu pour s'adapter au Big Data. Semblable à Apache ActiveMQ ou RabbitMq, Kafka permet aux applications construites sur différentes plates-formes de communiquer via la transmission de messages asynchrones. Mais Kafka diffère de ces systèmes de messagerie plus traditionnels de plusieurs manières:

  • Il est conçu pour évoluer horizontalement, en ajoutant plus de serveurs de base.
  • Il offre un débit beaucoup plus élevé pour les processus producteurs et consommateurs.
  • Il peut être utilisé pour prendre en charge des cas d'utilisation par lots et en temps réel.
  • Il ne prend pas en charge JMS, l'API middleware orientée message de Java.

L'architecture d'Apache Kafka

Avant d'explorer l'architecture de Kafka, vous devez connaître sa terminologie de base:

  • Un producteur est un processus qui peut publier un message dans une rubrique.
  • un consommateur est un processus qui peut s'abonner à un ou plusieurs sujets et consommer des messages publiés sur des sujets.
  • Une catégorie de rubrique est le nom du fil dans lequel les messages sont publiés.
  • Un courtier est un processus exécuté sur une seule machine.
  • Un cluster est un groupe de courtiers travaillant ensemble.

L'architecture d'Apache Kafka est très simple, ce qui peut améliorer les performances et le débit de certains systèmes. Chaque sujet de Kafka est comme un simple fichier journal. Lorsqu'un producteur publie un message, le serveur Kafka l'ajoute à la fin du fichier journal pour sa rubrique donnée. Le serveur attribue également un offset , qui est un numéro utilisé pour identifier en permanence chaque message. À mesure que le nombre de messages augmente, la valeur de chaque décalage augmente; par exemple, si le producteur publie trois messages, le premier peut obtenir un décalage de 1, le second un décalage de 2 et le troisième un décalage de 3.

Lorsque le consommateur Kafka démarre pour la première fois, il enverra une demande d'extraction au serveur, lui demandant de récupérer tous les messages pour un sujet particulier avec une valeur de décalage supérieure à 0. Le serveur vérifiera le fichier journal de ce sujet et renverra les trois nouveaux messages . Le consommateur traitera les messages, puis enverra une demande de messages avec un décalage supérieur à 3, et ainsi de suite.

Dans Kafka, le client est responsable de la mémorisation du nombre de décalages et de la récupération des messages.Le serveur Kafka ne suit ni ne gère la consommation des messages. Par défaut, un serveur Kafka conservera un message pendant sept jours. Un fil d'arrière-plan sur le serveur vérifie et supprime les messages de sept jours ou plus. Un consommateur peut accéder aux messages tant qu'ils se trouvent sur le serveur. Il peut lire un message plusieurs fois et même lire les messages dans l'ordre inverse de réception. Mais si le consommateur ne parvient pas à récupérer le message avant la fin des sept jours, il manquera ce message.

Benchmarks Kafka

L'utilisation de la production par LinkedIn et d'autres entreprises a montré qu'avec une configuration appropriée, Apache Kafka est capable de traiter des centaines de gigaoctets de données par jour. En 2011, trois ingénieurs de LinkedIn ont utilisé des tests de référence pour démontrer que Kafka pouvait atteindre un débit beaucoup plus élevé qu'ActiveMQ et RabbitMQ.

Configuration rapide et démo d'Apache Kafka

Nous allons créer une application personnalisée dans ce didacticiel, mais commençons par installer et tester une instance Kafka avec un producteur et un consommateur prêts à l'emploi.

  1. Visitez la page de téléchargement de Kafka pour installer la version la plus récente (0.9 à ce jour).
  2. Extrayez les binaires dans un software/kafkadossier. Pour la version actuelle, c'est software/kafka_2.11-0.9.0.0.
  3. Modifiez votre répertoire actuel pour qu'il pointe vers le nouveau dossier.
  4. Démarrez le serveur Zookeeper en exécutant la commande: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Démarrez le serveur en exécutant Kafka: bin/kafka-server-start.sh config/server.properties.
  6. Créer un sujet de test que vous pouvez utiliser pour les tests: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Démarrer un simple consommateur de console qui peut consommer des messages publiés sur un sujet donné, comme javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Démarrage d' une simple console de producteur qui peut publier des messages au sujet de test: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Essayez de taper un ou deux messages dans la console du producteur. Vos messages doivent apparaître dans la console consommateur.

Exemple d'application avec Apache Kafka

Vous avez vu comment Apache Kafka fonctionne hors de la boîte. Ensuite, développons une application de producteur / consommateur personnalisée. Le producteur récupérera l'entrée utilisateur de la console et enverra chaque nouvelle ligne sous forme de message à un serveur Kafka. Le consommateur récupère les messages pour une rubrique donnée et les imprime sur la console. Les composants producteur et consommateur dans ce cas sont vos propres implémentations de kafka-console-producer.shet kafka-console-consumer.sh.

Commençons par créer une Producer.javaclasse. Cette classe client contient une logique permettant de lire l'entrée utilisateur à partir de la console et d'envoyer cette entrée sous forme de message au serveur Kafka.

Nous configurons le producteur en créant un objet à partir de la java.util.Propertiesclasse et en définissant ses propriétés. La classe ProducerConfig définit toutes les différentes propriétés disponibles, mais les valeurs par défaut de Kafka sont suffisantes pour la plupart des utilisations. Pour la configuration par défaut, il suffit de définir trois propriétés obligatoires:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

Dans le cas de l'exemple d'application, nous savons que le producteur utilise ByteArraySerializerpour la clé et StringSerializerpour la valeur. Du côté client, nous devons donc utiliser org.apache.kafka.common.serialization.ByteArrayDeserializerpour la clé et org.apache.kafka.common.serialization.StringDeserializerpour la valeur. La définition de ces classes en tant que valeurs pour KEY_DESERIALIZER_CLASS_CONFIGet VALUE_DESERIALIZER_CLASS_CONFIGpermettra au consommateur de désérialiser les byte[]types encodés envoyés par le producteur.

Enfin, nous devons définir la valeur du GROUP_ID_CONFIG. Il doit s'agir d'un nom de groupe au format chaîne. J'expliquerai plus en détail cette configuration dans une minute. Pour l'instant, il suffit de regarder le consommateur Kafka avec les quatre propriétés obligatoires définies: