22 6월 2016 엔지니어링

Elastic Stack에는 Kafka면 충분합니다 - 2부

By Suyog Rao

Apache Kafka 및 Elastic Stack 블로그 포스트 2부입니다. 지난 포스트에서는 Elastic Stack을 위한 Kafka의 사용사례를 소개하고 시간 기반 및 사용자 기반 데이터 흐름을 위한 시스템 설계에 대해 설명했습니다. 이번 글에서는 작동 측면, 즉, Kafka 및 Logstash를 프로덕션에서 실행하여 대량 데이터를 취하는 팁을 소개합니다.

용량 계획

더 깊이 들어가기 전에 안정적인 최신 버전인 Kafka 0.8과 Logstash 2.x를 다루게 됨을 알려드립니다. 새로운 버전의 Kafka, 0.9와 0.10이 최근 나왔지만 여기서 설명하는 핵심 개념은 모든 버전의 Kafka에 적용 가능합니다. 먼저 다음의 여러 관련 시스템에 대해 설명하는 것으로부터 시작하겠습니다.

Apache ZooKeeper: Kafka는 ZooKeeper (ZK)에 종속성을 갖습니다. 브로커가 클러스터 구성을 위해 이를 필요로 하며, 항목 구성이 ZK 노드 등에 저장됩니다. 또한 Logstash 2.x 버전에서 입력 오프셋은 확인되는 대로 ZK에 저장됩니다. 최신 버전의 Kafka는 클라이언트 - 소비자(consumers)와 생산자(producers) - 를 분리하여 ZooKeeper와 통신할 필요가 없도록 했습니다. Kafka 0.9 및 0.10에서는 오프셋이 ZK가 아닌 토픽(topic)에 기본적으로 저장됩니다. 두 방법 모두 ZooKeeper가 있어야만 Kafka 브로커를 실행할 수 있습니다. 일반적으로 3개의 ZK 인스턴스를 별도 하드웨어에서 모두 실행하여 쿼럼 구성(quorum configuration)을 하는 것이 좋습니다. ZK 작동에 대한 자세한 내용은 Kafka 문서의 이 훌륭한 섹션 을 참조하십시오. 당사 경험에 의하면 ZK 자체는 설정된 후 많은 지원을 필요로 하지 않습니다. 사용자는 인스턴스가 실행되고 모니터링 되는지 확인하기만 하면 됩니다.

Kafka Brokers: 일반적으로 필요한 Kafka 브로커의 수는 데이터 보존 및 복제 전략에 따라 달라집니다. 많은 수의 브로커를 추가할수록 많은 양의 데이터를 Kafka에 저장할 수 있습니다. 리소스 측면에서 Kafka는 대개 IO에 구속됩니다. 디스크 속도와 파일 시스템 캐시에 따라 성능이 제한되며, 뛰어난 SSD 드라이브와 파일 시스템 캐시는 초당 수백만 개의 메시지를 손쉽게 지원할 수 있도록 합니다. 사용자는 topbeat 를 통해 이러한 정보를 모니터링할 수 있습니다.

Logstash: Kafka의 데이터를 처리하려면 몇 개의 Logstash 인스턴스가 필요할까요? 이는 다양한 변수에 따라 달라지므로 정확한 수치를 예상하기가 어려운 것이 사실입니다. 몇 개의 필터를 사용 중인가? 필터의 가격이 어떻게 되는가? 등의 질문에 대한 답이 필요합니다. 가장 간단한 방법은 데이터 처리를 위해 여러 개의 조건문을 사용하는 복잡한 Grok 패턴을 사용해 끝내버리는 것입니다. 예상되는 데이터 용량이 어느 정도입니까? 각 출력이 무엇입니까? 보시는 것처럼 수치를 제공하려면 먼저 많은 정보를 수집해야 합니다. 용량 계획 시 중점을 두어야 하는 것은 Logstash 자체가 아닌 출력(외부 시스템)인 경우가 많습니다! 그런 까닭에 Logstash와 Elasticsearch는 수평적인 확장이 쉽습니다. 따라서 작은 규모로 시작해서 데이터가 증가함에 따라 노드 또는 추가 LS 인스턴스를 추가하는 것이 좋습니다

특히 Logstash가 소비하는 Kafka 데이터의 경우, 여러 개의 인스턴스를 소비자 그룹별로 묶을 수 있습니다. 각 그룹은 로드를 공유하며, 인스턴스가 데이터를 독점 처리합니다. 즉, 메시지가 그룹 내 한 클라이언트에 의해 한 번만 소비됩니다. 이 설계는 '작은 규모로 시작하여 반복해서 확장한다'는 원래의 명제를 제대로 충족합니다. topic을 사용하면 복잡한 변환이 필요한 데이터나 빠르게 이동하는 데이터를 분리하여 느린 출력에 저장되어야 하는 작업의 흐름들을 설계할 수 있습니다. Logstash에서는 하나의 느린 출력이 그 이후에 실행되도록 구성된 다른 모든 출력을 차단할 수 있음을 기억하십시오.

