// Writes the given value to the sink. This function is called for every record. defaultvoidinvoke(IN value)throws Exception defaultvoidinvoke(IN value, Context context)throws Exception // Context接口中返回关于时间的信息 interface Context<T> {
/** Returns the current processing time. */ longcurrentProcessingTime();
/** Returns the current event-time watermark. */ longcurrentWatermark();
/** * Returns the timestamp of the current input record or {@code null} if the element does not * have an assigned timestamp. */ Long timestamp(); }
/**
 * Attaches a {@link PrintSinkFunction} built with its no-argument constructor as a sink and
 * names the resulting sink "Print to Std. Out".
 *
 * @return the sink returned by {@code addSink(...)} after the name has been applied
 */
public DataStreamSink<T> print() {
    return addSink(new PrintSinkFunction<T>()).name("Print to Std. Out");
}
/**
 * Instantiates a print sink function that prints to standard out.
 */
public PrintSinkFunction() {
    // false selects standard out rather than standard error.
    writer = new PrintSinkOutputWriter<>(false);
}
/**
 * Instantiates a print sink function that prints to standard out or, when {@code stdErr} is
 * true, to standard error.
 *
 * @param stdErr True, if the format should print to standard error instead of standard out.
 */
public PrintSinkFunction(final boolean stdErr) {
    writer = new PrintSinkOutputWriter<>(stdErr);
}
/**
 * Instantiates a print sink function that prints to standard out or standard error and gives
 * a sink identifier.
 *
 * @param sinkIdentifier Message that identifies the sink and is prefixed to the output of the
 *     value.
 * @param stdErr True, if the format should print to standard error instead of standard out.
 */
public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
    writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
}
// Obtain the execution environment and read the access log as a stream of text lines.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val files = env.readTextFile("tunan-flink-stream/data/access.txt")

// Parse each comma-separated row into an Access record (fields are trimmed first),
// key the stream by (time, domain), and sum position 2, i.e. the third Access argument.
val result = files
  .map { row =>
    val words = row.split(",").map(_.trim)
    Access(words(0).toLong, words(1), words(2).toLong)
  }
  .keyBy(x => (x.time, x.domain))
  .sum(2)

// Send every record of the summed stream to the custom sink.
result.addSink(new CustomMySQL)
env.execute(this.getClass.getSimpleName) } }
class CustomMySQL extends RichSinkFunction[Access]{ var conn:Connection = _ var pstate:PreparedStatement =_
/**
 * Prepares the JDBC resources held by this sink.
 *
 * Stores the connection returned by `MySQLUtils.getConnection` in `conn` and prepares the
 * three-parameter `REPLACE INTO access(...)` statement on it, storing the result in `pstate`.
 *
 * @param parameters configuration handed to `open`; this method does not read it
 */
override def open(parameters: Configuration): Unit = {
  conn = MySQLUtils.getConnection
  pstate = conn.prepareStatement("REPLACE INTO access(time,domain,traffic) values(?,?,?)")
}