이 문서에서는 데이터 수집 및 색인을 위해 Apache Kafka와 Elasticsearch를 통합하는 방법을 보여드립니다. Kafka의 개요와 생산자 및 소비자 개념에 대해 설명하고, Apache Kafka를 통해 메시지를 수신하고 색인할 로그 인덱스를 생성합니다. 이 프로젝트는 Python으로 구현되었으며, 코드는 GitHub에서 확인할 수 있습니다.
필수 구성 요소
- 도커 및 도커 컴포즈: 컴퓨터에 도커 및 도커 컴포즈가 설치되어 있는지 확인합니다.
- Python 3.x: 생산자 및 소비자 스크립트를 실행합니다.
아파치 카프카 소개
Apache Kafka는 높은 확장성과 가용성, 내결함성을 지원하는 분산형 스트리밍 플랫폼입니다. Kafka에서는 주요 구성 요소를 통해 데이터 관리가 이루어집니다:
- 브로커: 생산자와 소비자 간의 메시지 저장 및 배포를 담당합니다.
- 주키퍼: 클러스터의 상태, 파티션 리더, 소비자 정보를 제어하여 카프카 브로커를 관리하고 조정합니다.
- 주제: 데이터가 게시되고 소비를 위해 저장되는 채널입니다.
- 소비자와 생산자: 생산자가 토픽에 데이터를 전송하는 동안 소비자는 해당 데이터를 검색합니다.

이러한 구성 요소는 함께 작동하여 데이터 스트리밍을 위한 강력한 프레임워크를 제공하는 카프카 생태계를 형성합니다.
프로젝트 구조
데이터 수집 프로세스를 이해하기 위해 데이터 수집 프로세스를 여러 단계로 나누었습니다:
- 인프라 프로비저닝: Kafka, Elasticsearch, Kibana를 지원하기 위한 Docker 환경 설정.
- 프로듀서 만들기: 로그 주제로 데이터를 전송하는 Kafka 프로듀서 구현하기.
- 소비자 생성: Elasticsearch에서 메시지를 읽고 색인하기 위한 Kafka 소비자 개발.
- 수집 유효성 검사: 전송 및 소비된 데이터를 확인하고 유효성을 검사합니다.
Docker Compose를 사용한 인프라 구성
필요한 서비스를 구성하고 관리하기 위해 Docker Compose를 활용했습니다. 아래에는 Apache Kafka, Elasticsearch, Kibana의 통합에 필요한 각 서비스를 설정하여 데이터 수집 프로세스를 보장하는 Docker Compose 코드가 나와 있습니다.
Elasticsearch Labs GitHub 리포지토리에서 직접 파일에 액세스할 수 있습니다.
카프카 프로듀서를 사용한 데이터 전송
프로듀서는 로그 주제에 메시지를 보낼 책임이 있습니다. 메시지를 일괄적으로 전송함으로써 네트워크 사용 효율을 높이고 일괄 전송의 양과 지연 시간을 각각 제어하는 batch_size 및 linger_ms 설정으로 최적화할 수 있습니다. acks='all' 구성은 중요한 로그 데이터에 필수적인 메시지를 영구적으로 저장할 수 있도록 합니다.
프로듀서를 시작할 때 아래와 같이 메시지가 토픽에 일괄적으로 전송됩니다:
Kafka Consumer를 통한 데이터 소비 및 색인화
소비자는 메시지를 효율적으로 처리하도록 설계되어 로그 주제에서 배치를 소비하고 Elasticsearch로 색인합니다. auto_offset_reset='latest' 을 사용하면 소비자가 이전 메시지를 무시하고 가장 최근 메시지부터 처리를 시작하고 max_poll_records=10 은 일괄 처리를 10개의 메시지로 제한합니다. fetch_max_wait_ms=2000 을 사용하면 소비자는 배치 처리 전에 충분한 메시지가 누적될 때까지 최대 2초 동안 기다립니다.
메인 루프에서 소비자는 로그 메시지를 소비하고, 처리하고, 각 배치를 Elasticsearch로 색인하여 지속적인 데이터 수집을 보장합니다.
Kibana에서 데이터 시각화하기
Kibana를 사용하면 Kafka에서 수집되어 Elasticsearch에서 색인된 데이터를 탐색하고 검증할 수 있습니다. Kibana의 개발 도구에 액세스하여 색인된 메시지를 보고 데이터가 예상대로인지 확인할 수 있습니다. 예를 들어, 카프카 프로듀서가 각각 10개의 메시지를 5개의 배치로 전송했다면 인덱스에 총 50개의 레코드가 표시되어야 합니다.
데이터를 확인하려면 개발 도구 섹션에서 다음 쿼리를 사용할 수 있습니다:
대응:

