Conçu pour le temps réel: messagerie Big Data avec Apache Kafka, partie 2

Dans la première moitié de cette introduction de JavaWorld à Apache Kafka, vous avez développé quelques applications de production / consommation à petite échelle à l'aide de Kafka. A partir de ces exercices, vous devez vous familiariser avec les bases du système de messagerie Apache Kafka. Dans cette seconde moitié, vous apprendrez à utiliser des partitions pour répartir la charge et faire évoluer votre application horizontalement, en traitant jusqu'à des millions de messages par jour. Vous apprendrez également comment Kafka utilise les décalages de messages pour suivre et gérer le traitement de messages complexes, et comment protéger votre système de messagerie Apache Kafka contre les pannes en cas de panne d'un consommateur. Nous développerons l'exemple d'application de la partie 1 pour les cas d'utilisation de publication-abonnement et point à point.

Partitions dans Apache Kafka

Les sujets de Kafka peuvent être subdivisés en partitions. Par exemple, lors de la création d'une rubrique nommée Démo, vous pouvez la configurer pour qu'elle comporte trois partitions. Le serveur créerait trois fichiers journaux, un pour chacune des partitions de démonstration. Lorsqu'un producteur publiait un message dans la rubrique, il attribuait un ID de partition à ce message. Le serveur ajouterait alors le message au fichier journal pour cette partition uniquement.

Si vous démarrez ensuite deux consommateurs, le serveur peut affecter les partitions 1 et 2 au premier consommateur et la partition 3 au deuxième consommateur. Chaque consommateur lirait uniquement à partir de ses partitions attribuées. Vous pouvez voir la rubrique Démo configurée pour trois partitions dans la figure 1.

Pour étendre le scénario, imaginez un cluster Kafka avec deux courtiers, logés dans deux machines. Lorsque vous partitionnez la rubrique de démonstration, vous la configurez pour avoir deux partitions et deux répliques. Pour ce type de configuration, le serveur Kafka attribuerait les deux partitions aux deux courtiers de votre cluster. Chaque courtier serait le chef de l'une des partitions.

Lorsqu'un producteur publiait un message, il était envoyé au chef de partition. Le leader prendrait le message et l'ajouterait au fichier journal sur la machine locale. Le deuxième courtier répliquerait passivement ce journal de validation sur sa propre machine. Si le chef de partition tombait en panne, le deuxième courtier deviendrait le nouveau chef et commencerait à traiter les demandes des clients. De la même manière, lorsqu'un consommateur envoyait une requête à une partition, cette requête irait d'abord au chef de partition, qui renverrait les messages demandés.

Avantages du partitionnement

Considérez les avantages du partitionnement d'un système de messagerie basé sur Kafka:

  1. Évolutivité : dans un système avec une seule partition, les messages publiés dans une rubrique sont stockés dans un fichier journal, qui existe sur une seule machine. Le nombre de messages pour une rubrique doit tenir dans un seul fichier journal de validation et la taille des messages stockés ne peut jamais dépasser l'espace disque de cette machine. Le partitionnement d'une rubrique vous permet de mettre à l'échelle votre système en stockant des messages sur différentes machines dans un cluster. Si vous souhaitez stocker 30 gigaoctets (Go) de messages pour la rubrique Démo, par exemple, vous pouvez créer un cluster Kafka de trois machines, chacune avec 10 Go d'espace disque. Ensuite, vous configurez le sujet pour avoir trois partitions.
  2. Équilibrage de la charge du serveur : avoir plusieurs partitions vous permet de répartir les demandes de messages entre les courtiers. Par exemple, si vous avez une rubrique traitant 1 million de messages par seconde, vous pouvez la diviser en 100 partitions et ajouter 100 courtiers à votre cluster. Chaque courtier serait le leader de la partition unique, chargé de répondre à seulement 10 000 demandes de clients par seconde.
  3. Équilibrage de la charge du consommateur : similaire à l'équilibrage de la charge du serveur, l'hébergement de plusieurs consommateurs sur une machine différente vous permet de répartir la charge du consommateur. Supposons que vous vouliez consommer 1 million de messages par seconde à partir d'un sujet avec 100 partitions. Vous pouvez créer 100 consommateurs et les exécuter en parallèle. Le serveur Kafka attribuerait une partition à chacun des consommateurs et chaque consommateur traiterait 10 000 messages en parallèle. Puisque Kafka affecte chaque partition à un seul consommateur, dans la partition, chaque message serait consommé dans l'ordre.

Deux façons de partitionner

