在第一次建立Hbase表的时候,我们可能需要往里面一次性导入大量的初始化数据。我们很自然地想到将数据一条条插入到HBase中,或者通过MR方式等。但是这些方式不是慢就是在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据。
总的来说,使用 Bulk Load 方式由于利用了 HBase的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase的内部 HFile 格式文件,用来形成一个特殊的 HBase数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。
HBase工具类 import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Connection , ConnectionFactory }import org.apache.hadoop.hbase.mapreduce.{TableInputFormat , TableOutputFormat }object HBaseUtil { def getConnection (host: String , port: String , table: String , tmpDir: String = "/tmp" ): Connection = { val conf = HBaseConfiguration .create conf.set("hbase.zookeeper.quorum" , host) conf.set("hbase.zookeeper.property.clientPort" , port) conf.set(TableOutputFormat .OUTPUT_TABLE , table) conf.set(TableInputFormat .INPUT_TABLE , table) conf.set("mapreduce.output.fileoutputformat.outputdir" , tmpDir) val conn = ConnectionFactory .createConnection(conf) conn } def closeConnection (conn: Connection ): Unit = { if (conn != null ) { conn.close() } } }
Spark读取HBase数据 import com.tunan.spark.utils.hbase.HBaseUtil import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }object HBaseSource { val HOST_NAME = "aliyun" val PORT = "2181" val TABLE_NAME = "student" val CF = "cf" def main (args: Array [String ]): Unit = { val conf = new SparkConf ().setMaster("local[*]" ).setAppName(this .getClass.getSimpleName) val sc = new SparkContext (conf) val conn = HBaseUtil .getConnection(HOST_NAME , PORT , TABLE_NAME ) val hConf = conn.getConfiguration val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat ], classOf[ImmutableBytesWritable ], classOf[Result ]) rdd.map(_._2) .foreachPartition(partition => { partition.foreach(row => { val id = Bytes .toString(row.getRow) val name = Bytes .toString(row.getValue(Bytes .toBytes(CF ), Bytes .toBytes("name" ))) val age = Bytes .toString(row.getValue(Bytes .toBytes(CF ),Bytes .toBytes("age" ))) println(Student (id,name,age)) }) }) sc.stop() } private case class Student (id:String ,name:String ,age:String ) }
Spark写入HBase数据 import com.tunan.spark.utils.hbase.HBaseUtil import org.apache.hadoop.hbase.client.{Put , Result }import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf , SparkContext }object HBaseSink { val HOST_NAME = "aliyun" val PORT = "2181" val TABLE_NAME = "student" val CF = "cf" def main (args: Array [String ]): Unit = { val conf = new SparkConf ().setMaster("local[2]" ).setAppName(this .getClass.getSimpleName) val sc = new SparkContext (conf) val rdd = sc.parallelize(Student ("001" , "zs" , "18" ) :: Student ("002" , "ls" , "28" ) :: Student ("003" , "ww" , "38" ) :: Nil ) val conn = HBaseUtil .getConnection(HOST_NAME , PORT , TABLE_NAME ) val hConf = conn.getConfiguration val job = Job .getInstance(hConf) job.setOutputKeyClass(classOf[ImmutableBytesWritable ]) job.setOutputValueClass(classOf[Result ]) job.setOutputFormatClass(classOf[TableOutputFormat [ImmutableBytesWritable ]]) val result = rdd.mapPartitions(partition => { partition.map(row => { val put = new Put (Bytes .toBytes(row.id)) put.addColumn(Bytes .toBytes(CF ), Bytes .toBytes("name" ), Bytes .toBytes(row.name)) put.addColumn(Bytes .toBytes(CF ), Bytes .toBytes("age" ), Bytes .toBytes(row.age)) (new ImmutableBytesWritable , put) }) }) result.saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop() } private case class Student (id:String ,name:String ,age:String ) }
补充
TableInputFormat
专门处理基于HBase的MapReduce的输入数据的格式类(读取)
hbaseConf.set(TableInputFormat .SCAN_ROW_START , startRowKey) hbaseConf.set(TableInputFormat .SCAN_ROW_STOP , endRowKey)
TableOutputFormat
专门处理基于HBase的MapReduce的输出数据的格式类(写出)
ImmutableBytesWritable
是一种数据类型,一般作为RowKey使用,也可以读取成字符串
Result
HBase客户端读取的结果集
newAPIHadoopRDD
获取给定Hadoop文件的RDD,其中包含任意的新API InputFormat和传递给输入格式的额外配置选项。
因为Hadoop的RecordReader类对每条记录重用相同的对象,直接缓存返回的RDD或直接将其传递给聚合或shuffle操作将创建对同一对象的许多引用。如果打算直接缓存、排序或聚合Hadoop对象,应该首先使用“map”函数复制它们 。
saveAsNewAPIHadoopDataset
使用新的Hadoop API将RDD输出到任何支持Hadoop的存储系统,使用用于该存储系统的Hadoop配置对象。Conf应该设置一个OutputFormat和任何需要的输出路径(例如要写入的表名 ),就像Hadoop MapReduce作业一样。
我们应该确保我们的任务是等幂的,不然可能会出现错误的结果,因为Hadoop目录新的数据会自动覆盖旧的,也就是可能错误的数据把正确的数据覆盖掉issues 。
参考链接: https://www.iteblog.com/archives/1889.html
参考链接: https://www.iteblog.com/archives/1891.html