Slot分配与共享

Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask

为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)

slot 主要隔离内存,cpu 是slot之间共享的。也就是说4核的机器 ,内存足够,可以把slot设置为8。最多能同时运行8个任务。建议一个核心数分配一个slot

图中 source、map 合成的task的并行度为6,keyby 、window、apply合成的task的并行度为6,sink的并行度为1,总共有3个task,13个subtask,但是不是需要13个slot才能满足这个并行度的要求。

我们可以称像source、map、sink,这种计算不复杂的算子称为非资源密集型的算子 aggregate、reduce、sum、window,这种计算复杂的算子称为为资源密集型的算子。

如果把这两种算子的优先级看作相同,平等的分配到每个不同的slo中,当数据流source 来的数据速率相同时,会造成有些slot一直在跑复杂的算子,一直在运行中,当时一直跑简单算子的slot就会很空闲。

flink这里是非资源密集型的算子和资源密集型的算子可以分配到同一个slot中 ,这样所有的slot之间任务就会平等,不会存在一直空闲一直高负载。

一个task的并行度是6,就会分为6个并行的subtask来跑,这6个subtask不能分配到同一个slot中,必须一个slot只有一个。 也就是说当你的集群的slot只有6 ,你不能设置算子的并行度超过6,这样就能保证每个subtask都能在各自的slot中并行执行,同时一个job中的不同的task的subtask可分配到同一个slot中形成pipeline链式执行,这里类似于spark中多个窄依赖的算子组成的stage会一起执行。

共享Slot

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。

允许slot共享有以下两点好处:

  1. Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。
  2. 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,比如将task的2个并行度增加到6个,就能充分利用slot资源,task的6个subtask会分到6个slot中去,同时多个不同的task的subtask会复用一个slot。

共享Slot实例

将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如下图所示的slot分布图。

首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。

SlotSharingGroup(soft)

SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。

保证同一个group的并行度相同的 sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)

为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(…).slotSharingGroup(“group1”),就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。

CoLocationGroup(强制)

CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。

Slot & parallelism的关系

如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。

如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。

最后

合理的设置并行度

  • 减少本地通信的开销
  • 减少序列化和反序列化

满足任务链需要以下条件

  • 算子具有相同并行度(具有相同的分区数)
  • 算子属于one-to-one
Author: Tunan
Link: http://yerias.github.io/2021/06/21/flink/15/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.