Spark 가 지원하는 데이터 소스를 읽고 쓰는 방법에 대한 단원이다.
데이터 소스 읽기
spark.read.format("csv")\
.option("mode", "FAILFAST")\ # 읽기 모드
.option("inferSchema", "true")\
.schema(someSchema)\
.load()
- 데이터를 읽을 때는 DataFrameReader를 사용하며, 이는 SparkSession의 read 속성으로 접근한다.
- 포맷, 스키마, 읽기 모드, 옵션 과 같은 값들을 지정해주어야 한다.
- 읽기 모드는 스파크가 형식에 맞지 않는 데이터를 만났을 때의 동작방식을 지정하는 옵션이다.
읽기 모드
- permissive : 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록 (기본값)
- dropMalformed: 형식에 맞지 않는 레코드가 포함된 로우를 제거
- failFast : 형식에 맞지 않는 record가 나오면 즉시 종료
데이터 소스 쓰기
데이터 읽기와 유사하며, DataFrameWriter 를 사용한다.
df.write.format("csv").mode("overwrite").option("sep","\t")\
.save("/tmp/my-file.tsv")
- append: 해당 경로에 이미 존재하는 파일 목록에 결과 파일 추가하기
- overwrite: 이미 존재하는 모든 데이터를 완전히 덮어쓰기
- errorIfExists : 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키면서 쓰기 작업 실패 (기본값)
- ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리를 하지 않음
CSV 파일
CSV (comma-separated values) 로 콤마 (,) 로 구분된 값을 의미한다. 각 줄이 단일 레코드가 되며 레코드의 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷이다. CSV 용 DataFrameReader를 생성해야 한다.
spark.read.format("csv")
파일 읽기
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/2010-summary.csv")
JSON 파일
JSON 은 JavaScript Object Notation 으로, 자바스크립트에서 온 객체 표기법이다. 스파크에서는 줄로 구분된 JSON 을 기본적으로 사용한다.
파일 읽기
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/flight-data/2010-summary.json").show(5)
파일 쓰기
데이터 소스에 관계 없이 JSON 파일에 저장할 수 있다.
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json.json")
Parquet 파일
- parquet 파일은 컬럼 기반의 데이터 저장 방식으로, 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며 컬럼 기반의 압축 기능을 제공한다.
- 아파치 스파크와 잘 호환되므로 스파크의 기본 파일 포맷이기도 하다.
- 읽기 연산 시에 JSON 이나 CSV보다 훨씬 효율적으로 동작하므로 장기 저장용 데이터는 파케이 포맷으로 저장하는 것이 좋다.
- parquet 파일은 다른 데이터 소스에 비해 옵션이 거의 없다 !! 그 이유는 parquet 파일은 데이터를 저장할 떄 자체 스키마를 사용하여 데이터를 저장하기 때문이다. 따라서 정확한 스키마가 필요한 경우에 스키마를 설정할 수 있긴 하지만, 파케이 파일은 스키마가 파일 자체에 내장 되어 있다.
파일 읽기
spark.read.format("parquet")\
.load("/data/2010-summary.parquet").show(5)
파일 쓰기
csvFile.write.format("parquet")\
.save("/tmp/my-parquet.parquet")
ORC 파일
하둡 워크로드를 위해 설계된 self-describing 이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷이다. 파케이와 큰 차이점이라고 한다면, 파케이는 스파크에 최적화된 반면 ORC는 하이브에 최적화 되어 있다.
파일 읽기
spark.read.format("orc")\
.load("/data/2010-summary.orc").show(5)
파일 쓰기
csvFile.write.format("orc")\
.mode("overwrite").save("/tmp/my-orc.orc")
SQL 데이터 베이스
SQL 데이터 베이스에서 데이터를 읽는 방법은 다른 데이터 소스들과 동일하게 포맷과 옵션을 지정한 후 읽기 작업을 수행하면 된다.
쿼리 푸시다운
스파크는 DataFrame을 만들기 전에 데이터 베이스 자체에서 데이터를 필터링 할 수 있도록 만들 수 있다.
데이터 베이스 병렬로 읽기
numPartitions 옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정할 수 있다. 이 설정으로 과도한 쓰기나 읽기를 막을 수 있다.
슬라이딩 윈도우 기반의 파티셔닝
조건절을 기반으로 분할 할 수 있는 방법이다.
SQL 데이터 베이스 쓰기
텍스트 파일
일반 텍스트 파일을 읽는 작업을 수행할 수 있다. 파일의 각 줄은 DataFrame의 레코드가 된다.
텍스트 파일 읽기
spark.read.textFile("/data/2010-summary.csv")\
.selectExpr("split(value, ',') as rows").show()
텍스트 파일 쓰기
텍스트 파일을 쓸 때는 문자열 칼럼이 하나만 존재해야 한다.
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text.txt")
고급 I/O 개념
병렬로 데이터 쓰기
파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame 이 가진 파티션 수에 따라 달라질 수 있다. 기본적으로 데이터 파티션 당 하나의 파일이 작성된다. 아래의 예시는 폴더 안에 5개의 파일을 생성한다.
csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv")
데이터 형식에 대한 숨겨저있는 의미나, 특성까지 알아볼 수 있어서 유익한 단원이었다.
'Data Engineering > Apache Spark' 카테고리의 다른 글
[Spark] Spark Streaming 과 Structure Streaming 비교하기 (0) | 2024.03.03 |
---|---|
[Spark] Ch.15 클러스터에서 스파크 실행하기 (0) | 2024.01.04 |
[Spark] 스파크 완벽 가이드 04 - Ch 04. 구조적 API 개요 (0) | 2023.11.07 |
[Spark] 스파크 완벽 가이드 (3) - Ch.03 스파크 기능 둘러보기 (Dataset, Structured Streaming) (0) | 2023.11.02 |
[Spark] 스파크 완벽 가이드 (2) - Ch 02. 스파크 간단히 살펴보기 (1) | 2023.10.29 |