'스파크 완벽 가이드' 책에서는 스파크 성능 향상의 기법을 크게 간접적/ 직접적인 기법으로 나누어 설명하고 있다. 또한 사용자가 제어 가능한 범위 내에서 튜닝 기법들을 소개하고 있다. 19장의 내용 중 핵심 내용을 요약과 중요한 부분을 더 정리 해보았다.
1. 간접적인 스파크 성능 향상 기법
1.1 설계 방안
scala vs java vs python vs R
- 구조적 API 로 해결이 되지 않아, RDD 트랜스포매이션이나 UDF 를 사용해야 하는 경우 R , Python 의 사용은 피하는 것이 좋다.
- Python 에서 RDD 코드를 실행하게 되면, Python Process 를 오가는 데이터들을 직렬화 하면서 비용이 크게 발생하고, 안정성이 떨어지게 된다.
- Spark 에서 직렬화란 : 객체를 바이트 스트림으로 변환하여 네트워크를 통해 전송하거나 디스크에 저장하는 프로세스이다. Spark 에서는 클러스터에 속한 여러 노드들이 서로 데이터를 주고 받아야 하므로 직렬화가 필요하다.
DataFrame vs SQL vs Dataset vs RDD
- DataFrame 은 어떤 언어에서 사용하더라도 성능은 동일하다.
- UDF 를 정의해서 사용해야 하는 경우 java 또는 scala 를 사용하는 것이 Python , R 을 사용하는 것 보다 성능이 낫다.
- 그렇지만 근본적으로 성능 개선을 하기 위해서는 UDF 사용을 피하고 DataFrame 이나 SQL 을 사용해야 한다.
1.2 RDD 객체 직렬화
RDD 직렬화 라이브러리로 Kyro 를 활용하여 직접 정의한 데이터 타입을 직렬화 할 수 있다. Java 직렬화보다 빠르고 더 적은 메모리를 사용하지만 사용자 정의 클래스에 대해 등록해야 한다.
1.3 클러스터의 설정
클라우드 환경이라면, spark 작업에 맞는 클러스터 사양을 선택하는 것이 비용 효율적이다. 인스턴스 선택 전에 앞으로 실행하게 될 스파크 job 이 어떤 특성을 가졌는지 파악하는 것이 중요하다고 생각한다. core 수를 많이 써야 하는 연산인지 단순히 메모리가 많이 필요한 작업인지 판단 후에 worker의 수, 각 driver / worker 노드의 사양을 조절했던 경험이 있다.
- CPU 집약적인 작업 : 계산이 많이 필요한 작업의 경우 CPU 사양이 높은 인스턴스를 사용한다. -> 맵리듀스, 트랜스포메이션 연산 (filter, map, flatMap... ), ML 작업 (ex. AWS의 C5, C5n)
- 메모리 집약적인 작업 : 데이터가 메모리에 많이 상주해야 하는 작업을 의미한다. 예를 들어 대규모 데이터 셋을 메모리에 로드해서 처리해야 하는 작업이 있다. -> JOIN 연산이 많이 일어나는 경우, 집계 연산 (groupBy, reduceByKey, aggregateByKey 등 ), 데이터를 메모리에 캐시 작업 (cache, persist )하여 여러번 사용해야 하는 경우 (ex. AWS의 R5 인스턴스)
- I/O 집약적인 작업 : 디스크, 네트워크 I/O 가 많이 발생하는 작업. 대규모 파일에 대해 읽기/쓰기 작업, 데이터 마이그레이션등이 해당한다.
- 일반적인 ETL 작업 : CPU, 메모리, I/O 모두 적절한 수준의 성능이 필요한 작업에는 균형이 잡힌 인스턴스를 사용할 수 있다.
1.4 데이터 보관 시, 효율적인 데이터 포맷 선택하기
저장된 데이터에 자주 접근해야 한다면 당연히 효율적인 데이터 포맷을 선택해서 저장하는 것이 효과적이다.
- csv 같은 파일은 파싱 속도가 느리고 파일을 읽을 때 개행 문자를 처리해 주어야 하는 등의 문제가 있다. 반면, parquet 는 spark 엔진에 최적화된 기본 포맷으로 데이터를 바이너리 형식으로 저장되며, 컬럼별로 데이터를 저장한다. 쿼리에서 사용하지 않는 데이터는 빠르게 건너 뛸 수 있도록 통계를 함께 저장한다.
열 기반으로 데이터를 저장하면 행 기반 저장 방식에 비해 데이터 압축률이 더 높다. 또한 select 문과 같이 데이터를 읽는 경우에 필요한 열의 데이터만 읽어서 처리하는 것이 가능하므로 효율적이다.
1.5 데이터 보관 시, 분할 가능한 데이터 포맷 선택하기
- 분할 가능한 파일 포맷을 선택한다. 여러 태스크가 파일의 서로 다른 부분을 '동시에' 읽을 수 있어야 병렬성을 보장할 수 있기 때문이다. JSON 의 경우 분할이 불가능한 파일 포맷이므로, 단일 머신에서 전체 파일을 읽는다. 이런 경우 병렬성이 떨어진다.
- 분할 가능한 압축 포맷을 선택한다. ZIP, TAR 압축 포맷은 분할할 수 없다. 예를 들어, 10개의 ZIP 파일과 10개의 core가 있더라도 1개의 core에서 ZIP 파일을 읽게 된다. 따라서 분할 가능한 gzip, bzip2 등을 이용하는 것이 좋다.
1.6 테이블 파티셔닝
데이터에서 특정 키를 기준으로 개별 디렉토리에 파일을 저장하는 것을 의미한다. 흔한 예로, 날짜 필드 같은 키를 기준으로 파티셔닝 하여 데이터를 저장하는 것이 있다. 자주 사용하는 키를 기준으로 데이터가 분할 된다면, 전체 데이터를 읽을 필요 없이 불필요한 범위는 건너뛰고 특정 범위의 데이터만 읽으면 되므로 쿼리를 효율적으로 처리할 수 있다.
1.7 버켓팅
버켓팅을 통해서 JOIN, 집계 연산 수행 이전에 데이터를 전체 파티션에 대해 균등하게 분할 할 수 있다. 동일한 버킷 ID를 가진 데이터가 하나의 물리적인 파티션에 모두 모여 있게 되므로, JOIN 연산이나 집계 연산 수행 이전에 발생할 오버헤드가 큰 셔플을 방지할 수 있다.
Q. 버켓팅과 파티셔닝의 차이는 ?
A. 파티셔닝은 특정 컬럼의 값에 따라서 데이터를 분할하는 반면 파티셔닝은 특정 컬럼의 해시 값을 기반으로, 고정된 수의 버킷에 데이터를 분할한다. 따라서 파티셔닝의 경우 파티션 key 값에 따라 데이터가 균등하게 분포된다면 효과적이지만 버켓팅의 경우 해시 함수를 사용하므로 꼭 데이터가 균등하게 분포되어 있지 않아도 균일하게 분산시킬 수 있다.
파티셔닝의 경우 특정 파티션만 읽어도 되므로, 데이터 조회시 필터링 성능이 향상된다. 버켓팅의 경우 동일 버킷에 있는 데이터끼리 조인되므로 조인 연산에 효율적이다. 파티셔닝은 디렉토리 구조로 데이터를 저장하고 버켓팅은 파일 수가 고정되어 있으면 각 파일이 고정된 버킷을 의미한다.
1.8 파일 수와 파일의 크기 고려하기
HDFS에서 기본적으로 128MB 크기의 블록으로 데이터를 관리하지만, 데이터 저장시의 trade off 를 감안해야 한다.
small file problem : 작은 파일이 많은 경우에 발생하는 성능 문제이다. 스케줄러가 많은 수의 파일을 찾아서 읽기 태스크를 실행 해야 하므로 네트워크와 스케줄링에 오버헤드가 발생하고 과도하게 태스크가 생성되는 문제가 발생한다.
large file problem : 반대로 적은 수의 대용량 파일이 있는 경우에 부하를 줄일 수 있지만, 태스크의 수행시간이 길어질 것이다.
1.9 데이터 지역성
- 스파크 클러스터에서 데이터 블록을 교환하지 않고, 특정 노드에서 동작하도록 지정하는 것이다. 최대한 가까운 노드에서 작업을 처리하는 것을 의미한다.
1.10 메모리 부족과 가비지 컬렉션
Spark Executor 또는 Driver의 메모리가 부족하면 태스크를 완료하지 못할 수 있다.
- 메모리를 너무 많이 사용한 경우
- 가비지 컬렉션이 자주 수행되는 경우
- JVM 객체가 너무 많이 생성되어 가비지 컬렉션이 정리하면서 실행 속도가 느려지는 경우
-> Spark 구조적 API 를 사용하면 JVM 객체를 생성하지 않고, spark SQL 은 내부 포맷으로 연산을 수행하므로 메모리를 효율적으로 쓸 수 있다.
- JVM 객체가 너무 많이 생성되어 가비지 컬렉션이 정리하면서 실행 속도가 느려지는 경우
2. 직접적인 스파크 성능 향상 기법
2.1 병렬화
책에 따르면, 특정 스테이지의 처리 속도를 높이기 위해서는 병렬성을 높이는 작업부터 해야 한다고 한다. spark cluster 의 core 수에 따라 spark.shuffle.partitions 와 spark.default.parallelism 을 설정해주어야 한다.
-> But, 개인적인 경험에서 큰 효과를 본 적이 아직은 없었고 Databricks SA 분의 말에 의하면 이런 설정값을 건드리는것을 피하는 것이 좋다고 들었음. 오히려 인스턴스 조절이나 spark 코드에서 파티셔닝 옵션을 주는 것을 추천했음
2.2 최대한 데이터를 필터링하기
최대한 이른 시점에 많은 양의 데이터를 필터링해야 한다. 복잡한 연산 이전에 처리할 데이터 양을 최소화하면 스파크 잡을 효율적으로 처리할 수 있다. 스파크 뿐 만 아니라 sql 쿼리 튜닝 시에도 해당되는 이야기 같다.
2.3 파티션 재분배와 병합
가능한한 적은 양의 데이터를 셔플하는 것이 효율적이다. 셔플 대신 coalesce() 를 통해 전체 파티션의 수를 줄인다.
repartition()은 데이터를 셔플링하게 되지만, JOIN 연산이나 cache() 호출 시에 효율적이다. 즉 전체적인 스파크 job 의 성능을 개선하고 병렬성을 향상시킬 수 있다.
2.4 UDF 사용 피하기
UDF 사용을 최대한 피하고, spark의 구조적 API를 최대한 활용해야 한다. UDF는 데이터를 JVM 객체로 변환하고, 레코드 당 여러번 수행해서 많은 자원이 필요하기 때문이다.
2.5 재사용 데이터셋 캐싱하기
동일한 데이터셋을 재사용해야 하는 작업이 있다면, 캐싱(cache() 함수 호출)을 통해 최적화할 수 있다.
2.6 조인 작업 최적화
- 조인 순서 변경과 같이 코드에서 성능을 개선할 수 있다.
- 카테시안 조인, 외부 조인 사용을 최대한 피한다.
2.7 집계
집계 연산 이전에, 많은 수의 파티션을 가질 수 있도록 데이터를 필터링해야 한다.
2.8 브로드 캐스트 변수
Job 마다 데이터 조각들을 재전송하는 과정을 생략할 수 있으므로, 브로드캐스트 조인과 변수를 통해 스파크 잡을 최적화 할 수 있다.
3. 결론
1. 파티셔닝, 효율적인 파일 포맷을 사용하자. 그리고 가능한 작은 데이터만 읽는다.
2. Data Skew 현상을 방지한다. -> 병렬성 보장과 파티셔닝을 사용한다.
- Data Skew 현상을 해결했던 경험을 정리해 보았습니다.
3. 구조적 API 를 최대한 활용한다. 구조적 API 가 Spark 버전 향상에 따라 최적화된 코드를 제공하기 때문이다.
'Data Engineering > Apache Spark' 카테고리의 다른 글
[Spark] Spark JDBC 연결시 발생하는 data skew 현상 해결하기 (0) | 2024.05.10 |
---|---|
[Spark] Spark Streaming 과 Structure Streaming 비교하기 (0) | 2024.03.03 |
[Spark] Ch.15 클러스터에서 스파크 실행하기 (0) | 2024.01.04 |
[Spark] 스파크 완벽 가이드 Ch09. 데이터 소스 (0) | 2023.11.27 |
[Spark] 스파크 완벽 가이드 04 - Ch 04. 구조적 API 개요 (0) | 2023.11.07 |