밑바닥부터 시작하는 데이터 플랫폼 만들기 — (3)BigQuery최적화 ETL 1차 PoC. About Apache Beam

bae.200.ok
10 min readSep 26, 2021

안녕하세요. 배민혁 입니다.

이전 글(BigQuery 최적화 ETL 구성하기. Dataproc(spark) vs Dataflow(beam))에서 BigQuery 최적화를 위한 솔루션을 검토했습니다. 따져본 결과 Dataflow를 사용하기로 했습니다.

이번 글에서는 Apache Beam에 대한 소개 및 1차 PoC 진행과 관련된 내용을 다뤄보려 합니다.

1. Apache Beam?

Apache Beam은 Batch, Stream 데이터 처리 작업을 모두 지원하는 통합 프로그래밍모델 입니다. 이름이 Beam인 이유도 B(Batch) + eam(Stream)의 통합이라고 합니다.

Beam이 아니라면 Spark(batch) + Flink(Stream) 두 가지 파이프라인을 구현해야합니다. 그에 반해 Beam은 내부적으로 동일한 모델을 사용하기 때문에 동일한 방식으로 처리가 가능합니다. 즉, 배치와 스트림을 모두 처리하는 하나의 Beam Runner가 존재하기 때문에 다른 로직을 별도로 작성할 필요가 없습니다.

  • Beam으로 파이프라인을 정의하고, 분산 처리를 지원하는 runner(Apache Flink, Apache Spark, Google Cloud Dataflow 등)에서 이디서든 실행하면됩니다.
  • java, python, go를 지원하고 있습니다.

1.1. Beam의 개발 목적

Batch 및 Stream 처리에 대해서 통합을 하고 싶었습니다. 이것을 목적으로 해서 탄생한 것이 Beam입니다. 그렇기 때문에 파이프라인의 효율을 이야기 할 때에 Beam의 성능보다 Runner의 성능이 더 중요합니다.

1.2. Beam Basic

  • Pipeline: 데이터 프로세싱의 처음부터 끝까지. 즉, input → transform → output을 이야기합니다.
  • PCollection: Spark의 RDD 또는 Dataframes와 동일. 파이프라인이 작동하는 분산 데이터 세트를 나타냅니다.
  • PTransform: 데이터 프로세싱과 관련된 operation
  • baem runtime is session based. 그렇기때문에 입력 및 출력이 영구적이지 않습니다.

1.2.1. PCollection의 특징

  • Immutability: 변경 대상의 PCollection에 덮어쓰는 형식이 아닙니다. PCollection에 PTransform을 적용하면 새로운 Pcollection이 생성됩니다.
  • Element Type: PCollection의 element의 타입은 Any입니다. 모든 elements들은 같은 타입을 가지고 있어야합니다.
  • Operation Type: PCollection은 세분화된 작업(grained operation)을 지원하지 않습니다. 쉽게 이야기해서 PCollection의 특정 elements에 대해서 변환을 지원하지 않고 전체에 대해서만 transform을 지원합니다.
  • Timestamps: PCollection의 각 element는 timestamp를 가지고 있습니다
  • bounded PCollection: 모두 동일한 timestamp를 가집니다.
  • unbouned PCollection: PCollection을 만드는 소스에 의해서 할당됩니다.

2. Beam Architecture

  • 여러 SDK를 지원합니다. 이는 곧 Pipeline을 개발하기 위한 언어에 불과합니다.
  • Beam Runner API를 통해서 Runner/Ececutors에서 사용 가능한 형태로 변환합니다.
  • 각 Runner/Executor에서는 SDK에서 작성한 언어를 동작시킬 수 있는 worker로 파이프라인들 동작시킵니다.

→ Runner에 구애받지 않고 파이프라인 작성이 가능합니다.
→ Beam = 프로그래밍 모델, 파이프라인을 작성하기 위한 모델일 뿐
→ Spark, Flink = 실행 엔진

3. Beam Basic Pipeline

기본적인 파이프라인은 위의 이미지처럼 “데이터 읽어오기 ▶︎변환 ▶︎ 쓰기”의 과정을 거칩니다. 여기서 읽기와 쓰기는 Text, Avro, Parquet, TFRecord, PubSub을 지원합니다.

Transform을 위한 API는 Map, FlatMap, Filter, Flatten를 지원합니다.

4. ParDo Transfrom

아래에 공유할 소스에 ParDo를 많이 사용하고 있습니다. ParDo Transform이란 클래스는 Map, FlatMap의 상위 클래스로 병렬처리를 위한 변환을 제공합니다. 필요시 리소스를 늘려서 병렬처리하도록 구성되어져 있습니다.

ParDo Transform은 Map-Reduce 프로그래밍 방식에서 “Map” 단계와 유사하여 각 element에 function을 수행하고 PCollection에 element를 입력합니다.

5. event_name 별 테이블 구성는 Pipeline 만들기

전체적인 파이프 라인은 아래 그림과 같습니다.

Event별 테이블 구성도

5.1. Pipeline 생성을 위한 options 정의하기

