밑바닥부터 시작하는 데이터 플랫폼 만들기 — (4)BigQuery최적화 ETL 2차PoC.Dataflow + Airflow(Docker-Compose)

bae.200.ok
11 min readNov 7, 2021

안녕하세요. 배민혁 입니다.

이전 글(BigQuery 최적화 ETL 구성하기. About Apache Beam)에서 Runner를 GCP의 Dataflow로 하여 Apache Beam을 사용하는 내용에 대해서 다루었습니다.

이번 글에서는 Airflow(on Docker-compose)를 통해서 Dataflow를 동작시킨 내용을 기록해보려고 합니다.

1 Airflow

공식문서에 따르면 Airflow는 워크플로우를 빌드하고 실행하는 플랫폼입니다. 여기서 말하는 워크 플로우란 DAG로 정의되며, 의존성과 데이터 흐름을 고려하여 정렬된 Task라고 불리는 개별 작업을 포함합니다.

쉽게 이야기하면, Airflow는 데이터 엔지니어링을 위해서 워크플로우를 생성, 스케쥴링, 모니터링이 가능한 플랫폼 입니다.

Airflow의 자세한 세부사항은 살펴보지 않고, 아키텍처 정도만 보도록 하겠습니다.

1.1 Airflow Architecture

from Airflow Docs

일반적으로 아래 나열된 컴포넌트를 포함하고 있습니다.

  • scheduler: 모든 task와 DAG를 모니터링하고 있다가 종속성이 있는 task가 완료되면 다음 task를 excutor에 제출한다.
  • executor: 실제로 task를 worker에게 제출하는 컴포넌트
  • webserver: 사용자에게 제공하는 인터페이스
  • metadata database: scheduler, executor, webserver에서 상태를 저장하는데 사용

1.2 Airflow(CeleryExecutor) on Docker-Compose

공식문서를 보면 docker-compose.yaml 파일을 제공하고 있습니다. CeleryExecutor를 사용하기 때문에 기본 아키텍처에 flower(모니터링), redis(큐)서비스가 추가되 형태입니다.

2 Airflow + Dataflow 삽질

Airflow를 통해서 GCP의 Dataflow를 진행하려는 목적이 있습니다. 어찌보면 단순한데 이를 진행하는데 여러가지 이슈를 만났습니다.

2.1 구성 환경으로 발생한 이슈

  • Airflow version: 2.2.0
  • apache beam: 2.32.0
  • Docker-Compose: 1.29.2

2.1.1 Docker-Compose: “depends_on.condition: service_completed_successfully”

invalid condition type, service_completed_successfully

저는 처음에 docker-compose 1.27.4 버전으로 환경을 구성하려고 했었습니다. 그러나 위의 이미지와 같은 이슈를 만났습니다. Airflow를 구성하는 docker-compose.yaml파일을 보면 condition이 ‘service_completed_successfully’로 설정되어져 있습니다.

docker-compose의 릴리즈노트를 보면 depends_on의 condition에 서비스가 성공적으로 완료된 상태를 확인하기 위해서 특정 컨디션이 추가되었다고 말하고 있습니다. 해당 상태가 ‘service_completed_successfully’인 것 같습니다.

그래서 docker-compose 버전을 1.29.2로 올려서 진행했고, 컨테이너를 성공적으로 실행할 수 있었습니다.

2.1.2 Airflow: provider packages

Ariflow에서는 provider라는 것을 통해서 기능을 확장할 수 있습니다. Airflow 2로 넘어가면서 모듈 방식으로 구성되었습니다. 그러면서 provider는 추가로 설치해야합니다.

Docker 엔진 상에서 Airflow 를 동작시킬 때에는 provider를 설치하는 방식이 두가지 있습니다.

  • _PIP_ADDITIONAL_REQUIREMENTS 환경 변수를 사용한다. 이것은 테스트 단계에 적절하다.
  • docker image를 구성하는 Dockerfile을 통해서 설치한다.

환경변수를 통해서 진행하는 방식은 아무래도 테스트 단계에 적절하다고 하니 Dockerfile을 별도로 작성하는게 좋겠습니다.

2.1.3 Airflow: PYTHONPATH

dags 디렉토리 하위에 DAG를 작성해도 DAG를 읽어들이지 못하는 이슈(not found module)를 만났습니다. airflow info를 통해서 PYTHONPATH를 확인하면 분명 ‘/opt/airflow/dags’라는 경로가 포함되어진 것으로 보입니다. 계속해서 다른 이유를 찾았으나 알 수 없었습니다.

혹시나 하는 마음으로 컨테이너 내부에서 PYTHONPATH를 확인했는데… 해당 path가 추가 되어져 있지 않았습니다. 결국 docker-compose의 envirionment에 해당 경롤를 PYTHONPATH에 추가하여 해결했습니다.

2.2 GCP: Dataflow(+apache beam)를 구동하면서 발생한 이슈

2.2.1 Airflow: DataflowCreatePythonJobOperator options

Airflow에서 Dataflow를 구동하기 위해서는 DataflowCreatePythonJobOperator를 사용했습니다. 해당 operator의 아규먼트로 는 dataflow_default_options와 options가 있습니다. 어떤 용도인지 헷갈리는 상태였습니다.

공식문서를 통해서 두 가지의 용도를 유추할 수 있었습니다.

  • dataflow_default_options: 공식문서에서 이야기하는 Pipeline options
  • options: argparse를 통해서 정의한 추가 아규먼트

2.2.2 Airflow: Managing Python Pipeline Dependencies

