目录
- RDD转换DadaFrame
- 使用SQL操作数据源
- 跨数据源join
- SQL与DF与DS的比较
- Spark元数据管理: catalog
RDD转换DadaFrame
第一种方式是使用反射来推断包含特定对象类型的RDD的模式
object reflect {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).getOrCreate()
import spark.implicits._
val in = "tunan-spark-sql/data/top.txt"
val fileRDD: RDD[String] = spark.sparkContext.textFile(in)
val mapRDD = fileRDD.map(lines => {
val words = lines.split(",")
people(words(0), words(1), words(2).toInt)
})
mapRDD.toDF().show()
}
case class people(name:String,subject:String,grade:Int)
}通过反射class的这种方式可以获得Schema创建DataFrame,简单通用,但是在创建外部数据源的场景下不适用
第二种方法是通过编程接口,通过StructType可以构造Schema,然后将其应用于现有的RDD
object interface {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).getOrCreate()
val in = "tunan-spark-sql/data/top.txt"
val fileRDD: RDD[String] = spark.sparkContext.textFile(in)
//在原RDD上创建rowRDD
val mapRDD = fileRDD.map(lines => {
val words = lines.split(",")
Row(words(0), words(1), words(2).toDouble)
})
//创建和上一步对应的行结构类型的StructType
val innerStruct =
StructType(
StructField("name", StringType, false) ::
StructField("subject", StringType, false) ::
StructField("grade", DoubleType, false) :: Nil
)
//将schema和Rows结合,创建出DF
val df = spark.createDataFrame(mapRDD, innerStruct)
df.show()
}
}步骤:
- 在原RDD上创建rowRDD
- 创建和上一步对应的行结构类型的StructType
- 将schema和Rows结合,创建出DF
使用SQL操作数据源
在官网的Data Sources 下,每个数据源下都有一个Sql选项卡,其中就是对应的SQL采集源数据,并生成对应的SQL视图的代码,如:
spark-sql (default)> CREATE TEMPORARY VIEW jsonTable |
跨数据源join
跨数据源join是Spark非常好用的一个特性,从不同的数据源拿到spark中,再从spark写出去,简直轻而易举。
下面我们将验证从hive和mysql中分别拿出一个表join(在idea中操作时,需要先连上hive)
jdbc
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop/?characterEncoding=utf-8&useSSL=false")
.option("dbtable", "tunan.dept")
.option("user", "root")
.option("password", "root")
.load()hive
val hiveDF = spark.sql("select * from default.emp")
join
val joinDF: DataFrame = jdbcDF.join(hiveDF, "deptno")
查看结果
>> joinDF.show(false)
+------+----------+-----+-----+------+---------+----+----------+------+------+
|deptno| dname|level|empno| ename| job| jno| date| sal| prize|
+------+----------+-----+-----+------+---------+----+----------+------+------+
| 10|ACCOUNTING| 1700| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null|
| 10|ACCOUNTING| 1700| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|
| 20| RESEARCH| 1800| 7934|MILLER| CLERK|7782| 1982-1-23|1300.0| null|
| 20| RESEARCH| 1800| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null|
....
+------+----------+-----+-----+------+---------+----+----------+------+------+我们并不需要全部的数据,下面我们将经过处理选择我们需要的数据
// 定义视实体类
case class EmpDept(deptno:String,dname:String,level:Int,empno:String,ename:String,job:String,jno:String,date:String,sal:Double,prize:String)
case class Result(empno:String,ename:String,deptno:String,dname:String,prize:String)
//DF转换成DS
val joinDS: Dataset[EmpDept] = joinDF.as[EmpDept]
//从DS中拿到数据,反射的方式拿到Schema信息
val mapDS = joinDS.map(x => Result(x.empno, x.ename, x.deptno, x.dname,x.prize))保存数据
// 保存到文件
mapDS.write.format("orc").save("tunan-spark-sql/out")
// 保存到MySQL数据库
mapDS.write.format("jdbc")
.option("url", "jdbc:mysql://hadoop/?characterEncoding=utf-8&useSSL=false")
.option("dbtable", "tunan.join_result")
.option("user", "root")
.option("password", "root")
.mode("overwrite")
.save()查看结果:
+-----+------+------+----------+------+
|empno| ename|deptno| dname| prize|
+-----+------+------+----------+------+
| 7566| JONES| 10|ACCOUNTING| null|
| 7521| WARD| 10|ACCOUNTING| 500.0|
| 7934|MILLER| 20| RESEARCH| null|
| 7902| FORD| 20| RESEARCH| null|
...
+-----+------+------+----------+------+
SQL与DF与DS的比较
小问题:spark.read.load() 这句代码没用指定读取格式,那么它的默认格式是什么?
现在我们需要对比的是SQL、DF、DS三者对Syntax Errors和Analysis Errors的不同程度的响应
在上一步中,我们将joinDF转化成了joinDS,现在我们就看看他们在选择需要的列的时候,做了什么样的执行计划
val selectDF: DataFrame = joinDF.select("ename") |
很明显selectDS做出了优化
00 Project [ename#7] |
由此我们可以根据在Spark SQL应用中,选择列的时候,SQL、DF、DS三者做一个比较
SQL | DF | DS | |
---|---|---|---|
Syntax Errors | runtime | compile | compile |
Analysis Errors | runtime | runtime | compile |
在执行SQL的时候,无论是语法错误还是运行错误,都无法在编译时就提前暴露出来
在执行DF的时候,算子如果写错了,会提前暴露出来,但是写的列名只有在运行的时候才会检查是否正确
在执行DS的时候,由于case class反射的机制,算子和列名都可以提前到代码编写时就检测到错误
所以最优的执行顺序为 DS > DF > SQL
面试题:RDD、DS、DF的区别
- RDD不支持SQL
- DF每一行都是Row类型,不能直接访问字段,必须解析才行
- DS每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获 得每一行的信息
- DataFrame与Dataset均支持spark sql的操作,比如select,group by之类,还 能注册临时表/视窗,进行sql语句操作
- 可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要 写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。
面试题:RDD和Dataset/DataFrame中的Persist的默认缓存级别
- Dataset 中的缓存级别是 MEMORY_AND_DISK
- RDD 中的缓存级别是 MEMORY_ONLY
面试题:Spark RDD和Spark SQL的的cache有什么区别
Spark RDD的cache是lazy的,需要action才会执行cache操作
Spark SQL的cache是egaer的,马上就cache了
Spark元数据管理: catalog
拿到catalog
val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).enableHiveSupport().getOrCreate() |
展示所有数据库
val dbList: Dataset[Database] = catalog.listDatabases()
dbList.show(false)当前数据库
catalog.currentDatabase
只展示名字
import spark.implicits._
dbList.map(_.name).show()展示指定库的所有表
catalog.setCurrentDatabase("offline_dw")
catalog.listTables().show(false)过滤表
val listTable = catalog.listTables()
listTable.filter('name === "dws_country_traffic").show(false)判断某个表是否缓存
catalog.isCached("dws_country_traffic")
catalog.cacheTable("dws_country_traffic")
catalog.isCached("dws_country_traffic")
catalog.uncacheTable("dws_country_traffic")注意:catalog的cacheTable是lazy的
展示所有函数
catalog.listFunctions().show(1000,false)
注册函数,再次展示
spark.udf.register("udf_string_length",(word:String) => {
word.split(",").length
})
catalog.listFunctions().filter('name === "udf_string_length").show(false)