Flink State 和重启策略

目录

  1. State的工作方式
  2. 重启策略

State的工作方式

Flink 中有两种基本的状态:Keyed StateOperator State

Keyed State

Keyed State 通常和 key 相关,仅可使用在 KeyedStream 的方法和算子中。

你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个 key 仅出现在一个分区内。 逻辑上每个 keyed-state 和唯一元组 <parallel-operator-instance, key> 绑定,由于每个 key 仅”属于” 算子的一个并发,因此简化为 <operator, key>。

Keyed State 会按照 Key Group 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; Key Group 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group

Operator State

对于 Operator State (或者 non-keyed state) 来说,每个 operator state 和一个并发实例进行绑定。 Kafka Connector 是 Flink 中使用 operator state 的一个很好的示例。 每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。

Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。

Raw State 与 Managed State

Keyed StateOperator State 分别有两种存在形式:managed and raw.

Managed State 由 Flink 运行时控制的数据结构表示,比如内部的 hash table 或者 RocksDB。 比如 “ValueState”, “ListState” 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。

Raw State 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。

注意: 所有 datastream 的 function 都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。 由于 Flink 可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。

使用 Managed Keyed State

managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream.

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys()values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。

你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunctionRuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一个 ValueStat 的例子,展示了如何将这些部分组合起来:

object StateCustomApp {

/**
* 根据输入元素求平均数
* 只要到达2个元素我们就开始算
*/
def averageKeyedValueState(env: StreamExecutionEnvironment): Unit = {
env.fromCollection(List(
(1, 3),
(1, 5),
(1, 7),
(1, 4),
(1, 2)
)).keyBy(_._1)
.flatMap(new CountWindowAverage)
.print()
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
averageKeyedValueState(env)
env.execute(this.getClass.getSimpleName)
}

}

// 输入一个 kv 输出一个kv
class CountWindowAverage extends RichFlatMapFunction[(Int, Int), (Int, Int)] {

// 定义ValueState
private var sum: ValueState[(Int, Int)] = _


override def open(parameters: Configuration): Unit = {
// 初始化state
sum = getRuntimeContext.getState(new ValueStateDescriptor[(Int, Int)]("average", createTypeInformation[(Int, Int)]))

}

override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]): Unit = {
// 拿到state的值
val tmpState = sum.value()

// 拿到当前state,看看它是否为null
val currentState = if (null != tmpState) {
tmpState
} else {
(0, 0)
}

// 拿到最新的state
val newState = (currentState._1 + 1, currentState._2 + value._2)

// 更新state
sum.update(newState)

if (newState._1 >= 2) {
// 取平均值
out.collect((value._1, newState._2 / newState._1))

// 清空state
sum.clear()
}
}
}

下面是一个 MapState 的例子,展示了如何将这些部分组合起来:

object StateCustomApp {

def userBehaviorKeyedMapState(env: StreamExecutionEnvironment): Unit = {
env.fromCollection(List(
(1, "buy"),
(1, "cart"),
(1, "buy"),
(1, "fav"),
(2, "buy"),
(2, "buy"),
(2, "fav")
)).keyBy(0)
.flatMap(new UserBehaviorCnt)
.print()
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
userBehaviorKeyedMapState(env)
env.execute(this.getClass.getSimpleName)
}

}

// 统计每个用户某种行为的操作次数
class UserBehaviorCnt extends RichFlatMapFunction[(Int, String), (Int, String, Int)] {

// 定义MapState 维护一个kv类型的state
private var behaviorCntState: MapState[String, Int] = _

override def open(parameters: Configuration): Unit = {
// 初始化MapState
behaviorCntState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Int]("userBehavior", classOf[String], classOf[Int]))
}

override def flatMap(value: (Int, String), out: Collector[(Int, String, Int)]): Unit = {
// 定义一个behaviorCnt用来统计行为的次数,默认值为1
var behaviorCnt = 1
// 第一次进来肯定不存在,则自动赋值为1,第二次进来存在则每次+1
if (behaviorCntState.contains(value._2)) {
behaviorCnt = behaviorCntState.get(value._2) + 1
}

// 维护一个MapState来统计次数
behaviorCntState.put(value._2, behaviorCnt)
// 输出
out.collect((value._1, value._2, behaviorCnt))
}
}

使用 Managed Operator State

用户可以通过实现 CheckpointedFunctionListCheckpointed 接口来使用 managed operator state。

ListCheckpointed 接口是 CheckpointedFunction 的精简版,仅支持 even-split redistributuion 的 list state。同样需要实现两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

snapshotState() 需要返回一个将写入到 checkpoint 的对象列表,restoreState 则需要处理恢复回来的对象列表。如果状态不可切分, 则可以在 snapshotState() 中返回 Collections.singletonList(MY_STATE)

object StateCustomApp {
/**
* 使用ListCheckpointed来实现buy的次数
*/
def userBehaviorListCheckpointState(env: StreamExecutionEnvironment):Unit = {
env.fromCollection(List(
(1, "buy"),
(1, "cart"),
(1, "buy"),
(1, "fav"),
(2, "buy"),
(2, "fav"),
(2, "fav")
)).keyBy(x => x._1)
.flatMap(new RichFlatMapFunction[(Int,String),(String,Int)] with ListCheckpointed[Integer]{

// 定义userBuyBehaviorCnt = 0
var userBuyBehaviorCnt = 0

// 对每一行袁元素做操作
override def flatMap(value: (Int, String), out: Collector[(String, Int)]): Unit = {
// 判断用户行为 并做+1操作输出
if (value._2 == "buy"){
userBuyBehaviorCnt +=1
out.collect(value._2,userBuyBehaviorCnt)
}
}

// 返回一个包含userBuyBehaviorCnt的对象列表
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Integer] = {
Collections.singletonList(userBuyBehaviorCnt)
}

// 处理恢复回来的对象列表
import scala.collection.JavaConverters._
override def restoreState(state: util.List[Integer]): Unit = {
// 统计list中的个数而不是值
for(ele <- state.asScala){
userBuyBehaviorCnt+=1
}
}
}).print()
}
}

重启策略

当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。

Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。 如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。

Fixed Delay Restart Strategy

固定延时重启策略按照给定的次数尝试重启作业。 如果尝试超过了给定的最大次数,作业将最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。

通过在 flink-conf.yaml 中设置如下配置参数,默认启用此策略。

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延迟重启策略也可以在程序中设置:

val env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
Author: Tunan
Link: http://yerias.github.io/2021/01/16/flink/4/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.