12 5월 2016 엔지니어링

Elastic Stack에는 Kafka면 충분합니다 - 1부

By Suyog RaoTal Levy

Elastic StackApache Kafka는 로그/이벤트 처리 영역에서 긴밀한 연결 관계를 맺고 있습니다. 다수의 기업이 대용량 데이터의 저장 및 처리를 위한 전송 계층으로 Kafka를 사용합니다. 신속한 검색 및 분석 기능을 갖춘 Elasticsearch가 유명해지기 전에 이 분야에 소개된 수많은 개발 프로젝트에서 Kafka는 데이터 스테이징과 관련하여 중요한 역할을 수행해왔습니다. 우리는 블로그 포스트 시리즈를 통해 Elastic Stack에 통합되는 Kafka의 설정 및 관리 방법을 자세히 살펴보고자 합니다. 특히, 대용량 환경에서의 Kafka 및 Logstash 운영 경험을 논의할 것입니다.

Note: 참고: 이 게시물에 언급한 Kafka는 0.8.x 버전입니다. 최신 0.9.x 버전의 Kafka에서는 일부 기능이 변경되었으나, 0.8.x가 여전히 매우 인기가 높고 널리 사용되고 있습니다.

기본 사항

먼저 몇 가지 기본 사항을 알아보겠습니다. Kafka 문서에 따르면:

Apache Kafka는 발행-구독 메시지를 분산 커밋 로그로 재해석한 시스템입니다

Kafka는 대용량의 이벤트 데이터를 처리하기 위해 LinkedIn에서 개발되었습니다. 다수의 다른 메시지 브로커들과 마찬가지로, Kafka도 데이터를 토픽으로 그룹화해서 발행-소비 및 큐 의미 체계를 처리합니다. 애플리케이션과 마찬가지로 토픽에 작성하고 토픽에서 소모됩니다. Kafka를 구분짓는 중요한 차이 또는 설계상의 전환은 복잡성이 생산자(producer)에서 소비자(consumers)로 이전되며, 파일 시스템 캐시를 대량으로 사용한다는 점입니다. 이러한 설계 결정은 분산되는 것 부터 시작해서 성공적인 대용량 스트리밍 사용사례를 만들어내고 있습니다.

Logstash는 Java API를 사용하여 Kafka와 원시적으로 통합합니다. 입력출력 플러그인을 둘 다 제공하므로 Logstash에서 Kafka로 직접 쓰고 읽을 수 있습니다. 시작 구성은 매우 단순합니다.

kafka {
   zk_connect => "hostname:port"
   topic_id => "apache_logs"
   ...
}

Kafka는 Apache ZooKeeper에 의존성을 가지므로, Kafka를 실행하려면 ZooKeeper cluster에 대한 액세스가 필요합니다. 자세한 내용은 나중에 설명하겠습니다.

Elastic Stack과 Kafka를 언제 함께 사용해야 하는가?

시나리오 1: 이벤트 스파이크

로그 데이터 또는 이벤트 기반 데이터는 일관적이거나 예측 가능한 볼륨 또는 유동률을 가지는 경우가 드뭅니다. 금요일 밤에 애플리케이션을 업그레이드한 경우를 생각해 봅시다(금요일에 업그레이드해서는 안 되는 이유는 다른 블로그를 참조하십시오). 배포한 애플리케이션에 정보가 과도하게 기록되고 logging 인프라에 장애를 유발하는 안 좋은 버그가 있다고 해 봅시다. 이러한 데이터 스파이크나 포화는 게임이나 전자상거래 산업 분야 등의 멀티테넌트 사용사례에서는 꽤 흔한 일입니다. 이 시나리오에서는 Logstash와 Elasticsearch에서 이러한 급증이 일어나지 않도록 방지하기 위해 Kafka와 같은 메시지 브로커가 사용됩니다.

