背景
一般在使用Spark的的时候通过 spark-submit.sh 配置 num-executors 显示的指定executor的个数。然后AppMaster会向资源调度框架如yarn申请资源,每个executor在yarn中以Container的形式存在。无论executor是否执行任务,都会占用相应的资源,直到应用结束后释放。很显然要是有一种方式,可以动态的申请executor,不用的时候释放掉,那么集群的资源利用率会更高。
其实Flink就是这样做的,在Flink中资源都是以slot来动态申请,但是也会出现并行的任务太多,突然申请超多的节点,直接导致集群资源被耗光。在未来的1.11中也会像spark一样,增加最大的资源限制,避免无限膨胀。
原理
简单分析下Spark静态资源分配中Executor的生命周期,下面就以spark-shell为例,看一下静态资源分配与动态资源分配的区别。
# 以yarn模式执行,并指定executor个数为1 |
正常在使用spark时,会根据启动命令创建executor,那么无论任务执行还是空闲这个executor都会与应用生命周期保持一致,只有应用退出,executor才会释放。加入spark-shell此时没有任务执行,那么executor也会占用资源长期不释放,浪费集群资源。
任务启动后,会按照事先的配置,启动最小的集群资源,比如申请一个executor。当一段时间没有执行任务后,executor会因长期空闲而被释放掉,资源也会被yarn一类的资源调度框架回收。当spark-shell再次接收到命令时,会重新向资源调度框架申请资源运行任务。也就是说当应用没有任务提交时,不会占用集群的过多资源,提前释放以供其他应用使用,这样整个集群的资源利用率就更高了。
实现
带着问题来看一下动态资源分配是如何实现的?
移除哪些executor?
因为executor可能负责task任务的计算,也可能负责计算后数据的存储,因此判断executor过期并移除,可以通过计算和存储两方面来考虑。spark提供了两个参数:executorIdleTimeout 和 cachedExecutorIdleTimeout 分别控制计算空闲的时间和存储空闲的时间,只要这两个时间都超时,就会移除该executor
如何控制executor的数量避免膨胀?
像Flink就因为没有executor的申请限制,经常会导致大任务直接消耗光集群资源。在spark里面可以通过 minExecutors 配置最小值,maxExecutors 配置最大值,initialExecutors 配置初始值(默认值最小值)。
何时申请executor?申请多少executor?
当有任务pending的时候就会认为集群资源不足,开始申请executor。具体的判断是通过 schedulerBacklogTimeout 来控制。申请的数量则是通过 正在运行和pending的任务数量 * executorAllocationRatio / 并行度 得到。因此在并行度不变的前提下,可以通过调控 executorAllocationRatio 大小来调整申请的数量。
相关参数
spark.dynamicAllocation.enabled |