Spark资源动态分配

背景

一般在使用Spark的的时候通过 spark-submit.sh 配置 num-executors 显示的指定executor的个数。然后AppMaster会向资源调度框架如yarn申请资源,每个executor在yarn中以Container的形式存在。无论executor是否执行任务,都会占用相应的资源,直到应用结束后释放。很显然要是有一种方式,可以动态的申请executor,不用的时候释放掉,那么集群的资源利用率会更高。

其实Flink就是这样做的,在Flink中资源都是以slot来动态申请,但是也会出现并行的任务太多,突然申请超多的节点,直接导致集群资源被耗光。在未来的1.11中也会像spark一样,增加最大的资源限制,避免无限膨胀。

原理

简单分析下Spark静态资源分配中Executor的生命周期,下面就以spark-shell为例,看一下静态资源分配与动态资源分配的区别。

# 以yarn模式执行,并指定executor个数为1
$ spark-shell --master=yarn --num-executors=1

# 提交Job1 wordcount
scala> sc.textFile("file:///etc/hosts").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# 提交Job2 wordcount
scala> sc.textFile("file:///etc/profile").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# Ctrl+C Kill JVM

正常在使用spark时,会根据启动命令创建executor,那么无论任务执行还是空闲这个executor都会与应用生命周期保持一致,只有应用退出,executor才会释放。加入spark-shell此时没有任务执行,那么executor也会占用资源长期不释放,浪费集群资源。

任务启动后,会按照事先的配置,启动最小的集群资源,比如申请一个executor。当一段时间没有执行任务后,executor会因长期空闲而被释放掉,资源也会被yarn一类的资源调度框架回收。当spark-shell再次接收到命令时,会重新向资源调度框架申请资源运行任务。也就是说当应用没有任务提交时,不会占用集群的过多资源,提前释放以供其他应用使用,这样整个集群的资源利用率就更高了。

实现

带着问题来看一下动态资源分配是如何实现的?

移除哪些executor?

因为executor可能负责task任务的计算,也可能负责计算后数据的存储,因此判断executor过期并移除,可以通过计算和存储两方面来考虑。spark提供了两个参数:executorIdleTimeout 和 cachedExecutorIdleTimeout 分别控制计算空闲的时间和存储空闲的时间,只要这两个时间都超时,就会移除该executor

如何控制executor的数量避免膨胀?

像Flink就因为没有executor的申请限制,经常会导致大任务直接消耗光集群资源。在spark里面可以通过 minExecutors 配置最小值,maxExecutors 配置最大值,initialExecutors 配置初始值(默认值最小值)。

何时申请executor?申请多少executor?

当有任务pending的时候就会认为集群资源不足,开始申请executor。具体的判断是通过 schedulerBacklogTimeout 来控制。申请的数量则是通过 正在运行和pending的任务数量 * executorAllocationRatio / 并行度 得到。因此在并行度不变的前提下,可以通过调控 executorAllocationRatio 大小来调整申请的数量。

相关参数

spark.dynamicAllocation.enabled
默认值,false
是否使用动态资源分配,将会动态扩展executor的数量。

spark.dynamicAllocation.executorIdleTimeout
默认值,60s
executor空闲多长时间会被回收。

spark.dynamicAllocation.cachedExecutorIdleTimeout
默认值,infinity
exectuor缓存数据的时间,超过这个时间还是空闲状态,将会移除executor

spark.dynamicAllocation.initialExecutors
默认值,spark.dynamicAllocation.minExecutors
如果配置了 --num-executors 或 spark.execturo.instancese,则优先使用这两个参数的配置。

spark.dynamicAllocation.maxExecutors
默认值,infinity
exectuors的上限

spark.dynamicAllocation.minExecutors
默认值,0
executors的下限

spark.dynamicAllocation.executorAllocationRatio
默认值,1
一般spark会使用动态分配申请足够多的资源来保证任务进程的并行度。这样会最小化任务的延迟。对于一些小任务,这个配置会导致浪费很多资源,并且其他的executor可能并没有做任何工作。1提供最大的并行度,0.5则会将exectuor的数量减半。

spark.dynamicAllocation.schedulerBacklogTimeout
默认值,1s
待执行的任务积压超过这个时间,将会请求新的执行者。

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
默认值,schedulerBacklogTimeout
资源不足时,多长时间开始申请executor。
Author: Tunan
Link: http://yerias.github.io/2021/04/11/spark/42/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.