이 아키텍처에서의 처리는 일반적으로 2개의 스테이지로 나뉩니다. ShipperIndexer 스테이지입니다. 다른 데이터 소스로부터 데이터를 수신하는 Logstash 인스턴스는 대용량 처리를 수행하지 않기 때문에 Shipper 라고 합니다. Shipper의 책임은 Kafka 토픽, 즉 생산자로 수신되는 데이터를 즉시 유지하는 것입니다. 반면, 비대한 Logstash 인스턴스는 고유 정체 속도로 데이터를 소비하는 동시에 Grok, DNS 조회, Elasticsearch 인덱싱과 같은 대량 변환을 실행합니다. 이 인스턴스를 Indexer라고 합니다.

Logstash는 일반적으로 Shipper로 사용되었지만, 우리는 특화된 Shipper로 사용 가능한 Elastic Beats 제품 패키지를 사용할 것을 강력하게 권장합니다. 예를 들어, Filebeat는 Logstash 수신기를 통해 파일을 추적하고 Kafka로 전송할 수 있는 리소스에 친화적인 라이트급 에이전트입니다.

Kafka

참고: 현재 Filebeat은 Kafka에 직접 쓰기를 지원하지 않지만, 5.0.0(현재 시험판 단계)부터는 Kafka를 출력 중 하나로 구성할 수 있게 됩니다. 이러한 개선은 beats를 사용하여 데이터를 수집하는 사용사례의 아키텍처를 간소화합니다. 알파 릴리스에서 새로운 기능을 체험해 보고 피드백을 보내주세요. 소문에 따르면 이 테스트에 도움이 되신 분들께 Elastic{ON}17 무료 이용권이 제공된다고 합니다.

시나리오 2: Elasticsearch 연결 불가

다른 시나리오를 생각해 보겠습니다. 현재 다중 노드 Elasticsearch cluster를 1.7에서 전체 cluster 재시작이 필요한 2.3으로 업그레이드할 계획이라고 생각해 봅시다. 또는 Elasticsearch가 예상보다 오래 다운된 경우를 생각해 봅시다. Elasticsearch로 유입되는 데이터 소스 스트리밍이 여러 개이고 기존 데이터 소스를 중단시킬만한 여력이 없다면, Kafka와 같은 메시지 브로커가 도움이 될 것입니다. Logstash shipper 및 indexer 아키텍처에 Kafka를 사용하면 단말 노드에서 데이터를 계속 스트리밍하고 Kafka에 잠시 대기시킬 수도 있습니다. Elasticsearch가 다시 작동하면 Logstash는 중단된 지점부터 다시 시작되면서 데이터 백로그를 따라잡을 수 있습니다. 사실 이러한 방식은 우리 소프트웨어의 탄력적인(Elastic) 성질과 아주 잘 맞습니다. 동일한 Kafka 토픽을 소비하는 Logstash 인스턴스를 추가해서 처리 및 인덱싱 성능을 일시적으로 늘릴 수 있기 때문입니다. Elasticsearch에 노드를 더 추가할 수도 있습니다. 지나친 관리가 필요없는 무한한 확장성은 Elasticsearch의 핵심 기능 중 하나입니다. 데이터 백로그를 따라잡으면 원래 인스턴스 수로 규모를 축소할 수 있습니다.

안티 패턴: Elastic Stack과 Kafka를 함께 사용할 수 없는 경우

Kafka를 함께 사용할 수 있는 경우를 아는 것이 중요한 것처럼 함께 사용할 수 없는 경우를 아는 것도 중요합니다. 모든 것에는 희생이 따릅니다. Kafka는 생산 환경에 존재하는 또 다른 소프트웨어에 지나지 않습니다. 즉 모니터링, 알림에 대한 대응, 업그레이드를 비롯하여 생산 환경에서 소프트웨어를 성공적으로 실행하는 데 필요한 모든 작업을 요구합니다. 여러분이 모든 생산 소프트웨어를 모니터링하는것 처럼 말입니다.

