아파치 에어플로우란 무엇인가요?
Apache Airflow는 워크플로우를 생성, 예약 및 모니터링하도록 설계된 플랫폼입니다. ETL 프로세스, 데이터 파이프라인 및 기타 복잡한 워크플로를 오케스트레이션하는 데 사용되며 유연성과 확장성을 제공합니다. 시각적 인터페이스와 실시간 모니터링 기능을 통해 파이프라인을 보다 쉽고 효율적으로 관리할 수 있으며, 실행 진행 상황과 결과를 추적할 수 있습니다. 다음은 네 가지 주요 기둥입니다:
- 동적: 파이프라인은 파이썬으로 정의되어 동적이고 유연한 워크플로를 생성할 수 있습니다.
- 확장성: 에어플로우를 다양한 환경과 통합할 수 있고, 사용자 지정 오퍼레이터를 생성할 수 있으며, 필요에 따라 특정 코드를 실행할 수 있습니다.
- 우아함: 파이프라인은 깔끔하고 명시적인 방식으로 작성됩니다.
- 확장성: 모듈식 아키텍처는 메시지 대기열을 사용하여 임의의 수의 작업자를 조율합니다.
실제로 Airflow는 다음과 같은 시나리오에서 사용할 수 있습니다:
- 데이터 가져오기: Elasticsearch와 같은 데이터베이스로의 일일 데이터 수집을 오케스트레이션하세요.
- 로그 모니터링: 로그 파일의 수집과 처리를 관리한 다음 Elasticsearch에서 분석하여 오류나 이상 징후를 식별합니다.
- 여러 데이터 소스 통합: 서로 다른 시스템(API, 데이터베이스, 파일)의 정보를 Elasticsearch의 단일 레이어로 결합하여 검색과 보고를 간소화하세요.
공기 흐름에서 DAG(방향성 비순환 그래프) 이해하기
에어플로우에서 워크플로는 DAG(방향성 비순환 그래프)로 표현됩니다. DAG는 작업이 실행되는 순서를 정의하는 구조입니다. DAG의 주요 특징은 다음과 같습니다:
- 독립적인 작업별 구성: 각 작업은 작업의 단위를 나타내며 독립적으로 실행되도록 설계되었습니다.
- 시퀀싱: 작업이 실행되는 순서는 DAG에 명시적으로 정의되어 있습니다.
- 재사용성: DAG는 반복적으로 실행되도록 설계되어 프로세스 자동화를 용이하게 합니다.
공기 흐름 구성 요소
Airflow 에코시스템은 작업을 조율하기 위해 함께 작동하는 여러 구성 요소로 구성되어 있습니다:

