Practical Spark – Memory (10)

Practical 시리즈는 AWS 환경 위에서 데이터 인프라를 구축하는데 참고할만한 내용들을 담고 있습니다. 이 글은 그 중 Practical Spark 의 한 챕터입니다.

이번 챕터에서는 Spark 의 Memory 구조를 살펴보면서 Spark Executor 에서 발생한 OOM 을 어떤 옵션을 통해 수정할 수 있는지 알아봅니다. 또한 EMR / EKS 사용시 고려해야 할 메모리를 포함한 옵션들에 대해서도 논의 해보고 PySpark / Apache Arrow 에서 Memory 가 어떻게 다루어지는지도 이야기 해 봅니다.

Practical 시리즈는 AWS 환경 위에서 데이터 인프라를 구축하는데 참고할만한 내용들을 담고 있습니다. 아래의 내용들을 주로 다룹니다.

  • Practical Spark
  • Practical Kafka
  • Practical Kubernetes
  • Practical AWS

이 블로그에 작성된 내용은 위 시리즈 중 Gitbook 으로 작성된 Practical Spark 를 옮긴 것입니다. 원본 Gitbook 와 실습에 사용하는 코드는 아래의 링크에서 보실 수 있습니다.


Spark Memory

이번 챕터에서는 Spark 의 메모리 구조에 대해서 알아보겠습니다. Spark 는 JVM 위에서 실행됩니다. PySpark 를 쓰는 경우에는 외부에 Python 프로세스가 존재할 수 있으나 Driver 또는 Executor 를 위한 JVM 이 실행되는건 동일합니다. 지난 챕터에서 Spark 모드에 따른 구분에서 알아보았듯이 Spark 는 다양한 실행 모드를 지원하고 각 실행 모드에 따라 컴포넌트의 실행 위치가 달라지게 됩니다.

  • Local 모드라면 단일 JVM 내에 Driver 와 Executor 가 존재합니다
  • Client 모드라면 Driver JVM 과 다수의 Executor JVM 이 존재합니다
  • Cluster 모드라면, JVM 기준으로 Client 모드와 동일하되 Job 을 Submit / Wait 하는 JVM 이 존재할 수 있습니다.

아래 그림들은 각 Mode 별 Driver, Executor 의 컴포넌트의 위치를 보여줍니다.

Spark Local Mode (Spark in Action)
Spark Client Mode (Spark in Action)
Spark Cluster Mode (Spark in Action)

Spark Memory Configuration

Spark 에서는 위 사진에 나온 다양한 컴포넌트들의 메모리를 조절할 수 있는 옵션을 제공합니다.

ConfigurationDescription
spark.driver.coresNumber of cores to use for the driver process (only in cluster mode)
spark.driver.memoryAmount of memory to use for the driver process
spark.driver.memoryOverheadAmount of non-heap memory to be allocated per driver process in cluster mode

This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
spark.executor.coresThe number of cores to use on each executors
spark.executor.memoryAmount of memory to use per executor proces
spark.executor.memoryOverheadAmount of additional memory to be allocated per executor process

This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
spark.executor.pyspark.memoryThe amount of memory to be allocated to PySpark in each executor
spark.memory.fractionFraction of (heap space – 300MB) used for execution and storage.

The lower this is, the more frequently spills and cached data eviction occur
spark.memory.storageFractionAmount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction.

The higher this is, the less working memory may be available to execution and tasks may spill to disk more often.
spark.memory.offHeap.enabledIf true, Spark will attempt to use off-heap memory for certain operations
spark.memory.offHeap.sizeThe absolute amount of memory which can be used for off-heap allocation
spark.python.worker.memoryAmount of memory to use per python worker process during aggregation
spark.kubernetes.driver.limit.cores
spark.kubernetes.executor.limits.cores
spark.kubernetes.driver.request.cores
spark.kubernetes.executor.request.cores
spark.kubernetes.memoryOverheadFactor

아래 스크린샷은 Spark UI 에서 Spark Driver / Executor 의 리소스 설정값들을 확인하는 것을 보여줍니다. (구버전의 Spark UI)

