Add the flink-hbase dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
Looking at the org.apache.flink.addons.hbase.HBaseUpsertSinkFunction class: it buffers incoming records and triggers a flush based on three conditions - buffered data size, number of buffered records, and a flush interval. It also implements CheckpointedFunction, so buffered data can be snapshotted and recovered. A sink for any other database, such as ClickHouse, can be built by adapting the flush-triggering logic of this class.
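As a quick orientation, here is how those three conditions map onto the constructor arguments, shown as an annotated version of the call used in the example below. The argument names follow the Flink 1.10 source and should be read as a reference sketch, not as official documentation:

// annotated version of the HBaseUpsertSinkFunction call used in the example below
new HBaseUpsertSinkFunction(
  "student",    // HBase table name
  schema,       // HBaseTableSchema: row key plus column family/qualifier layout
  config,       // Hadoop Configuration carrying the ZooKeeper quorum, timeouts, etc.
  Int.MaxValue, // bufferFlushMaxSizeInBytes: flush once the buffered mutations exceed this size
  3,            // bufferFlushMaxMutations: flush once this many records are buffered
  Int.MaxValue  // bufferFlushIntervalMillis: flush at least this often, in milliseconds
)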
For other requirements it is worth studying how HBaseUpsertSinkFunction is implemented; below is how to use it to write to HBase.
import java.util.Properties

import org.apache.flink.addons.hbase.{HBaseTableSchema, HBaseUpsertSinkFunction}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

object HBaseStreamSinkApp {

  def write2HBaseWithRichSinkFunction(): Unit = {
    val topic = "test22"
    val props = new Properties
    props.put("bootstrap.servers", "aliyun:9092")
    props.put("group.id", "test")
    props.put("enable.auto.commit", "false")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(3000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
    val stream = env.addSource(consumer)
    stream.addSink(new newHBaseWrite)

    env.execute(this.getClass.getSimpleName)
  }

  def main(args: Array[String]): Unit = {
    write2HBaseWithRichSinkFunction()
  }

  class newHBaseWrite extends RichSinkFunction[String] {

    var hbaseSink: HBaseUpsertSinkFunction = _
    var config: Configuration = _

    override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
      config = HBaseConfiguration.create()
      config.set(HConstants.ZOOKEEPER_QUORUM, "aliyun")
      config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
      config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

      // row key "id", column family "cf" with qualifiers "name" and "age"
      val schema = new HBaseTableSchema()
      schema.setRowKey("id", classOf[String])
      schema.addColumn("cf", "name", classOf[String])
      schema.addColumn("cf", "age", classOf[String])

      // flush after every 3 records; the size and interval conditions are effectively disabled
      hbaseSink = new HBaseUpsertSinkFunction("student", schema, config, Int.MaxValue, 3, Int.MaxValue)
      hbaseSink.open(null)
    }

    override def invoke(in: String): Unit = {
      val words = in.split(",")
      if (words.length == 3) {
        println(words.mkString(" - "))
        // outer Row = (rowKey, family Row); family Row = (name, age)
        val row = new Row(2)
        val f = new Row(2)
        f.setField(0, words(1))
        f.setField(1, words(2))
        row.setField(0, words(0))
        row.setField(1, f)
        hbaseSink.invoke(new org.apache.flink.api.java.tuple.Tuple2(true, row), null)
      }
    }

    override def close(): Unit = {
      if (hbaseSink != null) {
        hbaseSink.close()
      }
    }
  }
}
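A short usage note on the example above: each Kafka message is expected to be a comma-separated id,name,age string, and the sink wraps every record into a Tuple2(true, row), where the Boolean flag tells HBaseUpsertSinkFunction whether to upsert (true) or delete (false) the row. The target table student with column family cf has to exist in HBase beforehand, for example created from the HBase shell with create 'student','cf'.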
Below is a sink that writes to MySQL, modeled on the org.apache.flink.addons.hbase.HBaseUpsertSinkFunction class.
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.{ConcurrentLinkedDeque, Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.util.ExecutorThreadFactory
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer

object MySQLRichThreadSink {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = FlinkKafkaSource.getEnv
    val parameterTool: ParameterTool = ParameterTool.fromPropertiesFile("tunan-flink-2021/kafka-conf/parameters.properties")
    val sourceData: DataStream[String] = FlinkKafkaSource.createKafkaSource(parameterTool)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    sourceData.addSink(new DataUpsertSinkFunction)

    env.execute(this.getClass.getSimpleName)
  }
}

case class Student(id: Int, name: String, age: Int, sex: String, school: String, time: Long)

class DataUpsertSinkFunction extends RichSinkFunction[String] with CheckpointedFunction {

  var scalikeOperator: ScalikeOperator = _
  // flush conditions: maximum wait time and maximum number of buffered records
  val bufferFlushIntervalMillis = 10000000L
  val bufferFlushMaxListSize = 3L

  // records the first failure thrown from the flusher thread
  private val failureThrowable = new AtomicReference[Throwable]()

  @transient private var executor: ScheduledExecutorService = _
  @transient private var scheduledFuture: ScheduledFuture[_] = _
  @transient private var numPendingRequests: AtomicLong = _

  // buffer holding the rows that have not been flushed yet
  var dataOfQueue: ConcurrentLinkedDeque[Seq[Any]] = _

  val sql = "INSERT INTO student VALUES(?,?,?,?,?,?)"

  override def open(parameters: Configuration): Unit = {
    dataOfQueue = new ConcurrentLinkedDeque[Seq[Any]]
    scalikeOperator = new ScalikeOperator

    // single-threaded scheduler that flushes the buffer periodically, like HBaseUpsertSinkFunction
    this.executor = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("data-upsert-sink-flusher"))
    this.scheduledFuture = this.executor.scheduleWithFixedDelay(new Runnable {
      // synchronize on the sink function itself, as HBaseUpsertSinkFunction does
      override def run(): Unit = DataUpsertSinkFunction.this.synchronized {
        try {
          println("Flush interval reached, triggering flush")
          flush()
        } catch {
          case e: Exception =>
            e.printStackTrace()
            failureThrowable.compareAndSet(null, e)
        }
      }
    }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS)

    this.numPendingRequests = new AtomicLong(0)

    scalikeOperator.getConnection()
  }

  override def invoke(value: String): Unit = {
    // debug output: dump the records currently buffered
    val next = dataOfQueue.iterator()
    while (next.hasNext) {
      println(next.next())
    }
    println("===========================")

    if (value == null || value == "") {
      println("Input record is empty")
    } else {
      val words = value.split(",").map(_.trim)
      if (words.length == 6) {
        val student = Student(words(0).toInt, words(1), words(2).toInt, words(3), words(4), words(5).toLong)
        dataAddQueue(student)
        // pending count including the record just added
        println(numPendingRequests.get() + 1)
        if (bufferFlushMaxListSize > 0 && numPendingRequests.incrementAndGet >= bufferFlushMaxListSize) {
          println("Buffered records reached the max list size, triggering flush")
          flush()
          numPendingRequests = new AtomicLong(0)
        }
      } else {
        println("Record does not have 6 fields")
      }
    }
  }

  private def flush(): Unit = {
    println("flush() called")
    // drain the queue into one batch
    val dataList = new ListBuffer[Seq[Any]]
    val dataIt = dataOfQueue.iterator()
    while (dataIt.hasNext) {
      dataList.append(dataIt.next())
      dataIt.remove()
    }
    if (dataList.nonEmpty) {
      try {
        scalikeOperator.insertBatchData(sql, dataList)
      } catch {
        case e: Exception => e.printStackTrace()
      }
      dataList.clear()
    }
    // reset the pending counter even when nothing was drained, so snapshotState cannot loop forever
    numPendingRequests.set(0)
  }

  def dataAddQueue(student: Student): Unit = {
    dataOfQueue.add(Seq(student.id, student.name, student.age, student.sex, student.school, student.time))
  }

  // flush everything that is still buffered before the checkpoint completes
  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    while (this.numPendingRequests.get() != 0) {
      println("--------snapshotState----------")
      this.flush()
    }
  }

  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {}
}
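The ScalikeOperator helper used above is not shown in this post. A minimal sketch of what it needs to provide, assuming ScalikeJDBC and a MySQL instance (the JDBC driver class, URL and credentials below are placeholders):

import scalikejdbc._

// Sketch of the ScalikeOperator helper assumed above: open a connection pool
// and run a batch insert. JDBC URL and credentials are placeholders.
class ScalikeOperator {

  def getConnection(): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    ConnectionPool.singleton("jdbc:mysql://localhost:3306/test", "root", "password")
  }

  def insertBatchData(sql: String, data: Seq[Seq[Any]]): Unit = {
    // executes the parameterized statement once per element of `data` in a single transaction
    DB.localTx { implicit session =>
      SQL(sql).batch(data: _*).apply()
    }
  }
}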
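Likewise, FlinkKafkaSource (getEnv / createKafkaSource) is a small project-specific wrapper that is not shown here. A possible sketch; the property keys read from parameters.properties are assumptions:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Sketch of the FlinkKafkaSource wrapper assumed above. The keys
// "bootstrap.servers", "group.id" and "topic" are guesses at what parameters.properties contains.
object FlinkKafkaSource {

  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  def getEnv: StreamExecutionEnvironment = env

  def createKafkaSource(parameterTool: ParameterTool): DataStream[String] = {
    val props = new Properties
    props.put("bootstrap.servers", parameterTool.get("bootstrap.servers"))
    props.put("group.id", parameterTool.get("group.id"))
    val consumer = new FlinkKafkaConsumer[String](parameterTool.get("topic"), new SimpleStringSchema, props)
    env.addSource(consumer)
  }
}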