- 스케줄러: 스케줄러: DAG를 예약하고 작업자가 실행할 작업을 보내는 역할을 담당합니다.
- 실행자: 작업 실행을 관리하여 작업자에게 위임합니다.
- 웹 서버: DAG 및 작업과 상호 작용하기 위한 그래픽 인터페이스를 제공합니다.
- Dags 폴더: 파이썬으로 작성된 DAG를 저장하는 폴더입니다.
- 메타데이터: 스케줄러와 실행자가 실행 상태를 저장하는 데 사용하는 도구의 저장소 역할을 하는 데이터베이스입니다.
아파치 에어플로우와 Elasticsearch
Apache Airflow와 Elasticsearch를 사용하여 Elasticsearch에서 작업과 색인 결과를 오케스트레이션하는 방법을 시연합니다. 이 데모의 목표는 Elasticsearch 인덱스의 레코드를 업데이트하는 작업 파이프라인을 생성하는 것입니다. 이 인덱스에는 사용자가 평점을 매기고 등급을 지정할 수 있는 영화 데이터베이스가 포함되어 있습니다. 매일 수백 개의 등급이 있는 시나리오를 상상해 보면 등급 기록을 계속 업데이트해야 합니다. 이를 위해 새로운 통합 등급을 검색하고 인덱스의 기록을 업데이트하는 DAG가 매일 실행되도록 개발될 것입니다.
DAG 흐름에는 등급을 가져오는 작업과 결과를 검증하는 작업이 있습니다. 데이터가 존재하지 않으면 DAG는 실패 작업으로 이동합니다. 그렇지 않으면 데이터가 Elasticsearch에서 색인됩니다. 점수 계산을 담당하는 메커니즘이 있는 메서드를 통해 등급을 검색하여 인덱스의 영화 등급 필드를 업데이트하는 것이 목표입니다.
Docker와 함께 Apache Airflow 및 Elasticsearch 사용
컨테이너화된 환경을 만들기 위해 Docker와 함께 Apache Airflow를 사용하겠습니다. "도커에서 에어플로우 실행하기" 가이드의 지침에 따라 에어플로우를 실제로 설정하세요.
Elasticsearch의 경우 Elastic Cloud의 클러스터를 사용하겠지만, 원하는 경우 Docker로 Elasticsearch를 구성할 수도 있습니다. 이미 영화 카탈로그가 포함된 인덱스가 생성되어 영화 데이터가 색인화되었습니다. 이러한 영화의 '등급' 필드가 업데이트됩니다.
DAG 생성
Docker를 통해 설치하면 Airflow가 인식할 수 있도록 DAG 파일을 배치해야 하는 dags 폴더를 포함한 폴더 구조가 생성됩니다.
그 전에 필요한 종속성이 설치되어 있는지 확인해야 합니다. 이 프로젝트의 종속성은 다음과 같습니다:
update_ratings_movies.py 파일을 만들고 작업 코딩을 시작합니다.
이제 필요한 라이브러리를 가져와 보겠습니다:
연결과 외부 API 사용을 추상화하여 Airflow와 Elasticsearch 클러스터 간의 통합을 간소화하는 구성 요소인 ElasticsearchPythonHook을 사용하겠습니다.
다음으로, 주요 인수를 지정하여 DAG를 정의합니다:
dag_idDAG의 이름입니다.start_dateDAG가 시작되는 시기입니다.schedule: 주기를 정의합니다(이 경우 매일).doc_md문서를 가져와 에어플로우 인터페이스에 표시할 수 있습니다.
작업 정의하기
이제 DAG의 작업을 정의해 보겠습니다. 첫 번째 작업은 영화 등급 데이터를 검색하는 작업을 담당합니다. task_id 을 'get_movie_ratings' 으로 설정한 파이썬 오퍼레이터를 사용하겠습니다. python_callable 매개변수는 평점 가져오기를 담당하는 함수를 호출합니다.
다음으로 결과가 유효한지 검증해야 합니다. 이를 위해 BranchPythonOperator와 함께 조건문을 사용하겠습니다. task_id 은 'validate_result' 이 되고 python_callable 은 유효성 검사 함수를 호출합니다. op_args 매개 변수는 이전 작업의 결과인 'get_movie_ratings' 을 유효성 검사 함수에 전달하는 데 사용됩니다.
유효성 검사가 성공하면 'get_movie_ratings' 작업에서 데이터를 가져와서 Elasticsearch로 색인합니다. 이를 위해 새 작업인 'index_movie_ratings' 을 생성하여 PythonOperator를 사용합니다. op_args 매개변수는 'get_movie_ratings' 작업의 결과를 인덱싱 함수에 전달합니다.
유효성 검사 결과 실패로 표시되면 DAG는 실패 알림 작업으로 진행합니다. 이 예에서는 단순히 메시지를 인쇄하지만 실제 시나리오에서는 실패에 대해 알리도록 알림을 구성할 수 있습니다.
마지막으로 작업 종속성을 정의하여 올바른 순서로 실행되도록 합니다:
이제 DAG의 전체 코드를 따르세요:
DAG 실행 시각화
Apache Airflow 인터페이스에서 DAG의 실행을 시각화할 수 있습니다. "DAG" 탭으로 이동하여 생성한 DAG를 찾으면 됩니다.

아래에서 작업의 실행과 각 상태를 시각화하여 확인할 수 있습니다. 특정 날짜의 실행을 선택하면 각 작업의 로그에 액세스할 수 있습니다. index_movie_ratings 작업에서 인덱스에서 인덱싱 결과를 확인할 수 있으며, 성공적으로 완료되었음을 알 수 있습니다.

다른 탭에서는 작업 및 DAG에 대한 추가 정보에 액세스하여 잠재적인 문제를 분석하고 해결하는 데 도움을 받을 수 있습니다.
결론
이 문서에서는 Apache Airflow와 Elasticsearch를 통합하여 데이터 수집 솔루션을 만드는 방법을 보여드렸습니다. DAG를 구성하고, 동영상 데이터 검색, 유효성 검사, 인덱싱을 담당하는 작업을 정의하고, Airflow 인터페이스에서 이러한 작업의 실행을 모니터링하고 시각화하는 방법을 보여드렸습니다.
이 접근 방식은 다양한 유형의 데이터와 워크플로에 쉽게 적용할 수 있으므로 Airflow는 다양한 시나리오에서 데이터 파이프라인을 오케스트레이션하는 데 유용한 도구입니다.
참고 자료
Apache AirFlow
Docker로 Apache Airflow 설치
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
Elasticsearch 파이썬 훅
파이썬 연산자
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
자주 묻는 질문
아파치 에어플로우란 무엇인가요?
Apache Airflow는 워크플로우를 생성, 예약 및 모니터링하도록 설계된 플랫폼입니다.
아파치 에어플로우의 용도는 무엇인가요?
Apache Airflow는 ETL 프로세스, 데이터 파이프라인 및 기타 복잡한 워크플로우를 오케스트레이션하는 데 사용되며 유연성과 확장성을 제공합니다.
아파치 에어플로우의 주요 구성 요소는 무엇인가요?
Airflow의 주요 구성 요소는 다음과 같습니다: 스케줄러, 실행기, 웹 서버, Dags 폴더 및 메타데이터입니다.
아파치 에어플로우를 엘라스틱서치와 통합할 수 있나요?
예, Apache Airflow는 Elasticsearch와 통합될 수 있습니다. 예를 들어, Apache Airflow를 사용하여 Elasticsearch에서 작업과 색인 결과를 오케스트레이션할 수 있습니다.
아파치 에어플로우에서 DAG란 무엇인가요?
에어플로우에서 워크플로는 DAG(방향성 비순환 그래프)로 표현됩니다. DAG는 작업이 실행되는 순서를 정의하는 구조입니다.