일단, 아래 소스를 위한 여러 package를 import 해줍니다. 이후 pipline 생성을 위한 options를 정의합니다.

  • *_location: 해당 key에 대한 value 값들은 dataflow runner가 pipeline을 생성하면서 파일을 생성합니다. 해당 파일들은 pipeline을 진행하면서 참조하게 되는데요. 그 파일들이 단계별로 저장되는 저장소를 정의해주어야 하는데 GCS에 저장하도록 되어져 있습니다.
  • save_main_session: Beam의 runtime은 session base이기 때문에 해당 옵션이 없으면 global하게 import 하거나 정의된 상수를 읽어오지 못합니다. 해당 옵션을 추가해야합니다.

5.2. Input 데이터 source 정의하기

정의하는 파이프라인의 Input 데이터는 Bigquery의 테이블입니다.

5.3. main pipeline 정의하기

주된 파이프라인 소스부터 가볍게 설명하자면

  1. Bigquery에서 데이터를 읽고 (Extract)
  2. 읽은 데이터를 filter, processing 처리를 합니다. (Transform)
  3. 처리된 데이터를 Bigquery에 저장합니다. (Load)

5.3.1. ParDo

DoFn은 PCollection을 처리하는 로직을 포함하여 ParDo에 넘겨지는 객체입니다. process 메소드를 오버라이드하여 사용합니다.

위의 소스를 보면 함수호출 결과를 ParDo의 인자로 넘기는 코드를 볼 수가 있습니다.(ex. FilterRow, GetValueRow, FilterLatestDriving)

위의 함수는 event_name에 따라 row를 필터링 하는 DoFn입니다. process를 오버라이드 했습니다.

5.3.2. BigQueryTable 정의하기

event_name에 따라 output table을 지정해야합니다. 위의 코드는 모든 event_name에 따라 분기처리는 지정하지 않고 하나의 event_name만을 고려해서 작성된 코드입니다.

5.3.3. WriteToBigQuery Options

Bigquery 에 쓰기 위해서는 여러가지 옵션이 존재합니다. 그 중 눈여겨 볼 옵션은 create_disposition, write_disposition 입니다.

  • create_disposition: CREATE_IF_NEEDED은 결과를 저장할 테이블이 없다면 생성하는 옵션입니다.
  • write_disposition: WRITE_APPEND은 테이블에 데이터를 추가하는 방식입니다.

공식문서를 살펴보면 더 많은 옵션을 확인할 수 있습니다.

6. Bigquery + Beam 제약사항

: [WriteToBigquery 시 DateTime, Time datatype 미지원]

저는 이 부분에서 엄청 삽질을 했습니다. WriteToBigquery를 하기 위해서 table schema를 지정해야하는데 여기서 Python의 ateTime, Time datatype을 지원하지 않습니다. Stack Overflow를 확인하면 다음과 같은 방식을 제안하고 있습니다.

  • STRING으로 datetime을 저장 후, 실제 사용시에는 Bigquery side 작업을 한다.

제가 해 본 삽질은 아래와 같습니다.

  • Python DateTime > Bigquery DATETIME: Not Support
  • Python DateTime > Bigquery TIMESTAMP: OK
  • Python String(DateTime Format) > Bigquery STRING: OK

7. Bigquery + Beam(SDK Python) 느낀점

7.1. Java를 쓰는게 좋겠다.

저희 조직은 Python 친화적이여서 Python으로 PoC를 진행했습니다. 그렇지만, GCP Docs에서도 Beam을 사용할 때에는 Java SDK를 권장하고 있습니다. 실제로 PoC를 해보니 Java SDK를 쓰는게 좋겠다고 느껴지는 순간들이 있었습니다. 아래에 생각나는대로 기록해보았습니다. 그렇지만, 많이 사용해 본 것이 아니기 때문에 바로 Java SDK로 변경하지는 않으려 합니다.

  • SQLTransform 사용시 Python SDK는 미지원하는 것들이 있다고 느껴졌습니다.
  • Beam은 자료가 많이 부족한 편입니다. 그럼에도 불구하고 Python보다는 Java가 많다고 느껴졌습니다.

7.2. Spark를 쓰는게 나을지도?

이전 글에서 여러 사항을 고려하면서 Dataflow를 사용하기로 결정했습니다. 아무래도 적은 인원으로 사용, 관리 측면에서 서버리스라는 사항이 가장 매력적이었습니다.

그럼에도 불구하고 Spark를 쓰면 좋겠다고 생각했던 부분은 오히려 부족한 자료로 삽질하는 비용이 더 크다고 느껴졌기 때문입니다. 삽질은 물론 많은 경험을 하게 합니다만, 빠르게 개발해야하는 입장에서는 오히려 많은 자료가 더 도움이 된다고 느껴졌습니다.

7.3. Beam SQL은 가볍게 사용하는 정도처럼 느껴진다.

이 부분도 삽질을 정말 많이 했던 부분입니다. Beam SQL은 Zetasql, calcite sql을 사용할 수 있습니다. 저는 SQL을 통한 타입 변환을 주로 시도를 했었는데, 지원하지 않는 SQL이라는 에러 메세지를 종종 볼 수 있었습니다.

많은 예시를 보면 가벼운 집계를 위한 용도로만 사용하는 것처럼 보였습니다.

--

--