DataflowCreatePythonJobOperator에는 python_file, 즉, Dataflow 상에서 동작할 pipeline을 작성하도록 되어져 있습니다. 저는 해당 pipeline의 소스를 모듈화하여 관리하도록 구성했습니다. 해당 모듈은 dags 디렉토리 하위에 존재하도록 하였습니다. 이렇게 하고 Airflow를 구동하면 모듈을 못읽어 들이는 문제가 발생합니다.

저는 이 문제를 해결하기 위해서 setuptools를 사용했습니다. 조금 더 풀어서 이야기하면 pipeline 소스를 모듈화한 pipelines 모듈을 setuptools로 패키지화하는 방식으로 해결했습니다.

setuptools: python 라이브러리를 확장 및 배포하는데 일반적으로 사용되는 라이브러리

아래는 프로젝트 디텍토리에서 dags, setup.py, pipelines 모듈의 위치를 보여줍니다. 여기서 setup.py는 원하는 모듈을 setuptools로 패키지화 하는데 사용됩니다.

# tree
.
├── dags
│ ├── __init__.py
│ ├── pipelines
│ └── separate_event_data_dag.py
...
└── setup.py

setup.py는 아래처럼 작성됩니다.

from setuptools import setup, find_packagessetup(
name="pipelines",
version="1.0.0",
packages=find_packages(where="dags"),
package_dir={"": "dags"},
)

2.2.3 Apache Beam: Ptransform vs Pardo

pipeline 소스를 모듈화 하다보니 제가 ptranform와 pardo의 용도를 바르게 이해하고 사용하지 못하고 있다는 생각이 들었습니다.

  • ptransform: input과 output 모두 pcollection이다. 즉, input 전체를 처리한다. 인덱싱 불가능(인덱싱을 시도하면 인덱싱 에러 발생)
  • pardo: map each element, 각 row를 대상으로 처리한다. return 할 때에는 list로 처리한다.(return [row])인덱싱 가능

각 class를 정의할 때에 input, output 변수이름을 pcoll 이나 row처럼 사용하면 명시적으로 보이기 때문에 인지하는데에 도움이 됩니다.

여기서 추가로 이야기 할 것이 있습니다. ptransform을 통해서 map을 할 때에는 [](list)에 따라 결과가 달라진다는 점입니다.

pcoll_1 = [{“a“: 1, “b“:2, “c“: 3}, {“a“: 4, “b“: 5, “c“: 6}]
pcoll_2 = [[{“a“: 1, “b“:2, “c“: 3}], [{“a“: 4, “b“: 5, “c“: 6}]]

pcoll_1과 pcoll_2에서 0번째 인덱스 프린트하면 pcoll_1는 `a b c a b c`를, pcoll_2는 `{“a“: 1, “b“:2, “c“: 3}, {“a“: 4, “b“: 5, “c“: 6}`를 보여줍니다.

3 마치며

Airflow를 통해서 Dataflow를 사용하는 데에 크게 문제가 없을 것으로 생각했으나 엄청나게 많은 삽질을 했습니다. 다음 작업은 branching을 하는 것인데 기대가 됩니다. airflow와 dataflow를 통해서 어떻게 효율적으로 할 지 고민이 됩니다.

첫 번째 방법은 event 전체 데이터를 처리할 때에 하나의 파이프라인(하나의 워크플로우) 안에서 각 이벤트별로 나누면서 전처리를 동시에 할 지, 두 번째 방법은 각 이벤트별로 분리하는 파이프라인, 각 이벤트별로 전처리를 하는 파이프라인, 두 단계로 구성하는 것 입니다.

아무래도 첫 번째 방법은 event 전체 데이터를 한 번 읽어서 전처리를 하기 때문에, 읽을 때 발생하는 비용이 적게 든다는 장점이 있습니다. 하지만 파이프라인이 하나로 구성되어져 있기 때문에 일부만 실패하더라도 멱등성을 위해 전체 작업을 롤백하게 되지 않을까 싶습니다. 또한 event 전체 데이터의 양이 많아지면 파이프라인 실행시간이 증가한다는 단점이 있습니다.

두 번째 방법은 event 전체 데이터를 읽은 후에도, 각 이벤트별 데이터를 또 읽어야합니다. 읽는 비용이 추가로 발생하게 되는 것이죠. 그렇지만 이벤트별 전처리하는 파이프라인은 각 이벤트 서로가 서로에게 영향을 주지 않기 때문에 실패한 작업만 다시 실행하면 됩니다. 또한 파이프라인 실행 시간이 비교적 짧을 것으로 예상됩니다.

3.1 현재 branching 전략

airflow를 활용한 전체 워크플로우

고민 끝에 두개의 DAG를 만들어 구성하는 것이 효율적이라 판단하여 위와 같이 구성하였습니다.

event_name 별 braching 및 GCS 저장

첫번째는 BigQuery의 DW가 되는 테이블을 읽어서 event 별 구분 후 GCS로 저장하는 파이프라인입니다. 이벤트 별 데이터를 구분하는 것에 초점이 있습니다.

event_name 별 processing 후 BigQuery 저장

두번째는 GCS에 저장된 각 이벤트 별 데이터를 읽어서 Dataflow를 통해서 프로세싱 하여 BigQuery에 데이터 마트를 구성하는 파이프라인 입니다.

이렇게 구성하면, 특정 이벤트에 대한 프로세싱 파이프라인이 실패하더라도 DW 전체를 읽는 것이 아니라 GCS에 있는 데이터만 읽어서 처리가 가능합니다.

--

--