Flink State Backends

目录

  1. State Backends

State Backends

用 Data Stream API 编写的程序通常以各种形式保存状态:

  • 在 Window 触发之前要么收集元素、要么聚合
  • 转换函数可以使用 key/value 格式的状态接口来存储状态
  • 转换函数可以实现 CheckpointedFunction 接口,使其本地变量具有容错能力

在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend

可用的 State Backends

Flink 内置了以下这些开箱即用的 state backends :

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

如果不设置,默认使用 MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。

在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。

MemoryStateBackend 能配置异步快照。强烈建议使用异步快照来防止数据流阻塞,注意,异步快照默认是开启的。 用户可以在实例化 MemoryStateBackend 的时候,将相应布尔类型的构造参数设置为 false 来关闭异步快照(仅在 debug 的时候使用),例如:

env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false));

MemoryStateBackend 的限制:

  • 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。
  • 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看配置参数)。
  • 聚合后的状态必须能够放进 JobManager 的内存中。

MemoryStateBackend 适用场景:

  • 本地开发和调试。
  • 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。

建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。

FsStateBackend

FsStateBackend 需要配置一个文件系统的 URL(类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。

FsStateBackend 默认使用异步快照来防止 CheckPoint 写状态时对数据处理造成阻塞。 用户可以在实例化 FsStateBackend 的时候,将相应布尔类型的构造参数设置为 false 来关闭异步快照,例如:

env.setStateBackend(new FsStateBackend("hdfs://hadoop:9000/flink/rocks_state_backend"))

FsStateBackend 适用场景:

  • 状态比较大、窗口比较长、key/value 状态比较大的 Job。
  • 所有高可用的场景。

建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的 URL (类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。RocksDBStateBackend 只支持异步快照。

env.setStateBackend(new RocksDBStateBackend(filebackend, true));

RocksDBStateBackend 的限制:

  • 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 重要信息: RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

RocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。

注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 FsStateBackend 相比,RocksDBStateBackend 允许存储非常大的状态。 然而,这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。

更多的进阶操作查看官网

设置每个 Job 的 State Backend

StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置,如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

如果你想在 IDE 中使用 RocksDBStateBackend,或者需要在作业中通过编程方式动态配置 RocksDBStateBackend,必须添加以下依赖到 Flink 项目中。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>

设置默认的(全局的) State Backend

flink-conf.yaml 可以通过键 state.backend 设置默认的 State Backend。

可选值包括 jobmanager (MemoryStateBackend)、filesystem (FsStateBackend)、rocksdb (RocksDBStateBackend), 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: RocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

state.checkpoints.dir 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。 你能在这里找到关于 CheckPoint 目录结构的详细信息。

配置文件的部分示例如下所示:

# 用于存储 operator state 快照的 State Backend
state.backend: filesystem

# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

增量快照

RocksDBStateBackend 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。

虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:

  • flink-conf.yaml 中设置:state.backend.incremental: true 或者
  • 在代码中按照右侧方式配置(来覆盖默认配置):RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true);

三种State Backends对比

类别 State Checkpoint 场景
MemoryStateBackend 存在taskManager上的 jobmanager 测试
FsStateBackend 存在taskManager上的 fs 测试/部分生产
RocksDBStateBackend 存在RocksDB上 hdfs 生产
Author: Tunan
Link: http://yerias.github.io/2021/01/17/flink/6/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.