RDD转换DadaFrame&使用SQL操作数据源&跨数据源join&SQL与DF与DS的比较&Spark元数据管理: catalog

目录

  1. RDD转换DadaFrame
  2. 使用SQL操作数据源
  3. 跨数据源join
  4. SQL与DF与DS的比较
  5. Spark元数据管理: catalog

RDD转换DadaFrame

  1. 第一种方式是使用反射来推断包含特定对象类型的RDD的模式

    object reflect {
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).getOrCreate()
    import spark.implicits._
    val in = "tunan-spark-sql/data/top.txt"
    val fileRDD: RDD[String] = spark.sparkContext.textFile(in)

    val mapRDD = fileRDD.map(lines => {
    val words = lines.split(",")
    people(words(0), words(1), words(2).toInt)
    })
    mapRDD.toDF().show()
    }
    case class people(name:String,subject:String,grade:Int)
    }

    通过反射class的这种方式可以获得Schema创建DataFrame,简单通用,但是在创建外部数据源的场景下不适用

  2. 第二种方法是通过编程接口,通过StructType可以构造Schema,然后将其应用于现有的RDD

    object interface {
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).getOrCreate()
    val in = "tunan-spark-sql/data/top.txt"
    val fileRDD: RDD[String] = spark.sparkContext.textFile(in)

    //在原RDD上创建rowRDD
    val mapRDD = fileRDD.map(lines => {
    val words = lines.split(",")
    Row(words(0), words(1), words(2).toDouble)
    })

    //创建和上一步对应的行结构类型的StructType
    val innerStruct =
    StructType(
    StructField("name", StringType, false) ::
    StructField("subject", StringType, false) ::
    StructField("grade", DoubleType, false) :: Nil
    )

    //将schema和Rows结合,创建出DF
    val df = spark.createDataFrame(mapRDD, innerStruct)

    df.show()
    }
    }

    步骤:

    1. 在原RDD上创建rowRDD
    2. 创建和上一步对应的行结构类型的StructType
    3. 将schema和Rows结合,创建出DF

使用SQL操作数据源

在官网的Data Sources 下,每个数据源下都有一个Sql选项卡,其中就是对应的SQL采集源数据,并生成对应的SQL视图的代码,如:

spark-sql (default)> CREATE TEMPORARY VIEW jsonTable
> USING org.apache.spark.sql.json
> OPTIONS (
> path "file:///home/hadoop/data/people.json"
> );
Response code

spark-sql (default)> SELECT * FROM jsonTable;
age name
NULL Michael
30 Andy
19 Justin

跨数据源join

跨数据源join是Spark非常好用的一个特性,从不同的数据源拿到spark中,再从spark写出去,简直轻而易举。

下面我们将验证从hive和mysql中分别拿出一个表join(在idea中操作时,需要先连上hive)

  1. jdbc

    val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://hadoop/?characterEncoding=utf-8&useSSL=false")
    .option("dbtable", "tunan.dept")
    .option("user", "root")
    .option("password", "root")
    .load()
  2. hive

    val hiveDF = spark.sql("select * from default.emp")
  3. join

    val joinDF: DataFrame = jdbcDF.join(hiveDF, "deptno")
  4. 查看结果

    >> joinDF.show(false)
    +------+----------+-----+-----+------+---------+----+----------+------+------+
    |deptno| dname|level|empno| ename| job| jno| date| sal| prize|
    +------+----------+-----+-----+------+---------+----+----------+------+------+
    | 10|ACCOUNTING| 1700| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null|
    | 10|ACCOUNTING| 1700| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|
    | 20| RESEARCH| 1800| 7934|MILLER| CLERK|7782| 1982-1-23|1300.0| null|
    | 20| RESEARCH| 1800| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null|
    ....
    +------+----------+-----+-----+------+---------+----+----------+------+------+
  5. 我们并不需要全部的数据,下面我们将经过处理选择我们需要的数据

    // 定义视实体类
    case class EmpDept(deptno:String,dname:String,level:Int,empno:String,ename:String,job:String,jno:String,date:String,sal:Double,prize:String)
    case class Result(empno:String,ename:String,deptno:String,dname:String,prize:String)

    //DF转换成DS
    val joinDS: Dataset[EmpDept] = joinDF.as[EmpDept]
    //从DS中拿到数据,反射的方式拿到Schema信息
    val mapDS = joinDS.map(x => Result(x.empno, x.ename, x.deptno, x.dname,x.prize))
  6. 保存数据

    // 保存到文件
    mapDS.write.format("orc").save("tunan-spark-sql/out")
    // 保存到MySQL数据库
    mapDS.write.format("jdbc")
    .option("url", "jdbc:mysql://hadoop/?characterEncoding=utf-8&useSSL=false")
    .option("dbtable", "tunan.join_result")
    .option("user", "root")
    .option("password", "root")
    .mode("overwrite")
    .save()

    查看结果:

    +-----+------+------+----------+------+
    |empno| ename|deptno| dname| prize|
    +-----+------+------+----------+------+
    | 7566| JONES| 10|ACCOUNTING| null|
    | 7521| WARD| 10|ACCOUNTING| 500.0|
    | 7934|MILLER| 20| RESEARCH| null|
    | 7902| FORD| 20| RESEARCH| null|
    ...
    +-----+------+------+----------+------+

