一些转换操作(join, coGroup, keyBy, groupBy)要求在元素集合上定义键。另外一些转换操作 (Reduce, GroupReduce, Aggregate, Windows)允许在应用这些转换之前将数据按键分组。
如下对 DataSet 分组
DataSet<...> input = // [...] |
如下对 DataStream 指定键
DataStream<...> input = // [...] |
Flink 的数据模型不是基于键值对的。因此你不需要将数据集类型物理地打包到键和值中。键都是“虚拟的”,它们的功能是指导分组算子用哪些数据来分组。
请注意:下面的讨论中我们将以 DataStream
和 keyby
为例。 对于 DataSet API 你只需要用 DataSet
和 groupBy
替换即可。
为 Tuple 定义键
最简单的方式是按照 Tuple 的一个或多个字段进行分组:
// 按照第一个字段(整型字段)对 Tuple 分组 |
// 这里我们用第一个字段和第二个字段组成的组合键对 Tuple 分组 |
对于嵌套 Tuple 请注意: 如果你的 DataStream 是嵌套 Tuple,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds; |
指定 keyBy(0)
将导致系统使用整个 Tuple2
作为键(一个整数和一个浮点数)。 如果你想“进入”到 Tuple2
的内部,你必须使用如下所述的字段表达式键。
使用字段表达式定义键
可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组、排序、join 或 coGrouping 的键。
字段表达式可以很容易地选取复合(嵌套)类型中的字段,例如 Tuple 和 POJO 类型。
下例中,我们有一个包含“word”和“count”两个字段的 POJO:WC
。要用 word
字段分组,我们只需要把它的名字传给 keyBy()
函数即可。
// 普通的 POJO(简单的 Java 对象) |
字段表达式语法:
- 根据字段名称选择 POJO 的字段。例如
“user”
就是指 POJO 类型的“user”字段。 - 根据 1 开始的字段名称或 0 开始的字段索引选择 Tuple 的字段。例如
“_1”
和“5”
分别指 Java Tuple 类型的第一个和第六个字段。 - 可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么
“user.zip”
即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的,例如“_2.user.zip”
或“user._4.1.zip”
。 - 可以使用
"*"
通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型。
字段表达式示例:
class WC(var complex: ComplexNestedClass, var count: Int) { |
这些字段表达式对于以上代码示例都是合法的:
"count"
:WC
类的 count 字段。"complex"
:递归选择 POJO 类型ComplexNestedClass
的 complex 字段的全部字段。"complex.word._3"
:选择嵌套Tuple3
类型的最后一个字段。"complex.hadoopCitizen"
:选择 hadoop 的IntWritable
类型。
使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。键选择器函数将单个元素作为输入并返回元素的键。键可以是任意类型,并且可以由确定性计算得出。
下例展示了一个简单返回对象字段的键选择器函数:
// 普通的 case class |