/**
 * Turns each `(id, name, age, city)` tuple into an HBase write keyed by `id`.
 *
 * For every input record a [[Put]] is built whose row key is the bytes of `id`.
 * Columns are written into the `cf` family:
 *  - `name` – only when the name is non-empty (per `StringUtils.isNotEmpty`),
 *  - `age`  – always (the decimal string form of the integer),
 *  - `city` – only when the city is non-empty (per `StringUtils.isNotEmpty`).
 *
 * All strings are encoded as UTF-8 explicitly, so the stored bytes do not depend on
 * the default charset of the JVM that happens to run the task.
 *
 * @param input data set of `(id, name, age, city)` tuples
 * @return data set of `(row key as Text, Put mutation)` pairs
 */
def convertToHBase(input: DataSet[(String, String, Int, String)]): DataSet[(Text, Mutation)] = {
  // Local import keeps the block self-contained without touching file-level imports.
  import java.nio.charset.StandardCharsets.UTF_8

  input.map(new RichMapFunction[(String, String, Int, String), (Text, Mutation)] {
    override def map(value: (String, String, Int, String)): (Text, Mutation) = {
      val (id, name, age, city) = value
      val cf = "cf".getBytes(UTF_8)

      val text = new Text(id)
      val put = new Put(id.getBytes(UTF_8))

      if (StringUtils.isNotEmpty(name)) {
        put.addColumn(cf, "name".getBytes(UTF_8), name.getBytes(UTF_8))
      }

      // Int.toString never yields an empty string, so the age column needs no guard.
      put.addColumn(cf, "age".getBytes(UTF_8), age.toString.getBytes(UTF_8))

      if (StringUtils.isNotEmpty(city)) {
        put.addColumn(cf, "city".getBytes(UTF_8), city.getBytes(UTF_8))
      }

      (text, put)
    }
  })
}
defmain(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val students = for { i <- 1 to 10 } yield (i + "", s"name_${i}", 30 + i, s"city_${i}")
val input = env.fromCollection(students) val result = convertToHBase(input)
var connect:Connection = null
try { connect = HBaseUtil.getConnection(HOST, PORT, TABLE_NAME) val config = connect.getConfiguration val job = Job.getInstance(config) result.output(newHadoopOutputFormat[Text, Mutation](newTableOutputFormat[Text], job)) } catch { case ex:Exception => ex.printStackTrace() } finally { HBaseUtil.closeConnection(connect) }