Flink DataSink

Overview

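The sink is one of Flink's three logical building blocks (source, transform, sink). Its job is to write the data Flink has processed to an external system. In Flink, sinks and sources have a similar code structure.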

When writing a job, we can use the sinks Flink already provides, such as Kafka, JDBC and Elasticsearch, or we can implement our own custom sink. The core classes are described below.

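The figure below shows the sink methods listed in the official documentation. They are all DataStream APIs that can be called directly to write a stream out; the print() used in the earlier code is one of them: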

Core Classes

SinkFunction is an interface, analogous to the SourceFunction interface. Its key method is invoke, which performs the output: it is called once for every record.

// Writes the given value to the sink. This function is called for every record.
default void invoke(IN value) throws Exception {}

// The runtime calls this overload; by default it forwards to invoke(IN value).
default void invoke(IN value, Context context) throws Exception {
    invoke(value);
}

// The Context interface exposes time-related information about the current record.
interface Context<T> {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event-time watermark. */
    long currentWatermark();

    /**
     * Returns the timestamp of the current input record or {@code null} if the element does not
     * have an assigned timestamp.
     */
    Long timestamp();
}
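To illustrate the per-record contract, here is a Flink-free sketch that mimics the two invoke overloads. ModelSinkFunction, ModelSinkContext, CollectingSink and runRecords are hypothetical names, not Flink classes. The context is trimmed to two methods, and runRecords is a stand-in for the runtime that calls invoke(value, context) once per record. Because the two-argument overload forwards to invoke(value) by default, overriding only the one-argument version is enough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed stand-in for SinkFunction.Context.
interface ModelSinkContext {
    long currentProcessingTime();
    Long timestamp(); // null when the record carries no timestamp
}

// Hypothetical model of SinkFunction's two invoke overloads.
interface ModelSinkFunction<IN> {
    // Mirrors invoke(IN): does nothing unless overridden.
    default void invoke(IN value) throws Exception {}

    // Mirrors invoke(IN, Context): the runtime calls this one, and it forwards to invoke(IN).
    default void invoke(IN value, ModelSinkContext context) throws Exception {
        invoke(value);
    }
}

// A sink that only overrides the one-argument overload and collects what it receives.
class CollectingSink implements ModelSinkFunction<String> {
    final List<String> received = new ArrayList<>();

    @Override
    public void invoke(String value) {
        received.add(value);
    }
}

public class SinkInvokeModel {
    // Stand-in for the runtime: exactly one invoke call per record.
    static <T> void runRecords(ModelSinkFunction<T> sink, List<T> records) {
        for (T record : records) {
            final long now = System.currentTimeMillis();
            try {
                sink.invoke(record, new ModelSinkContext() {
                    public long currentProcessingTime() { return now; }
                    public Long timestamp() { return null; }
                });
            } catch (Exception e) {
                throw new RuntimeException("sink failed on " + record, e);
            }
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        runRecords(sink, List.of("a", "b", "c"));
        System.out.println(sink.received); // [a, b, c]
    }
}
```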

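When we write a custom sink, we usually extend RichSinkFunction. RichSinkFunction extends AbstractRichFunction, an abstract class that implements the RichFunction interface, and it also implements SinkFunction.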

public abstract class AbstractRichFunction implements RichFunction, Serializable

AbstractRichFunction provides access to the RuntimeContext as well as the open and close lifecycle methods.
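To make the lifecycle concrete, here is a Flink-free sketch of the order in which a rich sink's methods run on one parallel instance: open once, invoke once per record, close once. ModelRichSink, LoggingSink and runSubtask are hypothetical names. The model drops the Configuration parameter and ignores checkpoints and failure recovery, which the real runtime also handles. This ordering is why the MySQL sink later in this post opens its connection in open() and releases it in close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a rich sink's lifecycle (not Flink's API).
abstract class ModelRichSink<IN> {
    final List<String> calls = new ArrayList<>();

    // Called once before any record, e.g. to open a database connection.
    void open() { calls.add("open"); }

    // Called once per record.
    abstract void invoke(IN value);

    // Called once at shutdown, e.g. to release the connection.
    void close() { calls.add("close"); }
}

// Records every invoke call so the order can be inspected.
class LoggingSink extends ModelRichSink<Integer> {
    @Override
    void invoke(Integer value) { calls.add("invoke:" + value); }
}

public class RichSinkLifecycleModel {
    // Stand-in for one sink subtask; in this model close() runs even if invoke() throws.
    static <T> void runSubtask(ModelRichSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) {
        LoggingSink sink = new LoggingSink();
        runSubtask(sink, List.of(1, 2));
        System.out.println(sink.calls); // [open, invoke:1, invoke:2, close]
    }
}
```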

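AbstractRichFunction has many subclasses. One of them, by way of RichSinkFunction, is PrintSinkFunction, which writes results directly to the console. During development we often call print() to inspect results, and print() simply passes a PrintSinkFunction to addSink: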

public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}

PrintSinkFunction

Let's analyze the PrintSinkFunction class. It writes each element to either the standard output stream or the standard error stream.

public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}

Analysis:

1. The constructor creates a PrintSinkOutputWriter.

2. open() calls the writer's open method with the subtask index and the parallelism to initialize it.

3. invoke() hands each record to the writer's write method, which prints it.
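Step 2 explains why print() output often starts with something like "3> ". Here is a Flink-free sketch of how we understand PrintSinkOutputWriter to build that prefix from the sink identifier, the subtask index and the parallelism, based on our reading of Flink's source. PrintPrefixModel, prefix and format are hypothetical names. In this model, the 0-based subtask index is printed 1-based and only when parallelism is greater than 1, and the identifier (empty for plain print()) is joined to it with ":".

```java
// Hypothetical, Flink-free model of the print sink's output prefix.
public class PrintPrefixModel {
    static String prefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String p = sinkIdentifier;
        if (numParallelSubtasks > 1) {
            if (!p.isEmpty()) {
                p += ":";
            }
            p += (subtaskIndex + 1); // 0-based index, printed 1-based
        }
        if (!p.isEmpty()) {
            p += "> ";
        }
        return p;
    }

    // What a write(record) call would print in this model.
    static String format(String sinkIdentifier, int subtaskIndex, int parallelism, Object record) {
        return prefix(sinkIdentifier, subtaskIndex, parallelism) + record;
    }

    public static void main(String[] args) {
        System.out.println(format("", 2, 4, "hello"));    // 3> hello
        System.out.println(format("out", 0, 4, "hello")); // out:1> hello
        System.out.println(format("", 0, 1, "hello"));    // hello
    }
}
```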

Custom MySQL Sink

Here we write a custom MySQL sink, which writes the data produced by Flink into MySQL.

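import org.apache.flink.streaming.api.scala._

// Assumed definition, inferred from how Access is constructed and read in this example
case class Access(time: Long, domain: String, traffics: Long)

object WriteToMySQL {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val files = env.readTextFile("tunan-flink-stream/data/access.txt")

    // Each line is "time,domain,traffic"; keep a running traffic total per (time, domain)
    val result = files.map(row => {
      val words = row.split(",").map(_.trim)
      Access(words(0).toLong, words(1), words(2).toLong)
    }).keyBy(x => (x.time, x.domain)).sum(2)

    result.addSink(new CustomMySQL)

    env.execute(this.getClass.getSimpleName)
  }
}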
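Note that keyBy(...).sum(2) is a rolling aggregation: for every incoming record it emits the updated total for that key, not one final result. This is why the sink writes with REPLACE INTO, which, assuming the access table has a primary or unique key on (time, domain), overwrites the previous total instead of adding a duplicate row. The Flink-free sketch below models both halves. RollingSumUpsertModel, rollingSum and upsert are hypothetical names, and a HashMap stands in for the MySQL table. It uses a record type, so Java 16+ is assumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: keyed running sum feeding an upsert (REPLACE INTO) sink.
public class RollingSumUpsertModel {
    // Mirrors the Access(time, domain, traffics) record used in the Scala job.
    record Access(long time, String domain, long traffics) {}

    // Emits one updated total per input record, as a keyed rolling sum does.
    static List<Access> rollingSum(List<Access> input) {
        Map<String, Long> totals = new HashMap<>();
        List<Access> out = new ArrayList<>();
        for (Access a : input) {
            String key = a.time() + "|" + a.domain();
            long total = totals.merge(key, a.traffics(), Long::sum);
            out.add(new Access(a.time(), a.domain(), total));
        }
        return out;
    }

    // Models REPLACE INTO keyed on (time, domain): a later row overwrites an earlier one.
    static Map<String, Long> upsert(List<Access> updates) {
        Map<String, Long> table = new HashMap<>();
        for (Access a : updates) {
            table.put(a.time() + "|" + a.domain(), a.traffics());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Access> input = List.of(
            new Access(1L, "a.com", 10),
            new Access(1L, "a.com", 5),
            new Access(1L, "b.com", 7));
        List<Access> updates = rollingSum(input);
        System.out.println(updates.size());                 // 3
        System.out.println(upsert(updates).get("1|a.com")); // 15
    }
}
```

Three updates reach the sink, but the table ends up with one row per key holding the latest total. A plain INSERT would instead leave both the 10 and the 15 rows for 1|a.com.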

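import java.sql.{Connection, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class CustomMySQL extends RichSinkFunction[Access] {
  var conn: Connection = _
  var pstate: PreparedStatement = _

  // Open the connection and prepare the statement once per parallel sink instance
  override def open(parameters: Configuration): Unit = {
    conn = MySQLUtils.getConnection // the author's own JDBC helper (not shown)
    pstate = conn.prepareStatement("REPLACE INTO access(time,domain,traffic) values(?,?,?)")
  }

  // One insert per record performs poorly; it should be optimized, e.g. with a window
  override def invoke(value: Access): Unit = {
    pstate.setLong(1, value.time)
    pstate.setString(2, value.domain)
    pstate.setLong(3, value.traffics)

    pstate.executeUpdate()
  }

  // Release the statement and connection when the sink shuts down
  override def close(): Unit = {
    MySQLUtils.close(conn, pstate, null)
  }
}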
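The comment in invoke() points out that writing one row per record is slow. Besides the window approach it mentions, a common alternative is to buffer records inside the sink and write them in batches. Below is a Flink-free sketch of that buffering logic. BufferedSinkModel and flushAction are hypothetical names; in a real JDBC sink the flush would use PreparedStatement.addBatch and executeBatch. Two caveats apply. A production sink would also need to flush on checkpoints (for example by implementing CheckpointedFunction), or records still in the buffer could be lost on failure. Flink's JDBC connector also ships built-in batching (JdbcSink with JdbcExecutionOptions).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: buffer records and flush them in batches instead of one write per record.
public class BufferedSinkModel<T> {
    private final int batchSize;
    private final Consumer<List<T>> flushAction; // stands in for addBatch/executeBatch
    private final List<T> buffer = new ArrayList<>();

    BufferedSinkModel(int batchSize, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    // Called once per record, like invoke(); writes only when the batch is full.
    void invoke(T value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called at shutdown, like close(), so the last partial batch is not lost.
    void close() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BufferedSinkModel<String> sink = new BufferedSinkModel<>(2, batch -> batchSizes.add(batch.size()));
        for (String s : List.of("r1", "r2", "r3")) {
            sink.invoke(s);
        }
        sink.close();
        System.out.println(batchSizes); // [2, 1]
    }
}
```

With a batch size of 2, three records cause two writes instead of three. The gap grows with larger batches.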
Author: Tunan
Link: http://yerias.github.io/2021/01/03/flink/13/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.