spark.driver.memory 등 일부 Driver 옵션은 Local / Client 모드의 경우 spark.driver.memory 를 세팅하는 것이 아니라 --driver-memory 커맨드라인 옵션을 통해 제공되어야 합니다.

Driver JVM 이 spark.driver.memory 를 읽는 시점에 이미 실행되었기 때문입니다 (JVM 이미 뜬 상태에서 JVM 을 위한 메모리를 세팅할 순 없습니다.)

Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the –driver-memory command line option or in your default properties file.

또한 Local 모드의 경우 Executor 가 Driver JVM 내에 위치하므로 –driver-memory 옵션을 통해 세팅할 수 있습니다.

이런 모드별 설정값들을 올바르게 세팅하는 것은 여러분이 팀 / 사내에 Jupyter 등 Spark 를 위한 인프라를 제공할 때 매우 중요합니다.


Spark Memory Details

spark.driver.cores, spark.executor.memory 와 같은 옵션은 개별 컴포넌트의 CPU 코어와 사용 가능한 메모리를 지정하기 때문에 설정의 이름만 보아도 직관적으로 알 수 있습니다. 그러나 다음의 설정값들은 이름으로 부터 이해가 바로 어려운데 Executor 를 기준으로 하나씩 살펴보겠습니다.

  • spark.memory.fraction
  • spark.memory.storageFraction
  • spark.executor.memoryOverhead

Spark < 1.6 Memory (Decoding Memory in Spark)
Spark >= 1.6 Memory (Decoding Memory in Spark)

우선 Spark Executor 의 JVM Heap 메모리를 크게 다음과 같이 나눌 수 있습니다.

  • Spark Memory (spark.memory.fraction = 0.6, default)
    • Execution Memory (spark.memory.storageFraction 를 제외한 spark.memory.fraction 의 나머지)
      • 데이터 집계 과정에서 Shuffle, Aggregation, Sort 등을 위해 사용합니다
    • Storage Memory (spark.memory.storageFraction = 0.5, default)
      • 캐싱 (DataFrame.cache, CACHE TABLE) 또는 Broadcast, Driver 로 보내는 결과들이 이 영역의 메모리를 사용합니다.
  • User Memory (전체 JVM Heap 에서 spark.memory.fraction 와 Reserved Memory 를 제외한)
    • Spark 가 사용하는 내부 메타데이터, 사용자 생성 데이터 구조 저장이나 UDF 및 OOM 을 방지하기 위한 대비 (Safeguard) 영역으로 사용합니다.
  • Reserved Memory (300 MiB)

일반적으로 ‘Executor 메모리가 부족하다’ 라고 말하면 Spark Memory 가 부족한 경우가 대부분입니다. 이 경우에는

  1. Executor 가 사용하는 전체 JVM 메모리 사이즈를 늘리거나
  2. spark.memory.fraction 값을 올릴 수 있습니다.

캐싱을 많이 사용한다면 Storage Memory 가 모자랄 수 있습니다. spark.memory.storageFraction 값을 늘릴수도 있겠지만, Spark 1.6 부터는 Unified Memory Management (SPARK-10000) 가 도입되면서 ‘Spark Memory 1.6+’ 스크린샷의 초록색영역 (Spark Memory) 통합되었기 때문에 큰 효과가 없을 수 있습니다. 만약 메모리가 부족하다고 판단이 되면 비용이 허락하는 한도 내에서 전체 메모리를 늘려보는 것도 방법입니다.

위에서 언급한것 과 같이 Spark 1.6 부터 Spark Memory 영역이 통합되면서

  • 캐싱을 (Storage) 사용하지 않을 경우에는 Execution (집계) 를 위해 Stroage Memory 영역을 사용할 수가 있게 되었고
  • 캐싱을 (Storage) 많이 사용한다면 Execution Memory 영역을 필요시 더 사용할 수 있게 되었습니다.
  • spark.memory.storageFraction 값은 이제 절대적인 Stroage Memory 양이 아니라, Evition 되지 않는 최대 메모리 양을 지정하는 옵션이 되었습니다

