국내에 Databricks 관련 블로그 글이나 문서가 많지 않아서 직접 공부하거나 테스트 해본 내용들을 블로그에 조금씩 정리해보려고 한다..!
Databricks DLT Table 이란 ?
데이터브릭스에서 안정적으로 배치 및 스트리밍 데이터를 위한 파이프라인을 구축하고 관리할 수 있도록 만들어 놓은 기능이다. 스트리밍 데이터에 강하다보니, CDC가 필요한 경우에도 주기적으로 스트리밍 데이터를 당겨오면 Delta Live Table에 변경 사항을 반영할 수 있다. 문제점은 .. 편리한 대신에 가격이 비싸다고 한다
https://www.databricks.com/kr/product/delta-live-tables
Databricks DLT로 CDC 구현하기
Auto Loader 를 사용하여 Cloud에 저장되어 있는 스토리지에서 바로 스트리밍 데이터를 읽어오는 DLT Pipeline을 구축하는 예제이다 Source Database 는 Replica set 으로 구축한 MongoDB 클러스터이다. 목표하고자 하는 작업은 데이터 full load + CDC 작업이다. 이를 위해 AWS에서 MongoDB -> S3 Bucket 으로 DMS 를 통해 document 형식으로 데이터 마이그레이션을 진행했다.
전체 코드는 아래와 같다.
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
@dlt.table
def mongodb_cdc_raw():
return spark.readStream.format("cloudFiles")\
.option("header", "true")\
.option("cloudFiles.format", "csv")\
.schema("Op STRING, dmsTimestamp TIMESTAMP, _id STRING, _doc STRING")\
.load("s3://BUCUCKET_DIR")
dlt.create_streaming_live_table(
name="mongodb",
comment="Data from DMS for the table: mongodb_cdc_raw"
)
dlt.apply_changes(
target = "mongodb",
source = "mongodb_cdc_raw",
keys = ["_id"],
sequence_by= col("dmsTimestamp"),
apply_as_deletes = expr("Op = 'D'"),
except_column_list = ["Op", "dmsTimestamp"],
stored_as_scd_type = 1
)
코드를 자세히 살펴보자 !!
@dlt.table
def mongodb_cdc_raw():
return spark.readStream.format("cloudFiles")\
.option("header", "true")\
.option("cloudFiles.format", "csv")\
.schema("Op STRING, dmsTimestamp TIMESTAMP, _id STRING, _doc STRING")\
.load("s3://BUCUCKET_DIR")
@dlt.table 으로, python 의 데코레이터 패턴이다. 데코레이터 패턴은 객체의 결합을 통해 기능을 유연하게 확장할 수 있는 디자인 패턴으로, 추가하고 싶은 기능을 Decorater 클래스로 정의하고, 필요한 기능을 추가적으로 조합하여 사용하는 패턴이다.
즉, 위의 경우 dlt 타입으로 생성할 table을 정의하는 함수를 파라미터로 주어서 조합하여 사용하게 된다. 위의 경우 함수의 이름인 mongodb_cdc_raw() 가 그대로 테이블 이름으로 생성되게 된다.
또한 주의할 점은 현재 pyspark를 통해 source에서 읽어오는 스트림 데이터의 형식이 .csv 이므로, schema를 명시적으로 주어야 한다.
dlt.create_streaming_live_table(
name="mongodb",
comment="Data from DMS for the table: mongodb_cdc_raw"
)
mongodb 라는 dlt table을 만드는 부분이다. 경우에 따라 raw 데이터를 가공하는 테이블이 되므로 silver layer 라고 볼 수도 있다. dms 에서 당겨오는 raw data들이 mongodb_cdc_raw에 적재되고, mongodb_cdc_raw 에서 mongodb 로 반영이 된다.
dlt.apply_changes(
target = "mongodb",
source = "mongodb_cdc_raw",
keys = ["_id"],
sequence_by= col("dmsTimestamp"),
apply_as_deletes = expr("Op = 'D'"),
except_column_list = ["Op", "dmsTimestamp"],
stored_as_scd_type = 1
)
apply_changes() 는 databricks dlt에서 MERGE INTO 와 유사한 기능을 하는 함수이다. target 은 apply change가 되고 결과값을 담는 테이블이 되고, source 는 원천 데이터를 가지고 있는 테이블이다.
여기서 필수적인 값들은 keys와 sequence_by 이다. keys 는 unique 한 값을 갖는 PK가 되어야 한다.
또한 dmsTimestamp의 경우에는 CDC가 반영되기 위해서, Database 내부에서 연산(INSERT, DELETE, UPDATE...)이 실행되어야 할 시간, 혹은 순서가 된다. 당연한 말이지만, 연산을 실행할 순서를 알아야 테이블의 무결성을 해치지 않고, 문제없이 실행할 수 있다 !
stored_as_scd_type 에서 SCD (Slowly Changing Dimensions) 의 타입을 지정할 수 있다.
- type 1 : 새로운 데이터를 적재할 때 기존 데이터를 overwite 하여, 과거의 값은 없애고 현재의 값만 적용시키는 타입
- type 2 : 행 단위로 관리. Flag로 새로 들어왔다는 status 를 나타낸다.
- type 3 : 이전에 적용한 열을 추가해 주는 방법 ex) 부서, 이전 부서
'Data Engineering > Databricks & Delta Lake' 카테고리의 다른 글
[Databricks] Delta Live Table 로 파이프라인 개발 & 데이터 퀄리티 모니터링하기 (0) | 2024.04.14 |
---|---|
[Databricks] 데이터브릭스의 Unity Catalog 기능에 대해 살펴보기 (0) | 2024.03.31 |
[TIL] Delta Table에 upsert 하기 (0) | 2024.02.14 |
[Databricks] 데이터브릭스 무료 버전 사용하는 방법 (Databricks Community Edition) (0) | 2023.10.29 |
[AWS/Databricks] Databricks Workspace Private Link로 구성하기 - Private Link를 사용해야 하는 이유 & VPC Endpoint 유형 (0) | 2023.07.03 |