728x90
https://docs.databricks.com/en/delta/merge.html#language-python
데이터를 증분적재해야 하는 경우 merge() 혹은 MERGE INTO sql 문을 사용할 수 있다.
[Python Code Snippet]
(targetDF.alias("t") # DeltaTable 이 Target이 되어야 한다.
.merge(sourceDF.alias("s"), "s.key = t.key") # merge 조건을 정한다. Source는 DataFrame이다.
.whenMatchedUpdateAll() # key값이 동일한 경우 모두 변경 반영
.whenNotMatchedInsertAll() # key값이 없는 경우 모두 insert
.whenNotMatchedBySourceDelete() # source에 없는 경우 delete
.execute()
)
단, 여기서 targetDF는 DeltaTable 이어야하고, UPSERT 를 하고 싶은 데이터는 sourceDF로 DataFrame이어야 한다. 생각해 보면 작업하기 편하도록 merge 함수를 만들었다고 이해할 수 있다. 업데이트 및 적재의 타겟이 되는 것은 DeltaTable이 될 것이고, 새롭게 배치 작업을 통해 들어오게 되는 데이터는 DataFrame으로 읽어들여서 DeltaTable에 Upsert 연산을 할 것이기 때문이다.
동일한 작업에 대해 SQL 으로 작성하는 경우는 아래와 같다.
[SQL Code Snippet]
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
728x90
'Data Engineering > Databricks & Delta Lake' 카테고리의 다른 글
[Databricks] Delta Live Table 로 파이프라인 개발 & 데이터 퀄리티 모니터링하기 (0) | 2024.04.14 |
---|---|
[Databricks] 데이터브릭스의 Unity Catalog 기능에 대해 살펴보기 (0) | 2024.03.31 |
[Databricks] 데이터브릭스 무료 버전 사용하는 방법 (Databricks Community Edition) (0) | 2023.10.29 |
[AWS/Databricks] Databricks Workspace Private Link로 구성하기 - Private Link를 사용해야 하는 이유 & VPC Endpoint 유형 (0) | 2023.07.03 |
[Databricks] DLT Pipeline 으로 AWS DMS CDC 구현하기 (0) | 2023.06.22 |