Automatic Column Type Inference & Malformed-Data Handling Modes & UDAF & UDTF & Understanding Spark SQL Execution Plan Optimization

Contents

  1. Automatic column type inference
  2. Malformed-data handling modes
  3. UDAF
  4. UDTF
  5. Understanding Spark SQL execution plan optimization

Automatic column type inference

1. Source data

    a|b|c
    1|2|3
    4|tunan|6
    7|8|9.0
2. Code

    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local[2]")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()

      val csvDF: DataFrame = spark.read
        .format("csv")
        .option("header", "true")
        .option("sep", "|")
        .option("inferSchema", "true")   // enable automatic type inference
        .load("tunan-spark-sql/data/test.csv")

      csvDF.printSchema()
    }
3. Printed schema

    root
    |-- a: integer (nullable = true)
    |-- b: string (nullable = true)
    |-- c: double (nullable = true)
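  4. If inference does not produce the types you want (here column b becomes a string because of the value tunan), the schema can also be supplied explicitly instead of inferred; a minimal sketch against the same test.csv:

    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

    // explicit schema matching the header of test.csv
    val schema = StructType(
      StructField("a", IntegerType, true) ::
      StructField("b", StringType, true) ::
      StructField("c", DoubleType, true) :: Nil
    )

    val csvDF: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .option("sep", "|")
      .schema(schema)                    // skip inference and use the given schema
      .load("tunan-spark-sql/data/test.csv")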

Malformed-data handling modes

When Spark reads data that contains malformed or dirty records, the mode option controls how they are handled: keep the record and expose the error (PERMISSIVE, the default), drop the malformed record (DROPMALFORMED), or fail immediately (FAILFAST). These modes are defined in ParseMode.scala.

1. Source data

    {"a":1,"b":2,"c":3}
    {"a":4,:5,"c":6}
    {"a":7,"b":8,"c":9}
2. Read the data

    val jsonDF: DataFrame = spark.read.json("tunan-spark-sql/data/test.json")
    jsonDF.show()

Result:

    +----------------+----+----+----+
    | _corrupt_record|   a|   b|   c|
    +----------------+----+----+----+
    |            null|   1|   2|   3|
    |{"a":4,:5,"c":6}|null|null|null|
    |            null|   7|   8|   9|
    +----------------+----+----+----+

If mode is not set via option, the default is PERMISSIVE, and the malformed record is exposed through the _corrupt_record column.

3. Set mode to DROPMALFORMED via option, so that malformed records are dropped automatically

    val jsonDF: DataFrame = spark.read.option("mode","DROPMALFORMED").json("tunan-spark-sql/data/test.json")
    jsonDF.show()

Result:

    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  2|  3|
    |  7|  8|  9|
    +---+---+---+
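  4. The third mode, FAILFAST, aborts the read as soon as a malformed record is encountered; a minimal sketch against the same test.json (the exact exception text is whatever Spark raises and is not reproduced here):

    val jsonDF: DataFrame = spark.read.option("mode", "FAILFAST").json("tunan-spark-sql/data/test.json")
    // show() fails with an exception caused by the malformed line {"a":4,:5,"c":6}
    jsonDF.show()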

UDAF

1. Define a class or object that implements the UDAF logic; it must extend UserDefinedAggregateFunction

    object AgeAvgUDAF extends UserDefinedAggregateFunction {
      // input schema
      override def inputSchema: StructType = StructType(
        StructField("input", DoubleType, true) :: Nil
      )

      // schema of the internal aggregation buffer
      override def bufferSchema: StructType = StructType(
        StructField("sums", DoubleType, true) ::
        StructField("num", LongType, true) :: Nil
      )

      // return data type
      override def dataType: DataType = DoubleType

      // whether the function always returns the same result for the same input
      override def deterministic: Boolean = true

      // initialize the aggregation buffer
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
        buffer(1) = 0L
      }

      // update the buffer with one input row (within a partition)
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
        buffer.update(1, buffer.getLong(1) + 1)
      }

      // merge buffers across partitions
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
        buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
      }

      // compute the final result: sum / count
      override def evaluate(buffer: Row): Any = {
        buffer.getDouble(0) / buffer.getLong(1)
      }
    }
2. Register and use the UDAF

    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local[2]")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()

      // build a small in-memory data source
      val list = new util.ArrayList[Row]()
      list.add(Row("zhangsan", 18, "男"))
      list.add(Row("lisi", 20, "男"))
      list.add(Row("wangwu", 26, "女"))
      list.add(Row("翠翠", 18, "女"))
      list.add(Row("闰土", 8, "男"))

      // custom schema
      val schema = StructType(
        StructField("name", StringType, true) ::
        StructField("age", IntegerType, true) ::
        StructField("sex", StringType, true) :: Nil
      )

      // create the DataFrame
      val df = spark.createDataFrame(list, schema)

      // create a temporary view
      df.createOrReplaceTempView("people")

      // register the UDAF
      spark.udf.register("age_avg_udaf", AgeAvgUDAF)

      // use the UDAF
      spark.sql("select sex,age_avg_udaf(age) as ave_age from people group by sex").show()
    }
3. Result

    +---+-------+
    |sex|ave_age|
    +---+-------+
    | 男|  15.33|
    | 女|   22.0|
    +---+-------+
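  4. The same UDAF can also be applied through the DataFrame API without registering a SQL name, because a UserDefinedAggregateFunction can be called directly on columns; a minimal sketch against the df built above:

    import org.apache.spark.sql.functions.col

    df.groupBy("sex")
      .agg(AgeAvgUDAF(col("age")).as("ave_age"))
      .show()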

UDTF

Proper UDTFs are still being explored; for now, here is a simple example that achieves the same effect with flatMap.

import java.util

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ListBuffer

object ExplodeUDTF {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // custom schema
    val schema = StructType(
      StructField("teacher", StringType, true) ::
      StructField("sources", StringType, true) :: Nil
    )

    // in-memory data source
    val list = new util.ArrayList[Row]()
    list.add(Row("tunan", "hive,spark,flink"))
    list.add(Row("xiaoqi", "cdh,kafka,hbase"))

    // create the DataFrame
    val df = spark.createDataFrame(list, schema)

    import spark.implicits._
    // split each input row into one row per source with flatMap
    df.flatMap(x => {
      val line = new ListBuffer[(String, String)]()
      val sources = x.getString(1).split(",")
      for (source <- sources) {
        line.append((x.getString(0), source))
      }
      // return the expanded rows
      line
    }).toDF("teacher", "source").show()
  }
}

Result

+-------+------+
|teacher|source|
+-------+------+
|  tunan|  hive|
|  tunan| spark|
|  tunan| flink|
| xiaoqi|   cdh|
| xiaoqi| kafka|
| xiaoqi| hbase|
+-------+------+
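
The same expansion can be done with the built-in explode and split functions (LATERAL VIEW explode in SQL) instead of a custom flatMap; a minimal sketch against the df above:

import org.apache.spark.sql.functions.{col, explode, split}

// one output row per comma-separated source
df.select(col("teacher"), explode(split(col("sources"), ",")).as("source")).show()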

Understanding Spark SQL execution plan optimization

1. Create an empty table

    create table sqltest (key string,value string)
2. Run EXPLAIN on a query

    explain extended select a.key*(3*5),b.value from sqltest a join sqltest b on a.key=b.key and a.key >3;
3. Read the execution plan

    // Parsed logical plan: the SQL is only parsed syntactically, relations are still unresolved
    == Parsed Logical Plan ==
    'Project [unresolvedalias(('a.key * (3 * 5)), None), 'b.value]
    +- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
       :- 'SubqueryAlias `a`
       :  +- 'UnresolvedRelation `sqltest`
       +- 'SubqueryAlias `b`
          +- 'UnresolvedRelation `sqltest`

    // Analyzed logical plan: data types are resolved, the database and table are bound, and the SerDe is known
    == Analyzed Logical Plan ==
    (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE)): double, value: string
    Project [(cast(key#2 as double) * cast((3 * 5) as double)) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
    +- Join Inner, ((key#2 = key#4) && (cast(key#2 as int) > 3))
       :- SubqueryAlias `a`
       :  +- SubqueryAlias `default`.`sqltest`
       :     +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
       +- SubqueryAlias `b`
          +- SubqueryAlias `default`.`sqltest`
             +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]

    // Optimized logical plan: the constant expression (3 * 5) is folded to 15.0 and the filters are pushed down below the join
    == Optimized Logical Plan ==
    Project [(cast(key#2 as double) * 15.0) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
    +- Join Inner, (key#2 = key#4)
       :- Project [key#2]
       :  +- Filter (isnotnull(key#2) && (cast(key#2 as int) > 3))
       :     +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
       +- Filter ((cast(key#4 as int) > 3) && isnotnull(key#4))
          +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]

    // Physical plan: the join runs as a SortMergeJoin, data is redistributed with hashpartitioning, and the table is read via a Hive table scan
    == Physical Plan ==
    *(5) Project [(cast(key#2 as double) * 15.0) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
    +- *(5) SortMergeJoin [key#2], [key#4], Inner
       :- *(2) Sort [key#2 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(key#2, 200)
       :     +- *(1) Filter (isnotnull(key#2) && (cast(key#2 as int) > 3))
       :        +- Scan hive default.sqltest [key#2], HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
       +- *(4) Sort [key#4 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(key#4, 200)
             +- *(3) Filter ((cast(key#4 as int) > 3) && isnotnull(key#4))
                +- Scan hive default.sqltest [key#4, value#5], HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]

    The process can be read as four steps: the parsed logical plan, the analyzed logical plan, the optimized logical plan, and the physical plan.
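  4. The same four plans can also be printed from code through the Dataset API; a minimal sketch, assuming spark is a SparkSession created with Hive support so that sqltest resolves:

    val query = spark.sql("select a.key*(3*5),b.value from sqltest a join sqltest b on a.key=b.key and a.key >3")

    // prints the parsed, analyzed, and optimized logical plans plus the physical plan
    query.explain(true)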

Author: Tunan
Link: http://yerias.github.io/2019/10/17/spark/17/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stated otherwise.