22 juin 2016 Technique

Un soupçon de Kafka pour la suite Elastic : Partie 2

Par Suyog Rao

Bienvenue dans la deuxième partie de notre article consacré à Apache Kafka à la Suite Elastic. Dans la première partie, nous avons présenté des cas d'utilisation de Kafka pour la Suite Elastic et évoqué la conception d'un système s'appuyant sur des flux de données temporels et d'utilisateurs. Dans cet article, nous allons nous concentrer sur les aspects opérationnels et vous expliquer comment exécuter Kafka et Logstash en production afin d'ingérer de grandes quantités de données.

Planification de la capacité

Avant que nous n'attaquions le cœur du sujet, n'oubliez pas que nous parlons principalement de Kafka 0.8 et de Logstash 2.x, les versions stables actuelles. Il existe des versions plus récentes de Kafka, notamment la version 0.9 et plus proche de nous, la version 0.10, mais les principes que nous allons aborder s'appliquent à toutes les versions de Kafka. Sans plus attendre, voyons les différents systèmes en jeu.

Apache ZooKeeper : Kafka dépend de ZooKeeper (ZK) Les brokers en ont besoin pour former un cluster, la configuration des topics est stockée dans les nœuds ZK, etc. De plus, dans la version 2.x de Logstash, les offsets de l'entrée sont stockés dans ZK à mesure de leur validation. Dans les nouvelles versions de Kafka, les clients sont découplés : consommateurs et producteurs n'ont plus besoin de communiquer avec ZooKeeper. Dans Kafka 0.9 et 0.10, les offsets sont par défaut stockés dans des topics et non pas dans ZK. Pour autant, Zookeeper reste nécessaire à l'exécution des brokers de Kafka. De manière générale, nous vous conseillons d'exécuter 3 instances de ZK sur du matériel distinct pour obtenir une gestion de quorum saine. Pour plus d'informations sur l'utilisation de ZK, consultez cet excellent article issu de la documentation de Kafka. D'après notre expérience, ZK ne nécessite que peu de gestion une fois configuré. Il vous suffit de vous assurer que les instances sont actives et surveillées.

Kafka Brokers : le nombre de brokers Kafka dont vous avez besoin dépend généralement de votre stratégie de conservation et de réplication des données. Plus vous ajoutez de brokers, plus vous pouvez stocker de données dans Kafka. En matière de ressources, les E/S sont généralement le facteur limitant de Kafka. Les performances dépendront de la vitesse du disque et du cache du système de fichiers. Les lecteurs SSD et caches de système de fichiers de qualité permettent de traiter sans forcer des millions de messages par seconde. Vous pouvez utiliser topbeat pour surveiller ces informations.

Logstash : de combien d'instances de Logstash avez-vous besoin pour traiter les données dans Kafka ? Il n'y a pas de réponse universelle à cette question. Pour être franc, de nombreuses variables entrent en ligne de compte. De combien de filtres disposez-vous ? Quels coûts représentent-ils ? N'oubliez pas, il est très facile d'aboutir à un modèle Grok complexe de traitement des données comprenant plusieurs conditions. Quel volume de données attendez-vous ? Quelles sont vos sorties ? Comme vous pouvez le voir, vous devez recueillir de nombreuses informations pour arriver à une conclusion. Par ailleurs, il est bien souvent nécessaire de concentrer la planification de la capacité sur les sorties (systèmes externes) et non sur Logstash ! Ceci étant dit, sachez que vous pouvez étendre Logstash et Elasticsearch horizontalement sans difficulté. Nous vous conseillons donc de commencer petit et d'ajouter des données et des instances de LS à mesure que la quantité de données augmente.

En particulier, vous pouvez regrouper plusieurs instances des données de Kafka consommées par Logstash dans des groupes de consommateurs. Chaque groupe partage la charge, et les instances gèrent les données de manière exclusive : les messages ne seront consommés qu'une seule fois par un client du groupe. Cette conception se prête parfaitement à notre proposition précédente : commencez petit et développez-vous pas à pas. Les topics vous permettent de concevoir votre workflow de telle façon que les données qui ont besoin des transformations les plus complexes ou qui doivent être stockées sur une sortie plus lente sont isolées des données traitées plus rapidement. N'oubliez pas que dans Logstash, une seule sortie lente peut bloquer toutes les autres sorties configurées pour s'exécuter après celle-ci.

