Delta Live Table 이란?
Databricks 에서 데이터 파이프라인을 관리하고 자동화하는 서비스이다. Delta Lake 및 Spark 와 연동되어 스트리밍 방식과 배치 방식을 모두 제공한다. DLT 는 console에서 오토 스케일링, 스케줄링, 알림 기능을 제공한다. Delta Live Table 으로 파이프라인을 구성할 때는 databricks 에서 제공하는 dlt 라이브러리를 사용해야 하며, 일반 spark cluster 가 아닌 Delta Live Table Pipeline 에서 구성 및 실행해야 한다는 특징이 있다.
Delta Live Table 가 등장하게된 배경은 Python 뿐만 아니라 SQL 쿼리로도 '스트리밍' 처리를 할 수 있게 한다는 아이디어에서 나왔다고 한다. 엔지니어 뿐만 아니라 분석가들도 쉽게 처리를 할 수 있게 위함이라고 생각된다. DLT 라이브러리가 지원하는 선언적인 문법으로 파이프라인을 구성할 수 있어서 간단한 파이프라인 구성에서 효율적이다. 그렇지만 pure spark 에서 제공하는 모든 기능을 유연하게 사용하기에는 한계점이 존재한다.
또한 Delta Live Table 은 이름에서 Table 이라고 하여 단순히 '테이블'이라고 인지할 수 있지만 이보다는 '파이프라인'을 개발하는 서비스라고 이해하는 것이 관점이 적합한 것 같다.
Batch 방식으로 데이터 적재하기
Delta Live Table 도 Spark Engine 위에서 동작하므로 Spark가 지원하는 배치 방식과 구조적 스트리밍을 지원한다. csv 파일을 bronze table 에 배치 적재하는 delta live table 예시 코드는 다음과 같다.
import dlt
from pyspark.sql.types import StructType, StringType, DateType, StructField
from datetime import datetime
today_dir = datetime.today().strftime("%Y%m%d")
rules = {
"valid_violation_county" : "violation_county IS NOT NULL"
}
schema = StructType([
StructField("summons_number", StringType(), True),
StructField("plate_id", StringType(), True),
StructField("registration_state", StringType(), True),
StructField("plate_type", StringType(), True),
StructField("issue_date", DateType(), True),
StructField("violation_code", StringType(), True),
StructField("vehicle_body_type", StringType(), True),
StructField("vehicle_make", StringType(), True),
StructField("issuing_agency", StringType(), True),
StructField("street_code1", StringType(), True),
StructField("street_code2", StringType(), True),
StructField("street_code3", StringType(), True),
StructField("vehicle_expiration_date", StringType(), True),
StructField("violation_location", StringType(), True),
StructField("violation_precinct", StringType(), True),
StructField("issuer_precinct", StringType(), True),
StructField("issuer_code", StringType(), True),
StructField("issuer_command", StringType(), True),
StructField("issuer_squad", StringType(), True),
StructField("violation_time", StringType(), True),
StructField("time_first_observed", StringType(), True),
StructField("violation_county", StringType(), True),
StructField("violation_in_front_of_or_opposite", StringType(), True),
StructField("house_number", StringType(), True),
StructField("street_name", StringType(), True),
StructField("intersecting_street", StringType(), True),
StructField("date_first_observed", StringType(), True),
StructField("law_section", StringType(), True),
StructField("sub_division", StringType(), True),
StructField("violation_legal_code", StringType(), True),
StructField("days_parking_in_effect", StringType(), True),
StructField("from_hours_in_effect", StringType(), True),
StructField("to_hours_in_effect", StringType(), True),
StructField("vehicle_color", StringType(), True),
StructField("unregistered_vehicle", StringType(), True),
StructField("vehicle_year", StringType(), True),
StructField("meter_number", StringType(), True),
StructField("feet_from_curb", StringType(), True),
StructField("violation_post_code", StringType(), True),
StructField("violation_description", StringType(), True),
StructField("no_standing_or_stopping_violation", StringType(), True),
StructField("hydrant_violation", StringType(), True),
StructField("double_parking_violation", StringType(), True),
StructField("latitude", StringType(), True),
StructField("longitude", StringType(), True),
StructField("community_board", StringType(), True),
StructField("community_council", StringType(), True),
StructField("census_tract", StringType(), True),
StructField("bin", StringType(), True),
StructField("bbl", StringType(), True),
StructField("nta", StringType(), True)
])
@dlt.table(
comment="raw data from parking",
schema=schema
)
@dlt.expect_all(rules)
def car_bronze_table():
return (
spark.read.csv(f"/mnt/parking-small/{today_dir}", header=True, schema=schema)
)
코드를 살펴보면 spark 에서의 batch 방식과 동일하게 작성하면 된다. dlt 문법에서 @dlt.table 어노테이션을 지정한 후, car_bronze_table 이라는 함수를 작성하게 되면 car_bronze_table 이라는 테이블이 생성된다.
제대로 적재가 되었는지 확인해보기 위해 원본 경로에서 dataframe 의 row 수를 읽어보면 다음과 같다.
그후 위에서 생성한 car_bronze_table 의 row 수를 count 하면 아래와 같다. 이로써 Delta Live Table을 통해 모든 데이터가 source 경로로부터 적재가 된 것을 확인할 수 있다.
한가지 주의할 점은, batch 방식으로 적재하는 경우 write mode 는 기본적으로 "overwrite" 가 된다. 예를 들어, 위의 Delta Live Table 파이프라인을 여러번 실행 시킨 경우 source로부터 읽어들인 데이터가 증분 적재 방식으로 쌓이는 것이 아니라 처음부터 끝까지 다시 덮어쓰는 방식으로 테이블에 적재된다. 따라서 증분 적재에는 적합하지 않을 수 있다.
Delta Live Table에서 Streaming Live Table 로 스트리밍 데이터 적재
'스트리밍 라이브 테이블' 로 생성하는 예시 코드는 다음과 같다. 이 코드의 경우, 내부적으로는 Databricks의 AutoLoader 기능이 작동하게 된다.
rules = {
"valid_violation_county" : "violation_county IS NOT NULL"
}
@dlt.table(
comment="Using AutoLoader"
)
schema = StructType([
StructField("summons_number", StringType(), True),
StructField("plate_id", StringType(), True),
StructField("registration_state", StringType(), True),
StructField("plate_type", StringType(), True),
StructField("issue_date", DateType(), True),
StructField("violation_code", StringType(), True),
StructField("vehicle_body_type", StringType(), True),
StructField("vehicle_make", StringType(), True),
StructField("issuing_agency", StringType(), True),
StructField("street_code1", StringType(), True),
StructField("street_code2", StringType(), True),
StructField("street_code3", StringType(), True),
StructField("vehicle_expiration_date", StringType(), True),
StructField("violation_location", StringType(), True),
StructField("violation_precinct", StringType(), True),
StructField("issuer_precinct", StringType(), True),
StructField("issuer_code", StringType(), True),
StructField("issuer_command", StringType(), True),
StructField("issuer_squad", StringType(), True),
StructField("violation_time", StringType(), True),
StructField("time_first_observed", StringType(), True),
StructField("violation_county", StringType(), True),
StructField("violation_in_front_of_or_opposite", StringType(), True),
StructField("house_number", StringType(), True),
StructField("street_name", StringType(), True),
StructField("intersecting_street", StringType(), True),
StructField("date_first_observed", StringType(), True),
StructField("law_section", StringType(), True),
StructField("sub_division", StringType(), True),
StructField("violation_legal_code", StringType(), True),
StructField("days_parking_in_effect", StringType(), True),
StructField("from_hours_in_effect", StringType(), True),
StructField("to_hours_in_effect", StringType(), True),
StructField("vehicle_color", StringType(), True),
StructField("unregistered_vehicle", StringType(), True),
StructField("vehicle_year", StringType(), True),
StructField("meter_number", StringType(), True),
StructField("feet_from_curb", StringType(), True),
StructField("violation_post_code", StringType(), True),
StructField("violation_description", StringType(), True),
StructField("no_standing_or_stopping_violation", StringType(), True),
StructField("hydrant_violation", StringType(), True),
StructField("double_parking_violation", StringType(), True),
StructField("latitude", StringType(), True),
StructField("longitude", StringType(), True),
StructField("community_board", StringType(), True),
StructField("community_council", StringType(), True),
StructField("census_tract", StringType(), True),
StructField("bin", StringType(), True),
StructField("bbl", StringType(), True),
StructField("nta", StringType(), True)
])
@dlt.expect_all(rules)
def car_bronze_auto():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.schema(schema)
.load("/mnt/parking-small2")
)
위의 Delta Live Table 파이프라인을 실행한 결과는 아래와 같다. 파이프라인을 실행 시키는 모드는 trigger 와 continuous 로 2가지가 있으며 trigger 는 일회성으로 동작하고 , continuous는 계속해서 인스턴스가 떠있으며 스트리밍 데이터를 계속 받아오는 방식이다. 아래 결과의 경우 trigger 방식으로 실행하였다.
Batch 방식과 다르게 Streaming Live Table의 경우 write mode 의 기본 설정 값이 "append only" 이므로 증분적재가 필요한 경우, 적재를 한 시점까지를 기억하고, 적재하지 않은 새로운 데이터에 대해 테이블에 적재할 수 있다.
데이터 수집에서 효율적인 데이터 모니터링 기능
Delta Live Table 을 실제 프로젝트에서 파이프라인 구축 시에 사용하려고 고려했던 이유 중 하나는 DLT 에서 제공하는 데이터 모니터링 기능이 었다.
위의 두 예시에서도 확인할 수 있는데, 어떠한 규칙으로 데이터를 모니터링할 것인지 key : value 형식으로 지정할 수 있다.
또한 이 유효성 검사 기준들에 대해서 위반하는 경우 어떻게 처리할지에 대해서도 지정할 수 있다.
rules = {
"valid_violation_county" : "violation_county IS NOT NULL"
}
@dlt.expect_all(rules)
Delta Live Table 에서 조건들을 주고, 데이터 퀄리티를 체크한 것을 아래와 같이 모니터링 할 수 있다.
데이터를 모니터링하고, 규칙을 위반하는 데이터 (record) 를 어떻게 처리할지의 조건들은 아래와 같다.
1. expect
- expect의 경우 조건을 위반하는 record가 있더라도 모니터링 결과에만 반영되고, 실제 target table에는 위반된 데이터도 적재한다.
@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
2. expect_or_drop
- 조건을 위반하는 record는 drop 하여, target table에 적재하지 않는다.
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
3. expect_or_fail
- 조건을 위반하는 record가 들어온 경우 해당 파이프라인의 실행을 멈추고, 트랜잭션을 roll back 시킨다.
@dlt.expect_or_fail("valid_count", "count > 0")
4. expectation 여러개 두기
- 위의 조건에서 여러개의 제약조건을 dictionary 타입으로 지정하는 방식은 다음과 같다.
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Reference
https://docs.databricks.com/en/delta-live-tables/expectations.html
'Data Engineering > Databricks & Delta Lake' 카테고리의 다른 글
[Databricks] 데이터브릭스의 Unity Catalog 기능에 대해 살펴보기 (0) | 2024.03.31 |
---|---|
[TIL] Delta Table에 upsert 하기 (0) | 2024.02.14 |
[Databricks] 데이터브릭스 무료 버전 사용하는 방법 (Databricks Community Edition) (0) | 2023.10.29 |
[AWS/Databricks] Databricks Workspace Private Link로 구성하기 - Private Link를 사용해야 하는 이유 & VPC Endpoint 유형 (0) | 2023.07.03 |
[Databricks] DLT Pipeline 으로 AWS DMS CDC 구현하기 (0) | 2023.06.22 |