Efficient HBase Reads and Writes with the DataSet API

Under the hood this relies on MapReduce's InputFormat and OutputFormat to read and write the HFile-backed table data in bulk, which maximizes performance.
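
These two jobs need the flink-hbase addon for TableInputFormat, Flink's Hadoop compatibility module for the Scala HadoopOutputFormat wrapper, and the HBase client/MapReduce artifacts for TableOutputFormat. A rough sbt sketch, where the artifact names and versions are assumptions to be matched to your Flink and HBase releases:

// build.sbt -- versions are assumptions, adjust to your cluster
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % "1.9.3",
  "org.apache.flink" %% "flink-hbase"                % "1.9.3",   // org.apache.flink.addons.hbase.TableInputFormat
  "org.apache.flink" %% "flink-hadoop-compatibility" % "1.9.3",   // Scala HadoopOutputFormat wrapper
  "org.apache.hbase"  % "hbase-client"               % "2.1.0",
  "org.apache.hbase"  % "hbase-mapreduce"            % "2.1.0"    // org.apache.hadoop.hbase.mapreduce.TableOutputFormat
)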

Reading HBase data with Flink

The batch source below extends Flink's TableInputFormat: configure() opens the student table, getScanner() limits the scan to a row-key range and the cf column family, and mapResultToTuple() turns each Result into a (rowKey, name, age, city) tuple.

import com.tunan.utils.HBaseUtil
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, HTable, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes

object HBaseSourceApp {

  val HOST = "aliyun"
  val PORT = "2181"
  val TABLE_NAME = "student"
  val CF = "cf"

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // print() is an eager sink that triggers execution itself, so a separate
    // env.execute() call is not needed (it would fail with "No new data sinks have been defined").
    env.createInput(new HBaseSource).print()
  }

  class HBaseSource extends TableInputFormat[Tuple4[String, String, Int, String]] {
    @transient var ht: HTable = _
    @transient var connection: Connection = _

    override def close(): Unit = {
      if (ht != null) {
        ht.close()
      }
      if (connection != null) {
        connection.close()
      }
    }

    // Open the HBase table through the author's HBaseUtil helper.
    def createTable(): HTable = {
      try {
        connection = HBaseUtil.getConnection(HOST, PORT, TABLE_NAME)
        ht = connection.getTable(TableName.valueOf(TABLE_NAME)).asInstanceOf[HTable]
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      ht
    }

    // Called once per task: populate the protected `table` and `scan` fields
    // that TableInputFormat uses to split and read the HBase table.
    override def configure(parameters: Configuration): Unit = {
      table = createTable()
      if (table != null) {
        scan = getScanner
      }
    }

    // Restrict the scan to row keys ["001", "003") and the cf column family.
    override def getScanner: Scan = {
      val scan = new Scan()
      scan.withStartRow(Bytes.toBytes("001"))
      scan.withStopRow(Bytes.toBytes("003"))
      scan.addFamily(CF.getBytes())
      scan
    }

    override def getTableName: String = TABLE_NAME

    // Convert each HBase Result into a (rowKey, name, age, city) tuple.
    override def mapResultToTuple(result: Result): Tuple4[String, String, Int, String] = {
      new Tuple4(
        Bytes.toString(result.getRow),
        Bytes.toString(result.getValue(CF.getBytes(), "name".getBytes())),
        Bytes.toString(result.getValue(CF.getBytes(), "age".getBytes())).toInt,
        Bytes.toString(result.getValue(CF.getBytes(), "city".getBytes()))
      )
    }
  }
}
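
Both listings call com.tunan.utils.HBaseUtil, a helper from the author's own code that is not shown in the post. A minimal sketch of what it might look like, assuming it simply wraps HBase's ConnectionFactory around the ZooKeeper quorum and client port, and uses the table-name argument only to check that the table exists:

package com.tunan.utils

import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

// Hypothetical reconstruction of the helper referenced by the examples; not from the original post.
object HBaseUtil {

  def getConnection(host: String, port: String, tableName: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", host)
    conf.set("hbase.zookeeper.property.clientPort", port)
    val connection = ConnectionFactory.createConnection(conf)

    // Fail fast if the target table does not exist.
    val admin = connection.getAdmin
    try {
      require(admin.tableExists(TableName.valueOf(tableName)), s"table $tableName does not exist")
    } finally {
      admin.close()
    }
    connection
  }

  def closeConnection(connection: Connection): Unit = {
    if (connection != null && !connection.isClosed) {
      connection.close()
    }
  }
}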

Writing HBase data with Flink

The sink below maps each record to an HBase Put keyed by row key, then writes the resulting (Text, Mutation) pairs through Hadoop's TableOutputFormat, wrapped in Flink's HadoopOutputFormat.

import com.tunan.utils.HBaseUtil
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.hbase.client.{Connection, Mutation, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job

object HBaseSinkApp {

  val HOST = "aliyun"
  val PORT = "2181"
  val TABLE_NAME = "student"

  // Turn each (id, name, age, city) record into a (rowKey, Put) pair that TableOutputFormat can write.
  def convertToHBase(input: DataSet[(String, String, Int, String)]): DataSet[(Text, Mutation)] = {
    input.map(new RichMapFunction[(String, String, Int, String), (Text, Mutation)] {
      override def map(value: (String, String, Int, String)): (Text, Mutation) = {
        val cf = "cf".getBytes()
        val (id, name, age, city) = (value._1, value._2, value._3.toString, value._4)

        val text = new Text(id)
        val put = new Put(id.getBytes())
        if (StringUtils.isNotEmpty(name)) {
          put.addColumn(cf, "name".getBytes(), name.getBytes())
        }
        if (StringUtils.isNotEmpty(age)) {
          put.addColumn(cf, "age".getBytes(), age.getBytes())
        }
        if (StringUtils.isNotEmpty(city)) {
          put.addColumn(cf, "city".getBytes(), city.getBytes())
        }
        (text, put)
      }
    })
  }

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Ten sample rows: (id, name, age, city).
    val students = for {
      i <- 1 to 10
    } yield (i + "", s"name_$i", 30 + i, s"city_$i")

    val input = env.fromCollection(students)
    val result = convertToHBase(input)

    var connect: Connection = null

    try {
      connect = HBaseUtil.getConnection(HOST, PORT, TABLE_NAME)
      val config = connect.getConfiguration
      // TableOutputFormat needs to know which table to write to.
      config.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME)
      val job = Job.getInstance(config)
      // Wrap Hadoop's TableOutputFormat so the DataSet API can use it as a sink.
      result.output(new HadoopOutputFormat[Text, Mutation](new TableOutputFormat[Text], job))
    } catch {
      case ex: Exception => ex.printStackTrace()
    } finally {
      HBaseUtil.closeConnection(connect)
    }

    // output() is a lazy sink, so execution must be triggered explicitly here.
    env.execute(this.getClass.getSimpleName)
  }
}
Author: Tunan
Link: http://yerias.github.io/2021/02/01/flink/7/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stated otherwise.