Spark使用TableInputFormat和TableOutputFormat高效读写HBase

在第一次建立Hbase表的时候,我们可能需要往里面一次性导入大量的初始化数据。我们很自然地想到将数据一条条插入到HBase中,或者通过MR方式等。但是这些方式不是慢就是在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据。

总的来说,使用 Bulk Load 方式由于利用了 HBase的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase的内部 HFile 格式文件,用来形成一个特殊的 HBase数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

HBase工具类

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}

object HBaseUtil {

def getConnection(host: String, port: String, table: String, tmpDir: String = "/tmp"): Connection = {
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", host)
conf.set("hbase.zookeeper.property.clientPort", port)
conf.set(TableOutputFormat.OUTPUT_TABLE, table)
conf.set(TableInputFormat.INPUT_TABLE, table)
//插入的时临时文件存储位置,大数据量需要
conf.set("mapreduce.output.fileoutputformat.outputdir", tmpDir)

val conn = ConnectionFactory.createConnection(conf)
conn
}

def closeConnection(conn: Connection): Unit = {
if (conn != null) {
conn.close()
}
}
}

Spark读取HBase数据

import com.tunan.spark.utils.hbase.HBaseUtil
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
spark rdd 读取 HBASE的 最优解方案
*/


object HBaseSource {

val HOST_NAME = "aliyun"
val PORT = "2181"
val TABLE_NAME = "student"

val CF = "cf"

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)

val conn = HBaseUtil.getConnection(HOST_NAME, PORT, TABLE_NAME)
val hConf = conn.getConfiguration

val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.map(_._2)
.foreachPartition(partition => {
partition.foreach(row => {
val id = Bytes.toString(row.getRow)
val name = Bytes.toString(row.getValue(Bytes.toBytes(CF), Bytes.toBytes("name")))
val age = Bytes.toString(row.getValue(Bytes.toBytes(CF),Bytes.toBytes("age")))
println(Student(id,name,age))
})
})
sc.stop()
}
private case class Student(id:String,name:String,age:String)
}

Spark写入HBase数据

import com.tunan.spark.utils.hbase.HBaseUtil
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}


/*
spark rdd 写入 HBASE的 最优解方案
*/

object HBaseSink {

val HOST_NAME = "aliyun"
val PORT = "2181"
val TABLE_NAME = "student"

val CF = "cf"

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)

val rdd = sc.parallelize(Student("001", "zs", "18") :: Student("002", "ls", "28") :: Student("003", "ww", "38") :: Nil)

val conn = HBaseUtil.getConnection(HOST_NAME, PORT, TABLE_NAME)
val hConf = conn.getConfiguration

val job = Job.getInstance(hConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val result = rdd.mapPartitions(partition => {
partition.map(row => {
val put = new Put(Bytes.toBytes(row.id))
put.addColumn(Bytes.toBytes(CF), Bytes.toBytes("name"), Bytes.toBytes(row.name))
put.addColumn(Bytes.toBytes(CF), Bytes.toBytes("age"), Bytes.toBytes(row.age))
(new ImmutableBytesWritable, put)
})
})

result.saveAsNewAPIHadoopDataset(job.getConfiguration)

sc.stop()
}

private case class Student(id:String,name:String,age:String)
}

补充

  • TableInputFormat 专门处理基于HBase的MapReduce的输入数据的格式类(读取)

    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)
  • TableOutputFormat专门处理基于HBase的MapReduce的输出数据的格式类(写出)

  • ImmutableBytesWritable 是一种数据类型,一般作为RowKey使用,也可以读取成字符串

  • Result HBase客户端读取的结果集

  • newAPIHadoopRDD 获取给定Hadoop文件的RDD,其中包含任意的新API InputFormat和传递给输入格式的额外配置选项。

    • 因为Hadoop的RecordReader类对每条记录重用相同的对象,直接缓存返回的RDD或直接将其传递给聚合或shuffle操作将创建对同一对象的许多引用。如果打算直接缓存、排序或聚合Hadoop对象,应该首先使用“map”函数复制它们
  • saveAsNewAPIHadoopDataset 使用新的Hadoop API将RDD输出到任何支持Hadoop的存储系统,使用用于该存储系统的Hadoop配置对象。Conf应该设置一个OutputFormat和任何需要的输出路径(例如要写入的表名),就像Hadoop MapReduce作业一样。

    • 我们应该确保我们的任务是等幂的,不然可能会出现错误的结果,因为Hadoop目录新的数据会自动覆盖旧的,也就是可能错误的数据把正确的数据覆盖掉issues

参考链接: https://www.iteblog.com/archives/1889.html

参考链接: https://www.iteblog.com/archives/1891.html

Author: Tunan
Link: http://yerias.github.io/2021/04/11/spark/41/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.