Spark 에서 캐싱에 대한 한 가지 오해는 “캐싱된 데이터는 메모리에 계속 존재한다” 입니다. Spark 는 In-memory 컴퓨팅을 ‘지원’ 하는 것이지 In-memory 만으로 집계를 수행하지 않습니다. 만약 데이터가 메모리에 들어갈 수 없다면 중간 집계 결과를 Disk 등에 남길 수 있습니다. 캐싱된 RDD Partition 또한 마찬가지입니다. SPARK-14289 (Multiple Eviction Strategies for Cached Partition)에서 볼 수 있듯이

  • Spark 는 신규 캐싱등을 위해 Storage Memory 가 부족할 경우 기존의 캐싱된 RDD Partition 을 LRU 등의 정책을 바탕으로 내보낼 (Eviction) 수 있습니다
  • 축출된 (Evicted) RDD Partition 은 추후에 다시 필요하다면, Transformation 등을 실행 계획의 내용을 바탕으로 다시 계산을 수행해 만들거나, 아니면 Disk 에 저장된 내용을 바로 읽어 사용할 수 있습니다. 캐싱 함수의 옵션으로 제공되는 Storage Level 에 의해서 동작 방식을 조절할 수 있습니다.

Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD’s storage level.

https://spark.apache.org/faq.html

지난 시간에 캐싱 옵션으로 Level 을 줄 수 있다고 이야기 했었습니다. cache() 함수의 파라미터로 넘길 수 있는 MEMORY_AND_DISK_SER 는 언제 필요할까요?

캐싱된 RDD Partition 이 Eviction 될 때 Disk 에 쓰이는 경우를 가정하고 이야기 해 봅시다. 만약 사용하지 않음에도 unpersist() 함수를 호출하지 않는다면, Disk 사용량과 연산 속도가 어떻게 될까요?

In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R). In other words, R describes a subregion within M where cached blocks are never evicted. Storage may not evict execution due to complexities in implementation.

https://spark.apache.org/docs/latest/tuning.html#memory-management-overview

아래 그림은 Spark 의 각 옵션이 Memory 영역에 어떻게 적용되는지를 볼 수 있는 그림입니다.


Memory Overhead 및 Off-heap 옵션은 JVM 메모리 외의 영역에서 Spark 가 사용할 메모리를 지정합니다. Spark 는 버전에 따라 옵션이 많이 변화했습니다. Spark 3.0+ 를 기준으로 보면 JVM 외 영역에서

  • spark.executor.memoryOverhead (= executor.memory * 0.1, default)
    • PySpark 를 사용할 경우 Python Process 의 메모리 (spark.executor.pyspark.memory) 등 Non-JVM 메모리 영역을 지정합니다.
  • spark.memory.offHeap.size (= false, default)
    • String 을 저장하는 등 Java (JVM) 이 내부적으로 사용하는 용도 및 Spark 의 특정 기능을 위해 사용되기도 합니다
Non-JVM Spark Memory (Decoding Memory in Spark)

이제 JVM 및 Non-JVM 메모리 영역을 모아 Executor 메모리 전체를 한 눈에 살펴보겠습니다.

Spark Container Memory (Decoding Memory in Spark)

만약 Off-heap 메모리 관리 기능을 spark.memory.offHeap.enabled 옵션을 통해 활성화 한다면 Spark 에서는 Storage Memory, Execution Memory 를 On-heap 이외에도 Off-heap 까지 활용하게 됩니다.

Spark Off-heap Usage (Link)

On-heap 을 사용할 경우와 Off-heap 을 사용할 경우의 차이점은 무엇일까요? 객체가 매우 많은 경우 Heap 내에서 GC 가 발생할 때의 문제점에 대해서도 이야기 해 봅시다.


Spark 내부의 Tungsten 이라 불리는 실행 엔진은 Off-heap 메모리 관리 기능을 제공합니다

GC 를 피하기 위해 Off-heap 기능을 이용할 수 있지만, 그렇지 않더라도 Spark 는 이미 내부적으로 충분히 GC 에 최적화된 방법으로 객체를 생성하고 관리하고 있습니다.

Spark/Tungsten use Encoders/Decoders to represent JVM objects as a highly specialized Spark SQL Types objects which then can be serialized and operated on in a highly performant way. Internal format representation is highly efficient and friendly to GC memory utilization.

