目录
- State的工作方式
- 重启策略
State的工作方式
Flink 中有两种基本的状态:Keyed State
和 Operator State
。
Keyed State
Keyed State 通常和 key 相关,仅可使用在 KeyedStream
的方法和算子中。
你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个 key 仅出现在一个分区内。 逻辑上每个 keyed-state 和唯一元组 <parallel-operator-instance, key> 绑定,由于每个 key 仅”属于” 算子的一个并发,因此简化为 <operator, key>。
Keyed State 会按照 Key Group 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; Key Group 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
Operator State
对于 Operator State (或者 non-keyed state) 来说,每个 operator state 和一个并发实例进行绑定。 Kafka Connector 是 Flink 中使用 operator state 的一个很好的示例。 每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
Raw State 与 Managed State
Keyed State 和 Operator State 分别有两种存在形式:managed and raw.
Managed State 由 Flink 运行时控制的数据结构表示,比如内部的 hash table 或者 RocksDB。 比如 “ValueState”, “ListState” 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
Raw State 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
注意:
所有 datastream 的 function 都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。 由于 Flink 可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
使用 Managed Keyed State
managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream
上使用,可以通过 stream.keyBy(...)
得到 KeyedStream
.
ValueState
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)
进行更新,通过T value()
进行检索。MapState
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)
或者putAll(Map)
添加映射。 使用get(UK)
检索特定 key。 使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()
来判断是否包含任何键值对。ListState
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)
或者addAll(List)
进行添加元素,通过Iterable get()
获得整个列表。还可以通过update(List)
覆盖当前的列表。ReducingState
: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState
类似,但使用add(T)
增加元素,会使用提供的ReduceFunction
进行聚合。AggregatingState
: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState
类似,但使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。
所有类型的状态还有一个clear()
方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。
你必须创建一个 StateDescriptor
,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction
。 根据不同的状态类型,可以创建ValueStateDescriptor
,ListStateDescriptor
, ReducingStateDescriptor
,FoldingStateDescriptor
或 MapStateDescriptor
。
状态通过 RuntimeContext
进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunction
中 RuntimeContext
提供如下方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
FoldingState getFoldingState(FoldingStateDescriptor)
MapState getMapState(MapStateDescriptor)
下面是一个 ValueStat
的例子,展示了如何将这些部分组合起来:
object StateCustomApp { |
下面是一个 MapState
的例子,展示了如何将这些部分组合起来:
object StateCustomApp { |
使用 Managed Operator State
用户可以通过实现 CheckpointedFunction
或 ListCheckpointed
接口来使用 managed operator state。
ListCheckpointed
接口是 CheckpointedFunction
的精简版,仅支持 even-split redistributuion 的 list state。同样需要实现两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception; |
snapshotState()
需要返回一个将写入到 checkpoint 的对象列表,restoreState
则需要处理恢复回来的对象列表。如果状态不可切分, 则可以在 snapshotState()
中返回 Collections.singletonList(MY_STATE)
。
object StateCustomApp { |
重启策略
当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。
Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。 如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。
Fixed Delay Restart Strategy
固定延时重启策略按照给定的次数尝试重启作业。 如果尝试超过了给定的最大次数,作业将最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。
通过在 flink-conf.yaml
中设置如下配置参数,默认启用此策略。
restart-strategy: fixed-delay |
固定延迟重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment(); |