Flink三种指定Key的方式

一些转换操作(join, coGroup, keyBy, groupBy)要求在元素集合上定义键。另外一些转换操作 (Reduce, GroupReduce, Aggregate, Windows)允许在应用这些转换之前将数据按键分组。

如下对 DataSet 分组

DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*在这里定义键*/)
.reduceGroup(/*一些处理操作*/);

如下对 DataStream 指定键

DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*在这里定义键*/)
.window(/*指定窗口*/);

Flink 的数据模型不是基于键值对的。因此你不需要将数据集类型物理地打包到键和值中。键都是“虚拟的”,它们的功能是指导分组算子用哪些数据来分组。

请注意:下面的讨论中我们将以 DataStreamkeyby 为例。 对于 DataSet API 你只需要用 DataSetgroupBy 替换即可。

为 Tuple 定义键

最简单的方式是按照 Tuple 的一个或多个字段进行分组:

// 按照第一个字段(整型字段)对 Tuple 分组
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
// 这里我们用第一个字段和第二个字段组成的组合键对 Tuple 分组
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

对于嵌套 Tuple 请注意: 如果你的 DataStream 是嵌套 Tuple,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定 keyBy(0) 将导致系统使用整个 Tuple2 作为键(一个整数和一个浮点数)。 如果你想“进入”到 Tuple2 的内部,你必须使用如下所述的字段表达式键。

使用字段表达式定义键

可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组、排序、join 或 coGrouping 的键。

字段表达式可以很容易地选取复合(嵌套)类型中的字段,例如 TuplePOJO 类型。

下例中,我们有一个包含“word”和“count”两个字段的 POJO:WC。要用 word 字段分组,我们只需要把它的名字传给 keyBy() 函数即可。

// 普通的 POJO(简单的 Java 对象)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*指定窗口*/)

// 或者,代码少一点的 case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*指定窗口*/)

字段表达式语法

  • 根据字段名称选择 POJO 的字段。例如 “user” 就是指 POJO 类型的“user”字段。
  • 根据 1 开始的字段名称或 0 开始的字段索引选择 Tuple 的字段。例如 “_1”“5” 分别指 Java Tuple 类型的第一个和第六个字段。
  • 可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么 “user.zip” 即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的,例如 “_2.user.zip”“user._4.1.zip”
  • 可以使用 "*" 通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型。

字段表达式示例:

class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}

class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

这些字段表达式对于以上代码示例都是合法的:

  • "count"WC 类的 count 字段。
  • "complex":递归选择 POJO 类型 ComplexNestedClass 的 complex 字段的全部字段。
  • "complex.word._3":选择嵌套 Tuple3 类型的最后一个字段。
  • "complex.hadoopCitizen":选择 hadoop 的 IntWritable 类型。

使用键选择器函数定义键

定义键的另一种方法是“键选择器”函数。键选择器函数将单个元素作为输入并返回元素的键。键可以是任意类型,并且可以由确定性计算得出。

下例展示了一个简单返回对象字段的键选择器函数:

// 普通的 case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
Author: Tunan
Link: http://yerias.github.io/2020/12/20/flink/2/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.