Thus, even operating in the default on-heap mode Tungsten alleviates the great overhead of JVM objects memory layout and the GC operating time. Tungsten in that mode does allocate objects on heap for its internal purposes and the allocation memory chunks might be huge but it happens much less frequently and survives GC generation transitions smoothly. This almost eliminates the need to consider moving this internal structure off-heap.

In our experiments with this mode on and off we did not see a considerable run time improvements. But what you get with off-heap mode on is that one need to carefully design for the memory allocation outside of you JVM process. This might impose some difficulties within container managers like YARN, Mesos etc when you will need to allow and plan for additional memory chunks besides your JVM process configuration.

Also in off-heap mode Tungsten uses sun.misc.Unsafe which might not be a desired or even possible in your deployment scenarios (with restrictive java security manager configuration for example).

Spark Off-heap Memory Config And Tungsten

PySpark Memory and Arrow

PySpark Memory Configuration (Decoding Memory in Spark)

PySpark 를 사용한다면 다음 두 가지의 메모리 옵션을 설정할 수 있습니다.

  • spark.python.worker.memory (512m, default) 는 JVM 내에서 Python Worker 의 집계를 위해 사용되는 영역입니다
  • spark.executor.pyspark.memory (설정되지 않음, default) 는 실제 Python Process 의 메모리입니다

spark.executor.pyspark.memory 는 기본값이 설정되어 있지 않으므로 PySpark 사용시 DataFrame 대신 일반 Python 객체와 함수를 이용해 가공하는 등 메모리를 많이 사용할 경우 메모리가 터질 수 있습니다.


많은 경우에 Scala 를 이용해 Spark Batch / Stream Application 을 작성합니다. 다만 사용자의 편의나 특정 Python 라이브러리 사용등을 이유로 Python 을 사용할 수 있는데, 이 경우에는 추가로 실행되는 Python Process 로 인해 Spark 동작이 달라질 수 있습니다.

PySpark Data Flow (Apache Arrow with PySpark)

PySpark 를 사용하면 Scala Spark 를 사용할때와는 다르게 Python 프로세스가 존재합니다.

  • Python Driver Process 는 Py4j 를 이용해서 별도 JVM 프로세스에 Spark Context 를 생성합니다
  • PySpark 에서도 spark.sparkContext 객체가 존재하지만, 이것은 명령을 내리기 위한 객체이며 실제로는 명령을 받은 JVM 내의 SparkContext 가 필요한 작업을 수행합니다

같은 노드 내에 있더라도 Python Process 와 JVM Process 는 서로 다른 프로세스이므로 데이터 (메모리) 를 공유할 수 없습니다. IPC (Inter-process Communication) 간 통신을 위해 Socket 을 이용합니다. (Executor 는 Pipe 를 사용합니다)Scala Spark 를 이용할 경우 필요 없을 Socket 통신을 이용해 데이터를 주고 받으므로 PySpark 는 느린 경우가 많습니다.

  • 예를 들어 PySpark 로 큰 사이즈의 S3 Parquet 파일을 읽어 Pandas 로 가공하고 싶다면 다음과 같은 프로세스를 거칩니다
  • Executor 가 데이터를 읽어 Driver JVM 로 전송합니다. Driver JVM 은 다시 Driver Python 으로 보내기 위해 데이터를 Temp File 등에 기록하고 이것을 Driver Python 이 읽습니다
  • 이 과정에서 네트워크를 통해 데이터를 넘기고, Disk 에 파일을 쓰며 Serialization / Deserialization 이 발생합니다

Apache Arrow (Link)

Arrow 와 같은 공통화된 메모리 직렬 포맷을 이용한다면 Serialization / Deserialization 을 효율적으로 수행할 수 있습니다. PySpark 의 toPandas 는 Arrow 가 활성화 되어 있을 경우 이를 이용하도록 구현되었습니다.



만약 Python UDF 와 같은 Python 코드를 Executor 가 실행해야 한다면 어떨까요?

  • Python 코드를 실행하기 위해 Executor 내의 Partition 들이 Executor 내 Python Process 로 Serialization / Deserialization 되어야 합니다.