Le producteur est responsable de décider vers quelle partition un message ira. Le producteur a deux options pour contrôler cette affectation:

  • Partitionneur personnalisé : vous pouvez créer une classe implémentant l' org.apache.kafka.clients.producer.Partitionerinterface. Cette personnalisation Partitionerimplémentera la logique métier pour décider où les messages sont envoyés.
  • DefaultPartitioner : si vous ne créez pas de classe de partitionneur personnalisée, la org.apache.kafka.clients.producer.internals.DefaultPartitionerclasse sera utilisée par défaut . Le partitionneur par défaut est assez bon dans la plupart des cas, offrant trois options:
    1. Manuel : lorsque vous créez un ProducerRecord, utilisez le constructeur surchargé new ProducerRecord(topicName, partitionId,messageKey,message)pour spécifier un ID de partition.
    2. Hashing (sensible à la localité) : lorsque vous créez un ProducerRecord, spécifiez a messageKey, en appelant new ProducerRecord(topicName,messageKey,message). DefaultPartitionerutilisera le hachage de la clé pour s'assurer que tous les messages pour la même clé vont au même producteur. C'est l'approche la plus simple et la plus courante.
    3. Pulvérisation (Équilibrage de charge aléatoire) : Si vous ne voulez pas contrôler les messages de partition, appelez simplement new ProducerRecord(topicName, message)pour créer votre fichier ProducerRecord. Dans ce cas, le partitionneur enverra des messages à toutes les partitions en mode tourniquet, assurant une charge serveur équilibrée.

Partitionner une application Apache Kafka

Pour l'exemple simple producteur / consommateur de la partie 1, nous avons utilisé a DefaultPartitioner. Nous allons maintenant essayer de créer un partitionneur personnalisé à la place. Pour cet exemple, supposons que nous ayons un site de vente au détail que les consommateurs peuvent utiliser pour commander des produits partout dans le monde. Sur la base de l'utilisation, nous savons que la plupart des consommateurs se trouvent aux États-Unis ou en Inde. Nous voulons partitionner notre application pour envoyer des commandes des États-Unis ou de l'Inde à leurs propres consommateurs respectifs, tandis que les commandes de partout ailleurs iront à un troisième consommateur.

Pour commencer, nous allons créer un CountryPartitionerqui implémente l' org.apache.kafka.clients.producer.Partitionerinterface. Nous devons implémenter les méthodes suivantes:

  1. Kafka appellera configure () lorsque nous initialiserons la Partitionerclasse, avec une Mapdes propriétés de configuration. Cette méthode initialise des fonctions spécifiques à la logique métier de l'application, comme la connexion à une base de données. Dans ce cas, nous voulons un partitionneur assez générique qui prend countryNamecomme propriété. Nous pouvons ensuite utiliser configProperties.put("partitions.0","USA")pour mapper le flux de messages vers des partitions. À l'avenir, nous pourrons utiliser ce format pour changer les pays qui auront leur propre partition.
  2. L' ProducerAPI appelle partition () une fois pour chaque message. Dans ce cas, nous l'utiliserons pour lire le message et analyser le nom du pays à partir du message. Si le nom du pays est dans le countryToPartitionMap, il reviendra partitionIdstocké dans le Map. Sinon, il hachera la valeur du pays et l'utilisera pour calculer la partition vers laquelle il doit aller.
  3. Nous appelons close () pour arrêter le partitionneur. L'utilisation de cette méthode garantit que toutes les ressources acquises lors de l'initialisation sont nettoyées lors de l'arrêt.

Notez que lorsque Kafka appelle configure(), le producteur Kafka transmettra à la Partitionerclasse toutes les propriétés que nous avons configurées pour le producteur . Il est essentiel de lire uniquement les propriétés qui commencent par partitions., de les analyser pour obtenir le partitionIdet de stocker l'ID dans countryToPartitionMap.

Vous trouverez ci-dessous notre implémentation personnalisée de l' Partitionerinterface.

Liste 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

La Producerclasse de la liste 2 (ci-dessous) est très similaire à notre producteur simple de la partie 1, avec deux changements marqués en gras:

  1. Nous définissons une propriété de configuration avec une clé égale à la valeur de ProducerConfig.PARTITIONER_CLASS_CONFIG, qui correspond au nom complet de notre CountryPartitionerclasse. Nous avons également défini countryNamesur partitionId, mappant ainsi les propriétés auxquelles nous voulons passer CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you'll need to manually assign partitions. In this case you would use KafkaConsumer.assign() to pass a list of partitions that each consumer was interested in to the Kakfa server.

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

Supposons que vous créez un nouveau sujet avec trois partitions. Lorsque vous démarrez le premier consommateur pour la nouvelle rubrique, Kafka attribuera les trois partitions au même consommateur. Si vous démarrez ensuite un deuxième consommateur, Kafka réaffectera toutes les partitions, attribuant une partition au premier consommateur et les deux partitions restantes au deuxième consommateur. Si vous ajoutez un troisième consommateur, Kafka réassignera les partitions, de sorte que chaque consommateur se voit attribuer une seule partition. Enfin, si vous démarrez les quatrième et cinquième consommateurs, trois des consommateurs auront une partition attribuée, mais les autres ne recevront aucun message. Si l'une des trois partitions initiales tombe en panne, Kafka utilisera la même logique de partitionnement pour réaffecter la partition de ce consommateur à l'un des consommateurs supplémentaires.