Elasticsearch : comme nous l'avons vu, Elasticsearch est une solution véritablement évolutive que vous pouvez étendre très simplement. La planification de la capacité pour Elasticsearch nécessite un article de blog à part entière et n'entre pas dans le cadre de celui-ci. Nous vous recommandons donc de parcourir les articles suivants, qui traitent du dimensionnement d'Elasticsearch : Sizing Elasticsearch (Dimensionnement d'Elasticsearch), Considérations sur les performances d'indexation dans Elasticsearch 2.0 et quelques autres.    

Conservation des données

Si votre instance de Kafka est à court d'espace disque, la durée de conservation des logs de Kafka est peut-être trop importante. Kafka permet de configurer cette durée selon deux critères : l'âge et la taille, dont les paramètres sont respectivement log.retention.bytes et log.retention.hours. Lorsque l'un de ces deux critères est satisfait, le broker Kafka commence à supprimer les messages, en commençant par le plus ancien, que Logstash l'ait consommé ou non.

Il est tentant de baser la récupération et la rétention des informations dans Elasticsearch sur les outils de rétention de Kafka. Toutefois, d'après notre expérience, il est préférable d'utiliser un outil comme Curator pour gérer les index temporels d'Elasticsearch et de configurer une stratégie de backup pour restaurer les index en cas de défaillance grave. La plupart du temps, les données conservées dans Kafka sont du contenu brut, non filtré et destiné à de multiples usages. Il est donc déconseillé de les associer de trop près à un composant en aval.

Gestion des offsets et garanties de livraison des messages

Extrait de la documentation de Kafka :

Chaque message des partitions se voit affecter un numéro séquentiel appelé offset qui l'identifie de manière unique. L'offset dépend du consommateur : en règle générale, le consommateur fait progresser son offset de manière linéaire à mesure qu'il parcourt les messages.

L'entrée de Kafka suit les informations relatives aux offsets à l'aide de ZooKeeper. Lorsque Logstash extrait les messages du topic et les traite, il procède régulièrement à une validation auprès de ZK. Ce processus est nommé pointage de contrôle ou validation. Par défaut, ZK réalise un pointage de contrôle toutes les minutes. Vous pouvez modifier cette fréquence à l'aide du paramètre auto_commit_interval_ms. Attention : en allongeant cette durée, vous augmentez les risques de perte de données en cas d'arrêt inopiné de Logstash ou de défaillance du processus. En revanche, en réduisant cette durée, vous augmentez le nombre d'écritures par client et pourriez donc submerger le cluster ZK.

ISi vous redémarrez Logstash, il commencera par lire les informations relatives aux offsets stockées dans ZK, puis récupérera les messages du point de validation précédent. Kafka est conçu pour suivre une sémantique de type au moins une fois : les messages ne sont jamais perdus, mais peuvent être livrés plusieurs fois. Cela signifie qu'il est possible que Logstash connaisse une défaillance alors que l'offset est encore en mémoire et non validé. Dans ce type de situation, il peut donc arriver que les messages soient livrés une nouvelle fois, ou pour le dire plus simplement, en double. Si cela peut poser problème, vous pouvez éviter cette duplication potentielle en générant et utilisant un identifiant unique dans un des champs de vos messages. Que vous imaginiez votre propre code pour créer ces identifiants ou utilisiez le filtre uuid de Logstash, vous devrez mettre en place cet élément avant que les messages n'arrivent dans Kafka. Du côté « expéditeur » de Logstash, vous pouvez mapper cet identifiant d'événement à l'option document_id du plug-in de sortie d'Elasticsearch. Cela signifie qu'Elasticsearch écrasera le document indexé qui dispose du même identifiant, ce qui est généralement préférable à la génération de plusieurs documents disposant du même contenu !

Cela peut également être utile si vous devez rejouer le contenu en cas de perte de données en aval. Vous pouvez utiliser un groupe de consommateurs différent pour relire les données au rythme de votre choix.

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
....
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

my_uuid est un champ existant de l'événement.

Surveillance : comment puis-je détecter un retard ?

Un des points importants à surveiller lors de l'utilisation de Kafka réside dans le nombre de messages sauvegardés et en attente de consommation par Logstash. De nombreux outils permettent de surveiller ces informations. En voici quelques-uns :

Outil CLI intégré à Kafka

Outil simple en ligne de commande permettant de vérifier les offsets. Vous pouvez l'exécuter régulièrement par le biais d'une tâche cron et générer des alertes à l'aide du logiciel de votre choix.

/usr/bin/kafka-consumer-offset-checker --group logstash --topic apache_logs --zookeeper localhost:2181