아래의 두 그림을 통해 JVM (Scala / Java) UDF 일 경우와 Python UDF 일 경우를 비교할 수 있습니다.


AWS EMR

AWS EMR 은 AWS 가 제공하는 관리형 빅데이터 클러스터입니다. 그림에서 볼 수 있듯이 EMR 을 이용하면 기존의 Hadoop 클러스터를 손쉽게 대체할 수 있습니다. AWS 에서는 EMR 과 연동된 수 많은 기능을 제공하기 때문에 단순히 대체하는 것을 넘어, 추가적인 기능을 사용할 수 있다는 장점이 있습니다.

EMR 클러스터 생성시 Zeppelin, JupyterHub 등의 시스템은 물론 Flink, Presto 등 다양한 빅데이터 인프라를 설치할 수 있습니다. 다만 용도에 맞추어 개별 클러스터를 생성하는 편이 낫습니다.

Yarn 을 이용해 Spark Application 를 실행할 경우에 Driver, Executor 의 Core 및 Memory 옵션은 이전 섹션에서 설명한것 과 동일합니다. 다만 EMR 과 Yarn 의 일부 옵션 관련해서 조금 살펴볼 필요가 있습니다.


Spark 를 Yarn Cluster 모드로 실행하게 될 경우 EMR 5 버전에서는 아래의 옵션으로 인해 EMR Core 노드에서 실행됩니다. (EMR 6 에서는 아래의 설정값들이 제거되었습니다. 따라서 필요할 경우 EMR Configuration 에 직접 추가해야 합니다)

yarn.node-labels.enabled: true
yarn.node-labels.am.default-node-label-expression: 'CORE'
view raw emr-yarn.conf hosted with ❤ by GitHub

EMR Node Types (EMR Best Practices)

EMR 에서 Core 노드는 HDFS 데몬을 실행하고 데이터를 유지합니다. 다만 AWS EMR 사용자 대다수가 HDFS 를 이용하진 않으므로 큰 의미는 없습니다.

  • Spark Log 등이 작업 종료 후 Aggregation 되어 로 HDFS 에 저장됩니다
  • 각 노드 타입에 대해서는 Understanding EMR Node Types 문서에서 자세한 내용을 살펴볼 수 있습니다.

Yarn Application Master (AM), 즉 Spark 의 경우엔 Driver 는 위에서 언급한 Label 이 존재할때 Core 노드에서 동작합니다. 다만 EMR 에서는 Yarn 스케쥴러의 설정이 DefaultResourceCalculator 가 Default 이므로 DominantResourceCalculator 사용을 위해서는 직접 세팅해야 합니다 (capacity-scheduler.xml)

  • org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator 는 메모리 기반으로 스케쥴링을 하기 때문에 1개를 초과하는 vCPU 가 할당되지 않습니다.
  • org.apache.hadoop.yarn.util.resource.DominantResourceCalculator 는 CPU + 메모리를 고려해 스케쥴링 하기 때문에 사용자가 요청한 수의 vCPU 를 할당합니다

또한 자동으로 세팅되는 vCPU Max 값 이상의 vCPU 를 할당하고 싶을 경우 yarn.scheduler.maximum-allocation-vcores 옵션을 수정해야 할 필요가 있습니다.

  • EMR 에서는 인스턴스 사이즈를 고려해 yarn.scheduler.maximum-allocation-vcores 를 할당하나, Core (Spark Driver) / Task (Spark Executor) 노드의 EC2 타입이 매우 다를경우 작게 설정될 수 있기 때문입니다.
  • 일반적으로는 다른 EC2 타입을 사용하더라도 (m5, c5, r5) 등 CPU 와 리소스의 배수 비율을 어느정도 맞추는 편이 낫습니다.

yarn.nodemanager.resource.memory-mb, yarn.scheduler.minimum-allocation-mb, yarn.scheduler.maximum-allocation-mb 는 메모리 관련 설정입니다.

Task 인스턴스 타입별 Yarn allocation-mb 사이즈 문서에서 각 인스턴스별로 사이즈가 어떻게 세팅이 되는지 살펴봅시다. 만약 인스턴스가 제공하는 메모리 양과 yarn.nodemanager.resource.memory-mb 이 다르다면 왜 그런지 이유를 생각해 봅시다. 시스템 기본 프로세스를 위해 얼마의 메모리가 필요할까요?


