Contents
- Automatic Schema Inference
- Parse Modes for Malformed Data
- UDAF
- UDTF
- Reading Spark SQL Execution Plans
Automatic Schema Inference
Source data
a|b|c
1|2|3
4|tunan|6
7|8|9.0
Code
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
val csvDF: DataFrame = spark.read
.format("csv")
.option("header","true")
.option("sep","|")
.option("interSchema","true")
.load("tunan-spark-sql/data/test.csv")
csvDF.printSchema()
}
Printed schema:
root
|-- a: integer (nullable = true)
|-- b: string (nullable = true)
|-- c: double (nullable = true)
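Inference is convenient, but inferSchema costs an extra pass over the file to sample the data. As a comparison, here is a minimal sketch of declaring the schema up front instead (field names taken from the CSV header above, reusing the spark session from the snippet):
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

val schema = StructType(
  StructField("a", IntegerType, true)::
  StructField("b", StringType, true)::
  StructField("c", DoubleType, true)::Nil
)
val typedDF: DataFrame = spark.read
  .format("csv")
  .option("header","true")
  .option("sep","|")
  .schema(schema)  // no inference pass: use the declared schema directly
  .load("tunan-spark-sql/data/test.csv")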
Parse Modes for Malformed Data
When reading data in Spark, we can set the mode option to control what happens to malformed or dirty records: keep them and record the error (the default, PERMISSIVE), drop them (DROPMALFORMED), or fail immediately (FAILFAST). The available modes are defined in ParseMode.scala.
Source data
{"a":1,"b":2,"c":3}
{"a":4,:5,"c":6}
{"a":7,"b":8,"c":9}读数据
val jsonDF: DataFrame = spark.read.json("tunan-spark-sql/data/test.json")
jsonDF.show()
Result:
+----------------+----+----+----+
| _corrupt_record| a| b| c|
+----------------+----+----+----+
| null| 1| 2| 3|
|{"a":4,:5,"c":6}|null|null|null|
| null| 7| 8| 9|
+----------------+----+----+----+
If no mode is set in the options, the default is PERMISSIVE: malformed records are surfaced through the _corrupt_record column. Setting mode to DROPMALFORMED instead silently discards any malformed record:
val jsonDF: DataFrame = spark.read.option("mode","DROPMALFORMED").json("tunan-spark-sql/data/test.json")
jsonDF.show()
Result:
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 7| 8| 9|
+---+---+---+
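The third mode, FAILFAST, aborts the read instead of tolerating bad input. A minimal sketch against the same test.json:
val failFastDF: DataFrame = spark.read.option("mode","FAILFAST").json("tunan-spark-sql/data/test.json")
// any action that materializes the data, such as show(), now fails
// with an exception as soon as the malformed record is parsed
failFastDF.show()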
UDAF
To implement a UDAF, define a class or object holding the aggregation logic that extends UserDefinedAggregateFunction:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object AgeAvgUDAF extends UserDefinedAggregateFunction{
// input type
override def inputSchema: StructType = StructType(
StructField("input",DoubleType,true)::Nil
)
// type of the buffer used internally during aggregation
override def bufferSchema: StructType = StructType(
StructField("sums",DoubleType,true)::
StructField("num",LongType,true)::Nil
)
// output (return) type
override def dataType: DataType = DoubleType
// whether the function always returns the same output for the same input
override def deterministic: Boolean = true
// initialize the aggregation buffer
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.0
buffer(1) = 0L
}
// update the buffer with one input row, within a partition
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer.update(0,buffer.getDouble(0)+input.getDouble(0))
buffer.update(1,buffer.getLong(1)+1)
}
// merge buffers across partitions
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0,buffer1.getDouble(0)+buffer2.getDouble(0))
buffer1.update(1,buffer1.getLong(1)+buffer2.getLong(1))
}
// compute the final result
override def evaluate(buffer: Row): Any = {
buffer.getDouble(0)/buffer.getLong(1)
}
}
Register and use the UDAF:
import java.util
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// custom data source
val list = new util.ArrayList[Row]()
list.add(Row("zhangsan",18,"男"))
list.add(Row("lisi",20,"男"))
list.add(Row("wangwu",26,"女"))
list.add(Row("翠翠",18,"女"))
list.add(Row("闰土",8,"男"))
// custom schema
val schema = StructType(
StructField("name", StringType, true)::
StructField("age", IntegerType, true)::
StructField("sex", StringType, true)::Nil
)
// create the DataFrame
val df = spark.createDataFrame(list, schema)
// create a temp view
df.createOrReplaceTempView("people")
// register the UDAF
spark.udf.register("age_avg_udaf",AgeAvgUDAF)
// use the UDAF
spark.sql("select sex,age_avg_udaf(age) as ave_age from people group by sex").show()
}
Result:
+---+---------+
|sex| ave_age|
+---+---------+
| 男| 15.33|
| 女| 22.0|
+---+---------+
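Note that UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of the typed Aggregator API. A minimal sketch of the same average as an Aggregator, assuming Spark 3.x (AvgBuffer is a helper case class introduced here):
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// running sum and count of ages
case class AvgBuffer(var sum: Double, var count: Long)

object AgeAvgAggregator extends Aggregator[Double, AvgBuffer, Double] {
  override def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  override def reduce(b: AvgBuffer, age: Double): AvgBuffer = { b.sum += age; b.count += 1; b }
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  override def finish(b: AvgBuffer): Double = b.sum / b.count
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// register for SQL exactly as before
spark.udf.register("age_avg_udaf", functions.udaf(AgeAvgAggregator))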
UDTF
UDTF is still under investigation; here is a simple example for now.
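The original ExplodeUDTF code did not survive intact in this copy, so what follows is a minimal sketch of the same idea: Spark has no direct Scala UDTF API, and the common substitute for one-row-to-many-rows expansion is the built-in split and explode functions. The sample data here is hypothetical:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

object ExplodeUDTF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    import spark.implicits._

    // hypothetical input: hobbies packed into one delimiter-separated string
    val df = Seq(("zhangsan", "reading,hiking"), ("lisi", "music")).toDF("name", "hobbies")

    // split the string into an array, then explode it into one row per element
    df.select($"name", explode(split($"hobbies", ",")).as("hobby")).show()
  }
}
Result:
+--------+-------+
|    name|  hobby|
+--------+-------+
|zhangsan|reading|
|zhangsan| hiking|
|    lisi|  music|
+--------+-------+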
Reading Spark SQL Execution Plans
Create an empty table
create table sqltest (key string,value string)
Run the SQL
explain extended select a.key*(3*5),b.value from sqltest a join sqltest b on a.key=b.key and a.key >3;
Reading the execution plan
// Parsed logical plan: just a straightforward parse of the SQL text into an unresolved plan
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (3 * 5)), None), 'b.value]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
:- 'SubqueryAlias `a`
: +- 'UnresolvedRelation `sqltest`
+- 'SubqueryAlias `b`
+- 'UnresolvedRelation `sqltest`
// Analyzed logical plan: data types are resolved, the database and table are bound, and the SerDe is determined
== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE)): double, value: string
Project [(cast(key#2 as double) * cast((3 * 5) as double)) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
+- Join Inner, ((key#2 = key#4) && (cast(key#2 as int) > 3))
:- SubqueryAlias `a`
: +- SubqueryAlias `default`.`sqltest`
: +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
+- SubqueryAlias `b`
+- SubqueryAlias `default`.`sqltest`
+- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]
// Optimized logical plan: constant arithmetic is folded (3 * 5 becomes 15.0) and the filters are pushed down below the join
== Optimized Logical Plan ==
Project [(cast(key#2 as double) * 15.0) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
+- Join Inner, (key#2 = key#4)
:- Project [key#2]
: +- Filter (isnotnull(key#2) && (cast(key#2 as int) > 3))
: +- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
+- Filter ((cast(key#4 as int) > 3) && isnotnull(key#4))
+- HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]
// Physical plan: the join is a SortMergeJoin, data is exchanged with hashpartitioning, and the tables are scanned as HiveTableRelation
== Physical Plan ==
*(5) Project [(cast(key#2 as double) * 15.0) AS (CAST(key AS DOUBLE) * CAST((3 * 5) AS DOUBLE))#6, value#5]
+- *(5) SortMergeJoin [key#2], [key#4], Inner
:- *(2) Sort [key#2 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#2, 200)
: +- *(1) Filter (isnotnull(key#2) && (cast(key#2 as int) > 3))
: +- Scan hive default.sqltest [key#2], HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#2, value#3]
+- *(4) Sort [key#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#4, 200)
+- *(3) Filter ((cast(key#4 as int) > 3) && isnotnull(key#4))
+- Scan hive default.sqltest [key#4, value#5], HiveTableRelation `default`.`sqltest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#4, value#5]
In short, this can be read as four stages: the parsed logical plan, the analyzed logical plan, the optimized logical plan, and the physical plan.
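The same four plans can also be printed from code rather than from the SQL shell; a minimal sketch, assuming a Hive-enabled SparkSession and the sqltest table above:
val planDF = spark.sql("select a.key*(3*5), b.value from sqltest a join sqltest b on a.key = b.key and a.key > 3")
// extended = true prints the parsed, analyzed and optimized logical plans
// plus the physical plan, just like EXPLAIN EXTENDED
planDF.explain(true)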