중앙 집중식 로그 관리에서는 로그를 단말 노드로 가급적 빨리 전송해야 하는 빈 문서가 작성되는 경우가 많습니다. 이는 일부 사용사례에서 간혹 발생하는 경우가 있지만, 자신의 환경에 정말 필요한 요건인지 자문할 필요가 있습니다. 어느 정도의 검색 대기 시간을 기다릴 수 있다면 Kafka 사용을 아예 배제해도 됩니다. 단말 노드에서 파일 콘텐츠를 추적하고 전송하는 Filebeat는 로그 순환에 탄력적입니다. 즉, 애플리케이션이 Logstash/Elasticsearch가 실시간으로 수집할 수 있는 것보다 더 많은 로그를 배출 중이라면 Log4j 또는 logrotate를 사용하여 파일 간에 로그를 순환시킬 수 있지만, 여전히 인덱싱은 필요합니다. 물론 이 로그를 서버 컴퓨터에 저장할 수 있는 충분한 디스크 공간이 필요하다는 별도의 요건이 발생합니다. 즉, 이 시나리오에서는 로컬 파일 시스템이 임시 버퍼 가 됩니다.

Kafka 및 Logstash에 대한 설계 고려 사항

다음은 Logstash에 Kafka를 사용할 때 주의해야 할 설계 고려 사항입니다. Logstash 입력은 상위 레벨의 Kafka 소비자(consumer) API를 사용하고 Logstash 출력은 새 생산자(producer) API를 사용합니다.

토픽

토픽은 메시지의 논리적 그룹입니다. 이 논리적 그룹화는 필요에 따라 다른 소비자로부터 특정 데이터를 격리할 수 있는 수단입니다. 0.8 버전의 Kafka에는 기본 보안 기능이 없어, 어느 소비자나 브로커에서 이용 가능한 모든 토픽에 액세스할 수 있다는 점에 유의하십시오. 필요한 토픽의 수, 데이터 모델링 방식은 데이터에 따라 크게 다릅니다. 다음은 몇 가지 전략입니다.

사용자 기반 데이터 플로우: 이 경우에는 사용자별 토픽이 생성됩니다. Kafka는 ZooKeeper에 존재하는 모든 파티션을 등록하므로, 수천 개의 토픽을 생산해야 하는 비용이 듭니다. 사용자 수가 적다면(예: 부서인 경우) 이 사용자별 분할 전략이 적합합니다.

HOV lane속성 기반 데이터 플로우: logging 및 이벤트 기반 데이터의 경우에도 데이터 볼륨과 예상 검색 대기 시간과 같은 속성에 기반하여 여러 사용자를 하나의 토픽에 그룹화할 수 있습니다. Elasticsearch에 인덱싱되지 않은 채로 대기열에서 소비되는 이벤트 수가 많아질수록, 검색 대기 시간은 길어집니다. 한 가지 해결책은 예상 SLA를 기반으로 "높음", "중간", "낮음" 토픽과 같이 토픽을 생성하는 것입니다. 마찬가지로 데이터 볼륨에 기반합니다. 데이터 급증이 발생하는 소비자/사용자를 알고 있다면 이들에게 새 토픽을 부여하십시오. 멀티테넌트 배포에서 "폭증" 토픽이 존재하며 사용자가 자신의 데이터 볼륨을 위반하거나, 지난 X분/시간 전에 지나치게 많은 데이터를 생산했다면 해당 사용자를 런타임으로 실행되는 이 토픽으로 이동시키는 것이 바람직합니다. 그러면 불필요한 트래픽 없이 다른 토픽을 유지할 수 있고 누구도 속도 저하를 겪지 않게 됩니다. 고속도로에서 대부분의 사람들이 길막힘 없이 빠른 차선을 이용하고 싶어하고, 길이 막혀서 속도가 느려지면 다른 차선으로 옮겨가는 것과 같습니다. Kafka 토픽을 차선으로, 이벤트를 차로 생각하십시오.