Elasticsearch: 앞에서 설명한 것처럼 Elasticsearch는 확장이 쉽다는 점에서 실제로 탄력적(elastic)입니다. Elasticsearch의 용량 계획은 블로그 게시글 하나가 필요한 주제로 이 글의 범위를 벗어납니다. Elasticsearch의 확장 및 크기 조정 개념을 다루는 게시글인Elasticsearch 크기 조정, Elasticsearch 색인화에 대한 성능 고려 사항기타 사항을 읽어보시기 바랍니다.    

데이터 보존

Kafka 인스턴스를 위한 디스크 공간이 부족한 경우, Kafka 로그 보존 시간이 너무 긴 것이 원인일 가능성이 높습니다. log.retention.byteslog.retention.hours 브로커 설정을 사용하여 경과 시간과 크기의 두 가지 기준에 따라 데이터 보존을 구성할 수 있습니다. 이들 두 기준 중 하나가 충족되면 Logstash가 소비(consumed)했는지 여부에 관계 없이 가장 오래된 것부터 시작해서 Kafka 브로커가 메시지를 삭제하기 시작합니다.

Kafka의 보존 도구(retention tools)를 사용하면 Elasticsearch를 위한 데이터 복구 및 보존을 효과적으로 설계할 수 있습니다. Curator 와 같은 도구를 사용해서 Elasticsearch의 시간 기준 인덱스들을 관리하고 재해 발생 시 인덱스 복원을 위한 스냅샷 전략까지 구성할 수 있습니다. 대부분의 경우, Kafka의 데이터는 필터링되지 않은 원시 컨텐츠이고 여러 출력을 가지고 있으므로 하나의 다운스트림 구성 요소와 긴밀히 연결하지 않는 것이 좋습니다.

오프셋 관리 및 메시지 전달 보장

Kafka 문서에서 발췌:

파티션의 메시지에는 해당 파티션에서 각 메시지를 고유하게 식별하는 오프셋이라는 순차적 ID 번호가 각각 할당됩니다. 오프셋은 소비자에 의해 제어됩니다. 일반적으로 소비자가 메시지를 읽으면서 연속적으로 오프셋을 진행시킵니다.

Kafka 입력은 ZooKeeper를 통해 오프셋 정보를 추적합니다. Logstash는 토픽에서 메시지를 가져와 처리하면서 정기적으로 ZK에 커밋합니다. 이 프로세스를 체크 포인팅 또는 커밋이라고 합니다. 기본적으로 Logstash는 매분 ZK에 체크 포인트를 설정합니다. 이 빈도는 auto_commit_interval_ms 설정을 통해 제어할 수 있습니다. 이 시간을 길게 설정하면 Logstash를 강제로 중지할 때 또는 프로세스가 충돌할 때 데이터가 손실될 수 있습니다. 반대로 이 시간을 짧게 설정하면 클라이언트당 쓰기 횟수가 늘어나 ZK 클러스터에 과부하가 걸릴 수 있습니다.

Logstash를 재시작하면 먼저 ZK에 저장된 오프셋 정보를 읽고 이전 커밋 지점에서 메시지를 가져오기 시작합니다. Kafka는 최소 1회(At-least-once) 의미 체계를 따르도록 설계되었습니다. 메시지 무손실이 보장되지만 메시지가 재전달될 수 있습니다. 즉, 오프셋이 아직 메모리에 있고 커밋되지 않은 상태에서 Logstash와 충돌하는 경우가 있을 수 있습니다. 이 경우, 메시지가 재전달되거나 복제될 가능성이 있습니다 . 적용하고자 하는 사례에서 이것이 문제가 되는 경우, 메시지에 고유한 ID 필드를 생성/사용하여 복제 가능성을 임시로 막을 수 있습니다. 고유한 코드를 작성하여 이러한 ID를 생성하거나 Logstash의 uuid 필터를 사용이 가능하며, 이 모든 경우 메시지가 Kafka에 입력되기 전에 이를 수행해야 합니다. Logstash의 "배송자(shipper)" 측에서 이 이벤트 ID를 Elasticsearch 출력 플러그인의 document_id 옵션에 매핑할 수 있습니다. 즉, 색인 시 같은 ID를 가진 문서를 Elasticsearch가 덮어쓰며, 이는 같은 컨텐츠의 문서를 여러 개 생산하는 것보다 대개 낫습니다!

또한 이는 데이터 다운스트림을 잃은 경우에 컨텐츠를 재생하려고 할 때 유용합니다. 다른 소비자 그룹을 사용하여 그 그룹의 페이스 대로 데이터를 재생할 수 있습니다.

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
....
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

my_uuid 필드가 이벤트에 존재하는 경우 설정.

Kafka 사용 시 모니터링할 중요한 것 중 하나는 Logstash의 소비를 기다리는 백업 메시지 수입니다. 이 정보를 모니터링하는 다양한 도구가 있으며, 그 중 일부는 아래와 같습니다.

Kafka와 함께 제공되는 CLI 도구

오프셋 점검을 위한 간단한 커맨들 라인 도구입니다. cronjob을 이용해 정기적인 자동 작업으로 실행할 수 있고 필요한 알림 소프트웨어를 통해 알림을 받을 수 있습니다.