아래 코드는 EMR 생성시 위에서 언급한 옵션들을 수정할 수 있도록 한 설정 코드입니다.

[
{
"Classification":"capacity-scheduler",
"Properties":{
"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
},
"Configurations":[
]
},
{
"Classification":"yarn-site",
"Properties":{
"yarn.scheduler.minimum-allocation-vcores":"1",
"yarn.scheduler.maximum-allocation-vcores":"8"
},
"configurations":[
]
},
{
"Classification":"hdfs-site",
"Properties":{
"dfs.replication":"2"
},
"Configurations":[
]
}
]
view raw emr-yarn.conf hosted with ❤ by GitHub

EMR 에는 Core / Task 란 개념이 있지만, Yarn 은 Core / Task 를 분리해서 생각하지 않고 리소스를 관리합니다. 따라서 yarn.scheduler.capacity.maximum-am-resource-percent 옵션이 0.5 로 잡혀 있어, Core 노드에 리소스가 있음에도 Driver 에 리소스 할당이 되지 않을 수 있습니다.

  • yarn.scheduler.capacity.maximum-am-resource-percent 옵션은 전체 리소스 중 Application Master (Spark Driver) 가 사용할 수 있는 리소스의 비율을 결정합니다.
  • EMR 의 yarn.scheduler.capacity.maximum-am-resource-percent 기본값은 0.5 입니다.

EMR 사용시 maximizeResourceAllocation 옵션을 이용하면 사용자가 Driver, Executor 의 리소스를 직접 세팅하는 것이 아니라 AWS 가 Core / Task 인스턴스의 사이즈를 고려해 할당합니다.

[
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
}
]
view raw emr-yarn.conf hosted with ❤ by GitHub

Spark on Kubernetes

Spark 3.1+ 부터 Kubernetes 를 Cluster Manager 로 사용할 수 있습니다. (GA 버전 기준) Spark 를 Kubernetes 에서 사용할 경우 EMR 이 제공하는 몇몇 특화 기능들은 사용할 수 없지만 (EMRFS S3-Optimized Commiter, EMR Decomission, EMR Autoscaling 등), 그럼에도 몇 가지 이점들이 있습니다.


EMR on EMR vs EMR on EKS (AWS Blog)

위 그림에서 좌측은 EMR 을 사용하는 경우입니다. EMR 은 클러스터 하나당 단일 Spark 버전만 지원하기 때문에 여러 Spark 버전을 사용해야 한다면 Spark 관리가 어렵습니다. 또한 Kubernetes 가 제공하는 Cluster Autoscaler 등 다양한 기능들을 이용할 수 있습니다.

Spark 를 Kubernetes 에서 실행할 경우 아래의 옵션들로 Core 및 Memory 옵션을 지정할 수 있습니다. Memory 의 경우에는 기존과 동일하게 spark.driver.memory, spark.executor.memory 옵션으로 설정이 가능합니다.

spark.kubernetes.driver.limit.cores
spark.kubernetes.executor.limits.cores
spark.kubernetes.driver.request.cores
spark.kubernetes.executor.request.cores
spark.kubernetes.memoryOverheadFactor

이 중에서 memoryOverheadFactor 라는 부분이 기존의 Yarn 에서 Spark 를 할당할때와는 조금 다른 옵션입니다. (Yarn 도 overhead 만큼의 메모리를 추가적으로 할당하나 factor 가 아니라 메모리 사이즈를 고정값으로 받았습니다)

  • spark.kubernetes.memoryOverheadFactor (= 0.1, default) 옵션으로 Non-NVM 영역의 메모리를 Kubernetes Pod 에 추가할 수 있습니다.
  • 예를 들어 spark.executor.memory = 120g 및 spark.kubernetes.memoryOverheadFactor = 0.1 인 경우에 Pod 은 10% 만큼의 Overhead 추가해 132g (GiB) 가 할당됩니다