SQL与DF与DS的比较

小问题:spark.read.load() 这句代码没用指定读取格式,那么它的默认格式是什么?

现在我们需要对比的是SQL、DF、DS三者对Syntax Errors和Analysis Errors的不同程度的响应

在上一步中,我们将joinDF转化成了joinDS,现在我们就看看他们在选择需要的列的时候,做了什么样的执行计划

val selectDF: DataFrame = joinDF.select("ename")
val selectDS: Dataset[String] = joinDS.map(_.ename)

println(selectDF.queryExecution.optimizedPlan.numberedTreeString)
println("-------------")
println(selectDS.queryExecution.optimizedPlan.numberedTreeString)

很明显selectDS做出了优化

00 Project [ename#7]
01 +- Join Inner, (deptno#0 = deptno#13)
02 :- Project [deptno#0]
03 : +- Filter isnotnull(deptno#0)
04 : +- Relation[deptno#0,dname#1,level#2] JDBCRelation(tunan.dept) [numPartitions=1]
05 +- Project [ename#7, deptno#13]
06 +- Filter isnotnull(deptno#13)
07 +- HiveTableRelation `default`.`emp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [empno#6, ename#7, job#8, jno#9, date#10, sal#11, prize#12, deptno#13]

-------------

00 SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#47]
01 +- MapElements com.tunan.spark.sql.join.JdbcJoinHive$$$Lambda$1097/374205056@10f8e2d2, class com.tunan.spark.sql.join.JdbcJoinHive$EmpDept, [StructField(deptno,StringType,true), StructField(dname,StringType,true), StructField(level,StringType,true), StructField(empno,StringType,true), StructField(ename,StringType,true), StructField(job,StringType,true), StructField(jno,StringType,true), StructField(date,StringType,true), StructField(sal,DoubleType,false), StructField(prize,StringType,true)], obj#46: java.lang.String
02 +- DeserializeToObject newInstance(class com.tunan.spark.sql.join.JdbcJoinHive$EmpDept), obj#45: com.tunan.spark.sql.join.JdbcJoinHive$EmpDept
03 +- Project [deptno#0, dname#1, level#2, empno#6, ename#7, job#8, jno#9, date#10, sal#11, prize#12]
04 +- Join Inner, (deptno#0 = deptno#13)
05 :- Filter isnotnull(deptno#0)
06 : +- Relation[deptno#0,dname#1,level#2] JDBCRelation(tunan.dept) [numPartitions=1]
07 +- Filter isnotnull(deptno#13)
08 +- HiveTableRelation `default`.`emp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [empno#6, ename#7, job#8, jno#9, date#10, sal#11, prize#12, deptno#13]

由此我们可以根据在Spark SQL应用中,选择列的时候,SQL、DF、DS三者做一个比较

SQL DF DS
Syntax Errors runtime compile compile
Analysis Errors runtime runtime compile

在执行SQL的时候,无论是语法错误还是运行错误,都无法在编译时就提前暴露出来

在执行DF的时候,算子如果写错了,会提前暴露出来,但是写的列名只有在运行的时候才会检查是否正确

在执行DS的时候,由于case class反射的机制,算子和列名都可以提前到代码编写时就检测到错误

所以最优的执行顺序为 DS > DF > SQL

面试题:RDD、DS、DF的区别

  1. RDD不支持SQL
  2. DF每一行都是Row类型,不能直接访问字段,必须解析才行
  3. DS每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获 得每一行的信息
  4. DataFrame与Dataset均支持spark sql的操作,比如select,group by之类,还 能注册临时表/视窗,进行sql语句操作
  5. 可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要 写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。

面试题:RDD和Dataset/DataFrame中的Persist的默认缓存级别

  • Dataset 中的缓存级别是 MEMORY_AND_DISK
  • RDD 中的缓存级别是 MEMORY_ONLY

面试题:Spark RDD和Spark SQL的的cache有什么区别

  • Spark RDD的cache是lazy的,需要action才会执行cache操作

  • Spark SQL的cache是egaer的,马上就cache了

Spark元数据管理: catalog

拿到catalog

val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getSimpleName).enableHiveSupport().getOrCreate()

val catalog: Catalog = spark.catalog
  1. 展示所有数据库

    val dbList: Dataset[Database] = catalog.listDatabases()
    dbList.show(false)
  2. 当前数据库

    catalog.currentDatabase
  3. 只展示名字

    import spark.implicits._
    dbList.map(_.name).show()
  4. 展示指定库的所有表

    catalog.setCurrentDatabase("offline_dw")
    catalog.listTables().show(false)
  5. 过滤表

    val listTable = catalog.listTables()
    listTable.filter('name === "dws_country_traffic").show(false)
  6. 判断某个表是否缓存

    catalog.isCached("dws_country_traffic")
    catalog.cacheTable("dws_country_traffic")
    catalog.isCached("dws_country_traffic")
    catalog.uncacheTable("dws_country_traffic")

    注意:catalog的cacheTable是lazy的

  7. 展示所有函数

    catalog.listFunctions().show(1000false)
  8. 注册函数,再次展示

    spark.udf.register("udf_string_length",(word:String) => {
    word.split(",").length
    })

    catalog.listFunctions().filter('name === "udf_string_length").show(false)
Author: Tunan
Link: http://yerias.github.io/2019/10/15/spark/15/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.