최근 다양한 이기종 Database에서 Cloud 환경의 Delta Lake 로의 데이터 마이그레이션 일을 spark 로 진행하고 있다. JDBC 연결로 Spark 에서 데이터 read & write 작업을 할 때 발생했던 data skew 현상과 이를 트러블 슈팅했던 경험을 공유하고자 한다.
Spark JDBC 연결
spark 에서 jdbc 연결을 통해 Database 의 table 데이터를 read 하는 작업은 아래와 같다. 별다른 추가 옵션을 주지 않고 spark 가 알아서 분산 처리를 할 것을 기대하고 다음과 같은 코드로 데이터를 읽어오는 작업을 하였다. 사용한 인스턴스는 Databricks 에서 aws ec2 r5.large 인스턴스 (core 2, 16GB) 로 driver node, worker node (min 3 - max 4) 로 실험하였다.
df = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Issue
read 작업의 속도는 초 단위로 이루어 졌다. 그러나 문제는 jdbc 연결로 읽어온 spark dataframe 을 parquet 형식으로 landing zone으로 두고 있는 S3 bucket 에 write 작업을 할때 발생했다. Source system 에 해당하는 데이터의 크기가 1GB 미만이었을 때는 데이터 크기가 상대적으로 작았기 때문에 r5.large 인스턴스 driver 3-4개 정도로도 충분히 처리를 할 수 있었다.
df.write.format(”parquet”).mode(”overwrite”).save(s3_bucket)
그러나 source system의 데이터의 크기가 1GB 정도를 넘어갔을 때, 아래와 같이 한 executor 에 데이터가 쏠리는 data skew 현상이 발생하였다. 실제로 metrics 를 모니터링 했을 때, 한 executor 의 CPU 사용률이 90% 를 넘어가고 인스턴스의 Out Of Memory 현상이 발생하여, 계속 spark job 이 failed 되었다. spark 도 몇번 재 실행을 해보다가 정해진 시도횟수의 default 값만큼 job failed 가 발생해서 결국 time out 에러가 발생하였다.
한편으로 다른 3개의 executor 의 CPU 사용률은 1% 대였으므로 분산처리가 균등하게 이루어 지지 않았다. 따라서 이와 같이 분산처리가 원활히 일어나지 않는 현상에 대해 트러블 슈팅해보았다.
해결 방법 1 : numPartitions 옵션 설정
jdbc connector 를 사용했을 때 분산처리를 하기 위해서 줄 수 있는 옵션들에 대해 찾아보았다.
"numPartitions" 옵션을 줄 수 있다. numPartitions 를 지정하기 위해서 따라 붙는 필수적인 옵션으로는 partitionColumn, lowerBound, upperBound 가 있다. 세 개는 반드시 지정해주어야 read 작업 시에 partition 을 줄 수 있다.
df = (
spark.read.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("user", user)
.option("password", password)
.option("dbtable", table)
.option("partitionColumn", "COL_NAME")
.option("numPartitions",4)
.option("lowerBound", "1")
.option("upperBound", "5000")
.load())
코드 예시는 위와 같다. 각각의 옵션들에 대해 조금 더 살펴보자.
1. partitionColumn
partition 을 나눌 key 값이며, numeric type 값들 (ex. double , integer, timestamp, date, long ...) 만이 partition column 이 될 수 있다.
2. numPartitions
테이블을 read & write 작업할 때 병렬 처리에 사용할 수 있는 최대 partition 들의 개수이다. JDBC connection에서 한번에 맺어지는 connection들의 최대값으로 작용하기도 한다. partition의 개수가 numPartitions 값을 초과하게 되면, spark 가 numPartitions 개수를 지정하는 best practice 나 계산 공식이 정해져있다기 보단, 테이블과 스파크 클러스터 스펙을 고려하여 상황에 맞는 파티션 수를 실험적으로 정해야 한다. 또한 사용하는 node 의 core 수를 고려하여 값을 주는 방법도 있다.
3. lowerBound, upperbound
read 작업 시에 파티션을 나누어서 가져올 데이터의 최솟값과 최대값을 각각 정해주는 옵션이다. 따라서 실제로 table column 에 있는 값들을 고려해서 선정해야 한다. 예를 들어서, date type column 을 partition column 으로 사용하는 경우에는 lowerbound와 upperbound 값이 날짜가 될 것이다.
해결 방법 2 : fetchsize 옵션 설정
다음으로 고려해볼 수 있는 사항은 fetchsize 옵션이다. 한번의 network round trip 이 일어나는 동안 데이블에서 가져올 레코드의 수를 의미한다. 예를 들어 아래와 같이 fetchsize 를 5000으로 주게 되면, JDBC 드라이버는 한번 database 에 요청을 날릴 때 5000개의 레코드를 가져온다. fetchsize 값이 너무 작으면 네트워크 호출이 빈번해지고, 너무 값이 크면 spark memory 사용량이 많아질 수 있으므로 적절히 fetchsize 를 주어야 한다.
.option("fetchsize", 5000)
해결 방법 3 : Spark Cluster 스펙 조절하기
나의 경우 AWS + Databricks 클라우드 환경에서 spark cluster 를 띄워서 사용하여 작업하고 있다. 따라서 EMR 이나, public cloud service에서 spark cluster를 사용하는 경우에도 적용해 볼 수 있을 것 같다.
1. auto scaling 을 사용하는 경우 min, max 값 조절하기
cluster 에서 data skew 현상으로 계속 인스턴스에 OOM (Out Of Memory)이 일어나게 되었다. auto scaling 을 거는 경우 min 값 을 늘려서, 클러스터 실행시 처음부터 인스턴스 개수를 넉넉하게 잡아두는 방법이 초반부터 OOM 이 바로 일어나는 현상을 방지할 수 있었다. 예를 들어 min 1, max 4 였다면 이를 min 3, max 4 이런식으로 auto scaling 을 조절했다.
2. spot instance 보다는 on demand type 의 인스턴스 의 개수를 늘리기
spot instance 의 특성상 비용이 저렴하지만 instance 를 놓치게 될 수도 있다. 이 경우에도 spark 가 메모리를 많이 필요로 하는 작업을 할 때 OOM의 위험 부담이 있다. 따라서 on demand type의 인스턴스를 적절히 미리 주는 것이 하나의 해결방법이 될 수 있다.
3. Spark executor node 의 스펙을 scale up 하기
Spark executor node 의 스펙을 상향시켜주는 scale up 을 고려해볼 수 있다. Core 수와 메모리 사양이 충분한 인스턴스를 선택했는지 확인해보아야 한다. 병렬처리를 위해 core수가 높은 인스턴스를 고르는 것이 유리하다. 또한 spark 가 memory 에서 작업을 수행하므로 memory 크기도 충분한지 고려해야 한다.
SQL 문과 함께 병렬 처리하기
한편, option("query", "<query>") 는 "numPartitions" 를 주는 option 과 같이 쓸 수 없다.
따라서 파티셔닝 작업과 sql query 를 둘 다 이용하고 싶은 경우 "dbtable" 이라는 option을 줄 수 있다. dbtable 자체는 "select * from {dbtable}" 이라는 query 형식으로 날아가게 된다. 따라서 dbtable 에 사용하고자 하는 sql query 문을 담는다면 이는 subquery 로 처리할 수 있다. 예를 들어 아래와 같이 작성할 수 있따.
read 작업 시 전체 테이블을 읽는 것이 아니라 sql query 문으로 filter 를 준 후 데이터를 읽어들이는 작업 시에 사용할 수 있는 대안이다.
query = """(select *
from test_table
where date between 2023-01-01 and 2023-12-31) as q"""
df = (
spark.read.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("user", user)
.option("password", password)
.option("dbtable", query) # dbtable option
.option("partitionColumn", "COL_NAME")
.option("numPartitions",4)
.option("lowerBound", "1")
.option("upperBound", "5000")
.option("fetchsize", 5000)
.load())
결론 및 느낀점
결론적으로 data skew 현상을 해결하기 위해서는 다음의 3가지 방법을 사용하였다.
- 적절한 partition column 을 찾아서 read 작업 시 파티셔닝 진행
- fetchsize 옵션 설정
- 클러스터 옵션 및 적절한 인스턴스 선택하기
또한 write 작업 시 repartition 을 주면 병렬처리를 적절하게 수행할 것이라고 가정하였는데, 오히려 read 할때부터 executor 들이 적절한 만큼의 데이터를 분배하게 하는 것이 관건이었다. jdbc 연결을 통하지 않고, S3 에서 spark read 로 dataframe 을 가져온 경우에는 repartition 이 잘 적용이 되고 있는 것을 확인했다. 그런데 jdbc 연결시에는 기본적으로 하나의 thread 만을 사용하여 연결을 생성한다. 따라서 data skew 현상이 발생하고 이게 write 작업시에도 성능 영향을 주는 것 같다.
위의 3가지 방법을 사용하여 데이터 마이그레이션에 성공하였으나, 결국에는 spark tuning 은 실험적으로 이루어 지게 되었다. 테이블마다 각각 특성이 다르므로, 각 상황에 맞게 조절하는 스킬이 필요했다.
마지막으로, spark 옵션을 주어서 튜닝하는 것 자체만으로도 리얼에서의 문제를 모두 해결하기는 어려울 수 있다. spark jdbc connector 로 database에 대해 작업을 할 때 numpartition 이나 dbtable 같은 query 문은 결국 subquery 들이 원천 database로 질의 되는 것이다. 따라서 원천 database에 부하를 줄 수 밖에 없으며, 원천 database가 운영계인 상황에서는 더욱 주의를 해야한다.
Reference
https://docs.databricks.com/en/connect/external-systems/jdbc.html
https://spark-korea.github.io/docs/sql-data-sources-jdbc.html
https://medium.com/@pintoiu.gabriel/spark-concurrent-jdbc-data-reads-5423552c93f5
'Data Engineering > Apache Spark' 카테고리의 다른 글
Spark 성능 튜닝 기법 정리 (0) | 2024.07.07 |
---|---|
[Spark] Spark Streaming 과 Structure Streaming 비교하기 (0) | 2024.03.03 |
[Spark] Ch.15 클러스터에서 스파크 실행하기 (0) | 2024.01.04 |
[Spark] 스파크 완벽 가이드 Ch09. 데이터 소스 (0) | 2023.11.27 |
[Spark] 스파크 완벽 가이드 04 - Ch 04. 구조적 API 개요 (0) | 2023.11.07 |