目录
- State的工作方式
- 重启策略
State的工作方式
Flink 中有两种基本的状态:Keyed State 和 Operator State。
Keyed State
Keyed State 通常和 key 相关,仅可使用在 KeyedStream 的方法和算子中。
你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个 key 仅出现在一个分区内。 逻辑上每个 keyed-state 和唯一元组 <parallel-operator-instance, key> 绑定,由于每个 key 仅”属于” 算子的一个并发,因此简化为 <operator, key>。
Keyed State 会按照 Key Group 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; Key Group 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
Operator State
对于 Operator State (或者 non-keyed state) 来说,每个 operator state 和一个并发实例进行绑定。 Kafka Connector 是 Flink 中使用 operator state 的一个很好的示例。 每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
Raw State 与 Managed State
Keyed State 和 Operator State 分别有两种存在形式:managed and raw.
Managed State 由 Flink 运行时控制的数据结构表示,比如内部的 hash table 或者 RocksDB。 比如 “ValueState”, “ListState” 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
Raw State 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
注意: 所有 datastream 的 function 都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。 由于 Flink 可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
使用 Managed Keyed State
managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream.
ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)进行更新,通过T value()进行检索。MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)或者putAll(Map)添加映射。 使用get(UK)检索特定 key。 使用entries(),keys()和values()分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()来判断是否包含任何键值对。ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)或者addAll(List)进行添加元素,通过Iterable get()获得整个列表。还可以通过update(List)覆盖当前的列表。ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState类似,但使用add(T)增加元素,会使用提供的ReduceFunction进行聚合。AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState类似,但使用add(IN)添加的元素会用指定的AggregateFunction进行聚合。
所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。
你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptor,ListStateDescriptor, ReducingStateDescriptor,FoldingStateDescriptor 或 MapStateDescriptor。
状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunction 中 RuntimeContext 提供如下方法:
ValueState getState(ValueStateDescriptor)ReducingState getReducingState(ReducingStateDescriptor)ListState getListState(ListStateDescriptor)AggregatingState getAggregatingState(AggregatingStateDescriptor)FoldingState getFoldingState(FoldingStateDescriptor)MapState getMapState(MapStateDescriptor)
下面是一个 ValueStat 的例子,展示了如何将这些部分组合起来:
object StateCustomApp { |
下面是一个 MapState 的例子,展示了如何将这些部分组合起来:
object StateCustomApp { |
使用 Managed Operator State
用户可以通过实现 CheckpointedFunction 或 ListCheckpointed 接口来使用 managed operator state。
ListCheckpointed 接口是 CheckpointedFunction 的精简版,仅支持 even-split redistributuion 的 list state。同样需要实现两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception; |
snapshotState() 需要返回一个将写入到 checkpoint 的对象列表,restoreState 则需要处理恢复回来的对象列表。如果状态不可切分, 则可以在 snapshotState() 中返回 Collections.singletonList(MY_STATE)。
object StateCustomApp { |
重启策略
当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。
Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。 如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。
Fixed Delay Restart Strategy
固定延时重启策略按照给定的次数尝试重启作业。 如果尝试超过了给定的最大次数,作业将最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。
通过在 flink-conf.yaml 中设置如下配置参数,默认启用此策略。
restart-strategy: fixed-delay |
固定延迟重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment(); |