따라서 Overhead Factor 를 고려해 Kubernetes Pod 의 메모리가 Node 전체의 메모리를 잘 나누어 쓸 수 있도록 설계해야 시스템 전반의 리소스를 효율적으로 사용할 수 있습니다.

운이 나쁠 경우, 240 GiB 머신에서 Executor 에 120 GiB 를 할당할 경우 132 Gib 만큼의 Pod 메모리가 할당되어 해당 240 GiB 머신을 두개로 나누어 쓸 수 없는 경우가 발생합니다.


Practice

실습 과제입니다.

다음과 같은 오류 메세지를 발견했을 때 어떤 옵션을 조절하면 좋을지 고민해 봅시다. 조건은 다음과 같습니다.

spark.executor.instances = 10
spark.executor.cores = 10
spark.executor.memory = 30g (GiB)

이 때, 아래와 같은 오류 메세지가 Parquet Write 를 하는 과정에서 발생했습니다. 담당자가 Jupyter 노트북에서 테스트해봐도 Join, Aggregation 등은 문제가 없었습니다.

ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 33.2 GB of 33 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

메모리 관련된 설정은 어떻게 잡혀있을까 싶어 Spark UI 에서 Environment 탭을 확인해 보았습니다.

spark.memory.memoryOverhead = 0.1
spark.memory.fraction = 0.8
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = false

Spark Version 은 2.4.8 을 사용하고 있습니다.

Spark Memory Management (Link)

다음은 메모리 옵션을 수정하기 전 생각해볼 만한 몇 가지 가설입니다.

  • Join, Aggregation 등에서 Memory 가 터졌다면 Heap OOM 메세지가 발생했을 것이다.
  • 일반적인 경우엔 Memory 가 넘친다면 Disk Spill 을 이용해 속도는 느리겠지만 집계할 수 있다.
  • Write 하는 과정에서 터졌다면, 특정 Execurtor 내 파티션에 데이터가 몰린걸까? (Skew) 그렇다면 어떻게 해결할 수 있을까? 데이터 내에는 사용자 ID 및 사용자가 발생시킨 이벤트 타입과 시간 값이 있다.

Off-heap 에서 오류가 났으므로 전체 메모리 사이즈는 동일하게 유지하고 On-heap 을 줄여 Off-heap 을 늘려보기로 했습니다.

  • spark.executor.memory = 25g (GiB)
  • spark.memory.memoryOverhead = 8g (GiB)
  • 전체 = 33 GiB 로 기존과 동일

수정후에는 이제 Off-heap 이 아니라 Heap OOM 이 발생합니다.

RECEIVED SIGNAL TERM
21/11/26 23:19:51 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[read-ahead,5,main]
java.lang.OutOfMemoryError: Java heap space
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

왜 그런지 고민해 봅시다.

  • spark.executor.memory 를 줄이면, 메모리 영역중 Execution 메모리 (Group by, Window 등 Aggregation 을 위한) 가 줄어듭니다. 어떤 영향이 있을까요?
  • 만약 메모리는 넉넉한데 Group By, Window Function 등에서 Skew 가 발생한다면 어떤 옵션을 수정하는 것이 좋을까요?
    • 1) spark.sql.shuffle.partitions 숫자를 늘려 Skew 확률을 낮춘다.
    • 2) Group By, Window Function 전 해당 집계에서 사용하는 Key 를 기준으로 repartition(X, “key”) 를 진행한다.

Summary

아래는 이번 시간에 다룬 핵심 키워드들 입니다.

  • Arrow
  • UDF
  • Yarn Scheduler: DefaultResourceCalculator, DominantResourceCalculator, FairScheduler
  • Garbage Collection (GC)
  • Storage Memory / Execution Memory

2 thoughts on “Practical Spark – Memory (10)

  1. 안녕하세요.
    Storage Memory와 Execution Memory 작성해주신 부분 설명이 반대로 되어있는 것 같아 댓글 드립니다.
    스파크를 배우고 사용하는 입장에서 그동안 봤던 자료들 중 최고의 시리즈라고 생각하고 정말 도움 많이 받고 있습니다.
    고맙습니다!

    Liked by 1 person

Leave a Reply to 1ambda Cancel reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.