또한, Kibana는 분석을 보다 직관적이고 대화형으로 만드는 데 도움이 되는 시각화 및 대시보드를 생성하는 기능을 제공합니다. 아래에서 처리된 정보에 대한 이해를 높이기 위해 다양한 형식으로 데이터를 보여주는 대시보드와 시각화의 몇 가지 예를 볼 수 있습니다.

Kafka Connect를 통한 데이터 수집
Kafka Connect는 데이터베이스나 파일 시스템과 같은 데이터 소스와 대상(싱크) 간의 통합을 용이하게 하도록 설계된 서비스입니다. 데이터 이동을 자동으로 처리하는 사전 정의된 커넥터로 작동합니다. 저희의 경우, Elasticsearch는 데이터 싱크 역할을 합니다.

Kafka Connect를 사용하면 데이터 수집 프로세스를 간소화할 수 있으므로 데이터 수집 워크플로우를 Elasticsearch에 수동으로 구현할 필요가 없습니다. 적절한 커넥터를 사용하면 최소한의 설정과 추가 코딩 없이도 Kafka Connect를 통해 Kafka 토픽으로 전송된 데이터를 Elasticsearch에서 직접 색인할 수 있습니다.
카프카 커넥트로 작업하기
Kafka Connect를 구현하기 위해 Docker Compose 설정에 kafka-connect 서비스를 추가합니다. 이 구성의 핵심은 데이터 인덱싱을 처리할 Elasticsearch 커넥터를 설치하는 것입니다.
서비스를 구성하고 Kafka Connect 컨테이너를 생성한 후에는 Elasticsearch 커넥터를 위한 구성 파일이 필요합니다. 이 파일에는 다음과 같은 필수 매개변수가 정의되어 있습니다:
connection.url: Elasticsearch용 연결 URL입니다.topics: 커넥터가 모니터링할 카프카 토픽(이 경우 "로그").type.name: Elasticsearch의 문서 유형(일반적으로 _doc).value.converter: Kafka 메시지를 JSON 형식으로 변환합니다.value.converter.schemas.enable: 스키마를 포함할지 여부를 지정합니다.schema.ignore및key.ignore: 인덱싱 중 Kafka 스키마 및 키를 무시하도록 설정합니다.
아래는 curl 명령어로 Kafka Connect에서 Elasticsearch 커넥터를 생성하는 방법입니다:
이 구성을 사용하면 Kafka Connect는 "logs" 항목으로 전송된 데이터를 자동으로 수집하고 Elasticsearch에서 색인하기 시작합니다. 이 접근 방식을 사용하면 추가 코딩 없이도 완전히 자동화된 데이터 수집 및 인덱싱이 가능하므로 전체 통합 프로세스를 간소화할 수 있습니다.
결론
Kafka와 Elasticsearch를 통합하면 실시간 데이터 수집과 분석을 위한 강력한 파이프라인이 만들어집니다. 이 가이드는 향후 더 복잡한 요구 사항에 적응할 수 있도록 Kibana에서 원활한 시각화 및 분석을 통해 강력한 데이터 수집 아키텍처를 구축하기 위한 기초적인 접근 방식을 제공합니다.
또한, Kafka Connect를 사용하면 데이터를 처리하고 색인하기 위한 추가 코드가 필요 없기 때문에 Kafka와 Elasticsearch 간의 통합이 훨씬 더 간소화됩니다. Kafka Connect를 사용하면 최소한의 구성으로 특정 토픽으로 전송된 데이터를 Elasticsearch에서 자동으로 색인할 수 있습니다.