/usr/bin/kafka-consumer-offset-checker --group logstash --topic apache_logs --zookeeper localhost:2181

샘플 응답:

Group    Topic       Pid Offset  logSize Lag     Owner
logstash apache_logs 0   145833  300000  154167  none
logstash apache_logs 1   145720  300000  154280  none
logstash apache_logs 2   145799  300000  154201  none
logstash apache_logs 3   146267  300000  153733  none

Lag(지연) 열에는 지연되고 있는 메시지 수가 표시됩니다.

JMX

Kafka는 JMX를 통해 JConsole에서 쉽게 모니터링할 수 있습니다. JMX를 연결하여 Logstash를 모니터링하려면 Logstash를 시작하기 전에 이러한 추가 Java 옵션을 설정하면 됩니다.

export LS_JAVA_OPTS="
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.port=3000
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false"

AWS에서 실행하는 경우에는 서버의 IP 또는 외부 호스트 이름을 반드시 사용해야 합니다.

-Djava.rmi.server.hostname=ec2-107-X-X-X.compute-1.amazonaws.com

Elastic Stack

네, 그렇습니다. Elastic Stack 자체를 사용해서 Kafka를 모니터링할 수 있습니다. 이는 저희가 가장 좋아하는 옵션입니다! 이 특정한 사례에서는 Dale McDiarmid가 작성한 Kafkabeat라는 전용 Beat를 사용합니다.이 Beat는 오프셋 및 기타 항목 정보를 수집하여 Elasticsearch에 저장합니다. 그런 다음 Kibana를 사용하여 소비자 지연(Lag)을 분석할 수 있습니다. 디스크 처리량, CPU 및 메모리와 같은 시스템 수준 통계를 수집하는 topbeat과 함께 Kafka 모니터링을 위한 강력한 솔루션이 있습니다. 따라서 이제 모든 데이터가 한 곳에 있으므로 오래된 회전 디스크를 최신 SSD로 교체하도록 상사를 설득할 수 있습니다! 그리고 5.0.0에서는 더욱 좋은 점이 있습니다. 애플리케이션 및 시스템 수준 모니터링을 한 이 모든 중요 정보가 Metricbeat라는 하나의 Beat로 통합됩니다. 정말 편리할 것 같지 않습니까?

좋습니다. 이제 Kafkabeat로 돌아가겠습니다. 시작하는 방법은 다음과 같습니다.

  1. https://github.com/gingerwizard/kafkabeat를 Clone 합니다.
  2. kafkabeat 디렉터리에서 make를 실행합니다.
  3. Kafkabeat를 Kafka 브로커에 배포하고 다음과 같이 실행합니다:./kafkabeat -c kafkabeat.yml
그러면 이 브로커와 색인에 대한 모든 Kafka 항목에서 오프셋 정보를 다음과 같은 문서 구조로 Elasticsearch에 수집합니다.
"@timestamp": "2016-06-22T01:00:43.033Z",
  "beat": {
    "hostname": "Suyogs-MBP-2",
    "name": "Suyogs-MBP-2"
  },
  "type": "consumer",
  "partition": 0,
  "topic": "apache_logs_test",
  "group": "logstash",
  "offset": 3245
  "lag": 60235
}

데이터가 Elasticsearch에 있으면 Kibana로 쉽게 시각화할 수 있습니다. 필자는 막 출시된 Kibana 5.0.0-alpha3을 사용하여 timestamp에 따라 소비자lag 필드를 그래프로 보여주는 대시보드를 생성했습니다.

Kibana_Kafka2.png

Kafka Manager

Kafka 전체를 관리하기 위한 오픈 소스 UI 도구 입니다. 토픽 생성, 메트릭 추적, 오프셋 관리 등 작업을 수행할 수 있습니다. 이 도구는 컴파일하고 빌드하는 데 시간이 좀 걸립니다. 하지만 Kafka를 위한 통합 관리 솔루션이 필요한 경우 사용해볼 가치가 있습니다! Kafka Manager 도구를 시작한 후 지침에 따라 ZK 인스턴스를 모니터링할 새 클러스터를 생성하십시오.

소비자(consumer) 뷰

kafka UI.png

토픽 뷰

Kafka UI 2.png

결론

이 게시물에서는 Kafka 및 Logstash를 작동하여 여러 소스의 데이터를 Elasticsearch로 수집하기 위한 팁을 설명했습니다. 하지만 더 많은 내용이 있습니다!

작년에 Kafka 0.9.0 버전이 출시되었고, 최근 0.10.0 이 출시되었습니다. 최근 버전은 보안기능 내장, 새 소비자 구현, 데이터 할당량 등 새로운 기능이 갖춰져 있습니다. Logstash 입력 및 출력 API가 업데이트되었으므로 Logstash로 이러한 기능을 사용할 수 있습니다! 다음 포스트에서는 특히 Kafka와 Elastic Stack을 사용한 종단 간 보안을 포함하여 Kafka의 새로운 기능을 다룰 예정입니다.

그럼 다음 글에서 뵙겠습니다!