일반적으로 데이터 소스별로 토픽을 분리하면 소스를 격리할 수 있습니다. Kafka에서는 토픽별 파티션 수를 구성할 수 있습니다. 토픽별 Logstash 인스턴스 규모를 조정할 수 있다는 의미이기도 합니다. 특정 소스가 향후 대용량으로 확대될 것으로 예상된다면, 미리 파티션을 확대할 수 있습니다.

토픽은 데이터를 처음으로 게시할 때 수시로 만들거나 사전에 수동으로 만들 수 있습니다.

kafka-topics.sh --zookeeper zk_host:port --create --topic user1
    --partitions 8 --replication-factor 1

파티션

이제 파티션에 대해 말해 보겠습니다. Kafka 문서에 따르면:

"파티션 수는 토픽을 샤드에 할당하는 로그 수를 통제합니다. 파티션 수에 대한 몇가지 영향이 있습니다. 첫째, 각 파티션은 한 서버에만 적용 가능합니다. 따라서 파티션이 20개이면 전체 데이터 세트(와 읽기 및 쓰기 부하)는 20개 이하의 서버(복제본 제외)로 처리됩니다. 마지막으로, 파티션 수는 소비자의 최대 병렬 처리에 영향을 미칠 수 있습니다."

본질적으로 파티션 수가 많을수록 데이터를 소비할 때의 처리량은 늘어납니다. Kafka는 생산자의 관점에서 파티션에 속하게 될 데이터를 통제할 수 있는 옵션을 제공합니다. 기본적으로, Logstash를 사용할 때 데이터는 라운드 로빈 방식으로 파티션에 할당됩니다. Logstash 구성에서 message_key를 지정하면 파티션에 데이터를 할당하는 방법을 통제할 수 있습니다. 경우에 따라, ZooKeeper의 제약을 완화할 수 있는 적은 수의 토픽/파티션을 사용하는 것이 효율적일 수 있지만, message_keyuser_id를 통해 다중 사용자를 고정 파티션으로 그룹화하는 방법이 사용됩니다. 키가 지정되면 키 해시를 사용하여 파티션이 선택됩니다.

Kafka에서 알아두어야 할 또 다른 중요한 성질은 메시지의 순서입니다. Kafka는 단일 파티션에서만 메시지 순서를 보장합니다. 따라서 데이터 소스에서 전송된 메시지에 키가 없으면 데이터는 여러 파티션으로 분배되고 Kafka는 소비 순서를 보장하지 않습니다. 로그 사용사례와 같이 데이터 변경이 불가능한 경우에는 허용 가능한 성질입니다. 강력한 순서 지정이 필요한 경우에는 데이터가 단일 파티션으로 전송되도록 해야 합니다.

소비자 그룹: 확장성과 내결함성

유사한 토픽에서 전송된 데이터를 처리하는 다수의 Kafka 소비자는 cluster에서 고유 이름으로 지정된 소비자 그룹 을 형성합니다. Kafka로 게시된 메시지는 그룹에 속한 여러 인스턴스로 분배되지만, 각 메시지는 그룹에 속한 단 한 명의 소비자에 의해 처리되므로 중첩이 존재하지 않습니다. Kafka를 읽는 Logstash 인스턴스는 logstash라고 하는 기본 그룹 ID를 가지는 소비자 그룹을 형성합니다. 언제든지 Logstash 인스턴스를 스핀업해서 구독한 토픽에 대한 읽기 처리량을 확대할 수 있습니다. 기본적으로, 새로 시작된 Logstash 인스턴스는 logstash 소비자 그룹을 포함합니다. 이 프로세스에서 새 소비자가 소비자 그룹에 포함되면 Kafka에서 리밸런싱 작업(rebalancing) 이 트리거됩니다. 앞서 Logstash는 상위 레벨의 Kafka 소비자를 사용하므로, 리밸런싱(rebalancing) 로직은 Kafka 라이브러리로 위임됩니다. 이 프로세스는 Zookeeper에서 이용 가능한 메타데이터에 기반하여 현재 소비자를 파티션으로 자동으로 재할당합니다. 다중 Logstash 인스턴스를 사용하는 또 다른 이유는 내결함성의 증대입니다. 한 인스턴스가 다운되면 Kafka는 프로세스 리밸런싱을 진행해서 기존의 Logstash 인스턴스 할당을 분산시킵니다.

