Flink 中并行任务的分配
Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask
为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
slot 主要隔离内存,cpu 是slot之间共享的。也就是说4核的机器 ,内存足够,可以把slot设置为8。最多能同时运行8个任务。建议一个核心数分配一个slot
图中 source、map 合成的task的并行度为6,keyby 、window、apply合成的task的并行度为6,sink的并行度为1,总共有3个task,13个subtask,但是不是需要13个slot才能满足这个并行度的要求。
我们可以称像source、map、sink,这种计算不复杂的算子称为非资源密集型的算子 aggregate、reduce、sum、window,这种计算复杂的算子称为为资源密集型的算子。
如果把这两种算子的优先级看作相同,平等的分配到每个不同的slo中,当数据流source 来的数据速率相同时,会造成有些slot一直在跑复杂的算子,一直在运行中,当时一直跑简单算子的slot就会很空闲。
flink这里是非资源密集型的算子和资源密集型的算子可以分配到同一个slot中 ,这样所有的slot之间任务就会平等,不会存在一直空闲一直高负载。
一个task的并行度是6,就会分为6个并行的subtask来跑,这6个subtask不能分配到同一个slot中,必须一个slot只有一个。 也就是说当你的集群的slot只有6 ,你不能设置算子的并行度超过6,这样就能保证每个subtask都能在各自的slot中并行执行,同时一个job中的不同的task的subtask可分配到同一个slot中形成pipeline链式执行,这里类似于spark中多个窄依赖的算子组成的stage会一起执行。
共享Slot
默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。
允许slot共享有以下两点好处:
- Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。
- 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,比如将task的2个并行度增加到6个,就能充分利用slot资源,task的6个subtask会分到6个slot中去,同时多个不同的task的subtask会复用一个slot。
共享Slot实例
将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如下图所示的slot分布图。
首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。
SlotSharingGroup(soft)
SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。
保证同一个group的并行度相同的 sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)
为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(…).slotSharingGroup(“group1”),就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。
CoLocationGroup(强制)
CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。
Slot & parallelism的关系
如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。
如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。
最后
合理的设置并行度
- 减少本地通信的开销
- 减少序列化和反序列化
满足任务链需要以下条件
- 算子具有相同并行度(具有相同的分区数)
- 算子属于one-to-one