Efficiently Reading and Writing HBase with the DataStream API

Add the flink-hbase dependency to your POM:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

Look at the org.apache.flink.addons.hbase.HBaseUpsertSinkFunction class: it flushes buffered data when any of three conditions is met - buffered size, record count, or elapsed time. It also implements CheckpointedFunction, so it can snapshot its state and recover from it. The same flush-trigger pattern can be adapted for writing to any database, such as ClickHouse.

For other requirements it is worth studying how HBaseUpsertSinkFunction is implemented; below is how to use it to write to HBase.
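For orientation, the constructor used in the example below exposes those three flush thresholds as its last three arguments. An annotated sketch of the call (parameter meanings as read from the Flink 1.10 source; verify against your version):

hbaseSink = new HBaseUpsertSinkFunction(
  "student",     // HBase table name
  schema,        // HBaseTableSchema: rowkey plus column family/qualifier types
  config,        // Hadoop Configuration: ZooKeeper quorum, timeouts, ...
  Int.MaxValue,  // bufferFlushMaxSizeInBytes: flush when buffered mutations exceed this many bytes
  3,             // bufferFlushMaxMutations: flush after this many buffered mutations
  Int.MaxValue)  // bufferFlushIntervalMillis: flush on a timer with this period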

import java.util.Properties

import org.apache.flink.addons.hbase.{HBaseTableSchema, HBaseUpsertSinkFunction}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

object HBaseStreamSinkApp {

  def write2HBaseWithRichSinkFunction(): Unit = {
    val topic = "test22"
    val props = new Properties
    props.put("bootstrap.servers", "aliyun:9092")
    props.put("group.id", "test")
    props.put("enable.auto.commit", "false")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(3000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
    val stream = env.addSource(consumer)
    stream.addSink(new newHBaseWrite)

    env.execute(this.getClass.getSimpleName)
  }

  def main(args: Array[String]): Unit = {
    write2HBaseWithRichSinkFunction()
  }

  class newHBaseWrite extends RichSinkFunction[String] {

    var hbaseSink: HBaseUpsertSinkFunction = _
    var config: Configuration = _

    override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
      config = HBaseConfiguration.create()
      config.set(HConstants.ZOOKEEPER_QUORUM, "aliyun")
      config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
      config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

      val schema = new HBaseTableSchema()
      schema.setRowKey("id", classOf[String])
      schema.addColumn("cf", "name", classOf[String])
      schema.addColumn("cf", "age", classOf[String])

      // Size and interval thresholds are effectively disabled here,
      // so only the mutation-count threshold (3) triggers a flush
      hbaseSink = new HBaseUpsertSinkFunction("student", schema, config, Int.MaxValue, 3, Int.MaxValue)
      hbaseSink.open(null)
    }

    override def invoke(in: String): Unit = {
      val words = in.split(",")

      if (words.length == 3) {
        println(words.mkString(" - "))
        // Outer row holds (rowkey, nested row of column values)
        val row = new Row(2)
        val f = new Row(2)
        f.setField(0, words(1))
        f.setField(1, words(2))
        row.setField(0, words(0))
        row.setField(1, f)

        // true marks an upsert; false would mark a delete
        hbaseSink.invoke(new org.apache.flink.api.java.tuple.Tuple2(true, row), null)
      }
    }

    override def close(): Unit = {
      if (hbaseSink != null) {
        hbaseSink.close()
      }
    }
  }
}
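With the job running, a Kafka message such as 1,zhangsan,20 on topic test22 becomes an upsert with rowkey 1 and columns cf:name and cf:age. Because bufferFlushMaxMutations is 3 while the size and interval thresholds are maxed out, puts only reach HBase once three records have accumulated or the sink closes. One caveat: the wrapper class above does not itself implement CheckpointedFunction, so the inner sink's snapshot-time flush is never invoked; checkpoint guarantees are weaker here than when HBaseUpsertSinkFunction is attached to the stream directly.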

Below is a MySQL sink written by imitating the org.apache.flink.addons.hbase.HBaseUpsertSinkFunction class.

import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.{ConcurrentLinkedDeque, Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.util.ExecutorThreadFactory
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer

object MySQLRichThreadSink {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = FlinkKafkaSource.getEnv

    val parameterTool: ParameterTool = ParameterTool.fromPropertiesFile("tunan-flink-2021/kafka-conf/parameters.properties")

    val sourceData: DataStream[String] = FlinkKafkaSource.createKafkaSource(parameterTool)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    sourceData.addSink(new DataUpsertSinkFunction)

    env.execute(this.getClass.getSimpleName)
  }
}
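FlinkKafkaSource and its parameters.properties file are helpers from the author's project and are not shown in the post. A minimal sketch of what such a helper might look like, with the property keys being assumptions:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object FlinkKafkaSource {

  // One shared environment, so the main() that calls getEnv and the
  // source created here end up in the same job graph
  lazy val getEnv: StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(3000)
    env
  }

  def createKafkaSource(parameterTool: ParameterTool): DataStream[String] = {
    val props = new Properties()
    // Key names are assumptions about what parameters.properties contains
    props.put("bootstrap.servers", parameterTool.get("bootstrap.servers"))
    props.put("group.id", parameterTool.get("group.id"))
    val topic = parameterTool.get("topic")
    getEnv.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props))
  }
}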

case class Student(id: Int, name: String, age: Int, sex: String, school: String, time: Long)


class DataUpsertSinkFunction extends RichSinkFunction[String] with CheckpointedFunction {

  var scalikeOperator: ScalikeOperator = _
  // The timer period is deliberately huge so that in this test only the
  // record-count threshold (3) triggers a flush
  val bufferFlushIntervalMillis = 10000000L
  val bufferFlushMaxListSize = 3L

  // Recorded by the flush thread; note it is never rethrown in this sketch
  private val failureThrowable = new AtomicReference[Throwable]()

  @transient private var executor: ScheduledExecutorService = _
  @transient private var scheduledFuture: ScheduledFuture[_] = _
  @transient private var numPendingRequests: AtomicLong = _

  var dataOfQueue: ConcurrentLinkedDeque[Seq[Any]] = _

  val sql = "INSERT INTO student VALUES(?,?,?,?,?,?)"

  override def open(parameters: Configuration): Unit = {
    dataOfQueue = new ConcurrentLinkedDeque[Seq[Any]]
    scalikeOperator = new ScalikeOperator

    this.executor = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("data-upsert-sink-flusher"))

    this.scheduledFuture = this.executor.scheduleWithFixedDelay(new Runnable {

      def exe = this.synchronized({
        try {
          println("Flush interval elapsed, triggering flush")
          flush()
        } catch {
          case e: Exception =>
            e.printStackTrace()
            failureThrowable.compareAndSet(null, e)
        }
      })

      override def run(): Unit = exe
    }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS)

    this.numPendingRequests = new AtomicLong(0)

    scalikeOperator.getConnection()
  }

  override def invoke(value: String): Unit = {

    // Debug: dump whatever is currently buffered
    val next = dataOfQueue.iterator()
    while (next.hasNext) {
      println(next.next())
    }

    println("===========================")
    if (value == null || value == "") {
      println("Input record is empty")
    } else {
      val words = value.split(",").map(_.trim)
      if (words.length == 6) {
        val student = Student(words(0).toInt, words(1), words(2).toInt, words(3), words(4), words(5).toLong)

        dataAddQueue(student)

        println(numPendingRequests.get() + 1)

        if (bufferFlushMaxListSize > 0 && numPendingRequests.incrementAndGet >= bufferFlushMaxListSize) {
          println("Buffered record count reached the threshold, triggering flush")
          flush() // flush() also resets the pending-request counter
        }
      } else {
        println("Expected 6 comma-separated fields")
      }
    }
  }


  private def flush(): Unit = {

    println("flush() invoked")

    val dataList = new ListBuffer[Seq[Any]]

    // Drain the queue into a local batch
    val dataIt = dataOfQueue.iterator()
    while (dataIt.hasNext) {
      println("Draining record")
      dataList.append(dataIt.next())
      dataIt.remove()
    }

    if (dataList.nonEmpty) {
      // Fault tolerance around the insert: bad records are dropped by default
      try {
        scalikeOperator.insertBatchData(sql, dataList)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    numPendingRequests.set(0)
  }

  def dataAddQueue(student: Student): Unit = {
    dataOfQueue.add(Seq(student.id, student.name, student.age, student.sex, student.school, student.time))
  }

  // Unclear where this gets called from; if left uncommented, the thread
  // pool is shut down every time a write completes
  // override def close(): Unit = {
  //   if (this.scheduledFuture != null) {
  //     this.scheduledFuture.cancel(false)
  //     if (this.executor != null) this.executor.shutdownNow
  //   }
  //
  //   scalikeOperator.closeConnection()
  // }


  // Flush everything still buffered when a checkpoint is taken, so data
  // can be recovered after the job crashes and restarts
  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    while (this.numPendingRequests.get() != 0) {
      println("--------snapshotState----------")
      this.flush()
    }
  }

  // Nothing to restore
  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {
  }
}
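ScalikeOperator is likewise not shown in the post. Judging from the calls above (getConnection(), insertBatchData(sql, batch), closeConnection()), it wraps ScalikeJDBC; here is a minimal sketch, with the JDBC URL and credentials as placeholders:

import scalikejdbc._

class ScalikeOperator extends Serializable {

  // Placeholder connection details; replace with your own
  def getConnection(): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    ConnectionPool.singleton("jdbc:mysql://localhost:3306/test", "root", "password")
  }

  // Batch insert: one Seq[Any] of bind parameters per row
  def insertBatchData(sql: String, data: Seq[Seq[Any]]): Unit = {
    DB.localTx { implicit session =>
      SQL(sql).batch(data: _*).apply()
    }
  }

  def closeConnection(): Unit = ConnectionPool.closeAll()
}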
Author: Tunan
Link: http://yerias.github.io/2021/02/02/flink/12/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless otherwise stated.