이 모든 기능은 Logstash 입력의 consumer_threads 설정과 깊은 연관성이 있습니다. 이 설정은 Kafka 파티션에서 소비되는 스레드 수를 제어합니다. 파티션 수와 같은 스레드 수를 유지하는 것이 완벽한 균형을 이루는 데 가장 이상적입니다. 스레드가 파티션보다 많으면 일부 스레드는 하는 일이 없어집니다. 스레드가 파티션보다 적으면 일부 스레드가 하나 이상의 파티션에서 소비하게 됩니다.

16개의 파티션을 가진 토픽 apache_logs 의 시나리오를 살펴봅시다. 이 구성에서는 8코어 컴퓨터에서 1개의 Logstash 인스턴스를 스핀업할 수 있습니다.

input {
   kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}

또는 2개의 컴퓨터에서 consumer_threads 가 각각 8로 설정된 2개의 Logstash 인스턴스를 스핀업할 수도 있습니다. 두 번째 배포 방식이 더 나은 선택입니다. 컴퓨터의 CPU를 모두 활용하지만 치명적인 오류가 발생했을 때 내결함성을 증대시킬 수도 있기 때문입니다.

일반적으로, 파티션 수는 Logstash 스레드/인스턴스 수의 배수가 되도록 해야 합니다. 인스턴스가 균형을 이루게 합니다. Elasticsearch의 샤드 할당과 마찬가지로, 분할(Partitioning)은 나중에 처리 성능(Logstash 인스턴스)을 증대시킬 수 있는 방법입니다.

직렬화 형식

Kafka는 바이트 배열을 사용하여 대기열의 메시지를 유지합니다. 따라서 Kafka에서는 어느 형식도 처리할 수 있지만, 일반적으로는 간편하고 신속한 직렬화 형식을 사용하는 것이 바람직합니다. Kafka는 출력에 value_serializer, 입력에 decoder_class 를 지정해서 직렬화 메시지 형식을 처리하는 기능을 제공합니다. 능숙한 Logstash 사용자라면 지금 바로 코덱 에 대해 생각해봐야 합니다. Kafka 토픽을 드나드는 메시지 표현을 관리할 때 Logstash 코덱과 Kafka 직렬 변환기를 둘 다 활용할 수 있습니다.

Kafka 에코시스템과 관련된 다른 Logstash 코덱은 plain, avro, avro_schema_registry입니다.

자체 serializer/deserializer를 작성하려면 선호하는 JVM 언어를 사용하면 됩니다. 이 클래스들은 Logstash 클래스 경로에 속하지 않으므로 Java classpath 경로에 적절한 라이브러리를 명시적으로 추가해야 합니다.

export CLASSPATH=$CLASSPATH:/path/to/kafkaserializers.jar; bin/logstash -f ..

Logstash 입력의 메시지 디코딩은 단일 스레드이므로, json과 같은 값비싼 serialization 형식은 파이프라인의 전체적인 성능을 감소시킨다는 데 유의하십시오.

결론

이 게시물에서는 Kafka의 기본 개념과 Elastic Stack과의 사용에 관해 알아보았습니다. 다음 게시물에서는 작동에 관한 내용으로 넘어가 Logstash의 Kafka 실행에 관한 팁을 제공해 드리겠습니다. 질문이 있으시면 포럼 또는 트위터를 통해 자유롭게 문의하시기 바랍니다.

업데이트: 더 필요한 것이 있나요? 운영과 모니터링 팁에 대한 2번째 포스트를 읽어보세요.