Exemple de réponse :

Group    Topic       Pid Offset  logSize Lag     Owner
logstash apache_logs 0   145833  300000  154167  none
logstash apache_logs 1   145720  300000  154280  none
logstash apache_logs 2   145799  300000  154201  none
logstash apache_logs 3   146267  300000  153733  none

La colonne Lag indique le nombre de messages en retard.

JMX

JMX permet de mettre très simplement en place une surveillance de Kafka via JConsole. Pour surveiller Logstash avec JMX, activez les options Java supplémentaires suivantes avant de démarrer Logstash :

export LS_JAVA_OPTS="
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.port=3000
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false"

Si votre système s'exécute sur AWS, n'oubliez pas d'utiliser le nom d'hôte externe ou l'adresse IP du serveur.

-Djava.rmi.server.hostname=ec2-107-X-X-X.compute-1.amazonaws.com

La Suite Elastic

Eh bien oui, vous pouvez surveiller Kafka directement avec la Suite Elastic. Vous vous en doutez, il s'agit de l'option que nous privilégions ! Dans ce cas précis, nous utilisons le Beat conçu à cet effet par Dale McDiarmid : Kafkabeats. Ce Beat recueille les informations relatives aux offsets et d'autres informations, puis les stocke dans Elasticsearch. Vous pouvez ensuite analyser le retard des consommateurs avec Kibana. Associé à topbeat qui capture des statistiques système comme le débit du disque, ainsi que l'utilisation du processeur et de la mémoire, il constitue une solution efficace de surveillance de Kafka. Vous avez ainsi réuni au même endroit toutes les données permettant de convaincre votre patron de remplacer vos vieux disques rotatifs par des disques SSD flambant neufs ! Au fait, la version 5.0.0 fait encore mieux. Toutes ces informations essentielles (surveillance des applications et du système) sont réunies dans un seul Beat, nommé Metricbeat. Malin, non ?

Bien, revenons à Kafkabeat. Voici comment vous lancer :

  1. Clonez https://github.com/gingerwizard/kafkabeat
  2. Exécutez make dans le répertoire kafkabeat
  3. Déployez Kafkabeat sur vos brokers Kafka et exécutez-le avec la commande ./kafkabeat -c kafkabeat.yml
Vous collecterez ainsi des informations relatives aux offsets pour tous les topics de Kafka de ce broker et les indexerez dans Elasticsearch avec la structure de document suivante :
"@timestamp": "2016-06-22T01:00:43.033Z",
  "beat": {
    "hostname": "Suyogs-MBP-2",
    "name": "Suyogs-MBP-2"
  },
  "type": "consumer",
  "partition": 0,
  "topic": "apache_logs_test",
  "group": "logstash",
  "offset": 3245
  "lag": 60235
}

Une fois que les données sont dans Elasticsearch, vous pouvez les visualiser très simplement avec Kibana. Je viens de mettre en ligne Kibana 5.0.0-alpha3 qui permet de créer un tableau de bord pour générer des graphiques illustrant le retard des consommateurs au fil du temps.

Kibana_Kafka2.png

Kafka Manager

Kafka Manager est un outil avec interface graphique et open source qui permet de gérer intégralement Kafka. Vous pouvez créer des topics, suivre des indicateurs, gérer les offsets et bien plus encore. Attention, la compilation et la génération de cet outil sont assez longues. Toutefois, si vous avez besoin d'une solution de gestion complète de Kafka, n'hésitez pas à y jeter un œil ! Une fois l'outil démarré, suivez les instructions pour créer un cluster à surveiller en le faisant pointer vers votre instance de ZK.

Vue des consommateurs

kafka UI.png

Vue des topics

Kafka UI 2.png

Conclusion

Dans cet article, nous vous avons fourni des conseils pour mieux utiliser Kafka et Logstash, et vous permettre d'ingérer les données issues de plusieurs sources dans Elasticsearch. Mais ce n'est pas tout !

L'année dernière, Kafka a publié les versions 0.9.0 et plus récemment 0.10.0. Cette dernière version contient de nouvelles fonctionnalités comme une sécurité intégrée, une nouvelle implémentation de consommateurs, des quotas de données, etc. Nous avons mis à jour l'entrée et la sortie de Logstash pour vous permettre de profiter de ces fonctions ! Dans notre prochain article, nous étudierons ces nouvelles fonctions de Kafka et en particulier la mise en place d'une sécurité de bout en bout avec Kafka et la Suite Elastic.

À la prochaine !