SparkSQL&DataFrame的read和write&SparkSQL做统计分析&UDF函数&存储格式的转换

目录

  1. SparkSQL
  2. DataFrame的read和write
  3. SparkSQL做统计分析
  4. UDF函数
  5. 存储格式的转换

SparkSQL

认识SparkSQL

  1. SparkSQL的进化之路

    1.0以前:
    Shark
    1.1.x开始:
    SparkSQL(只是测试性的) SQL
    1.3.x:
    SparkSQL(正式版本)+Dataframe
    1.5.x:
    SparkSQL 钨丝计划
    1.6.x:
    SparkSQL+DataFrame+DataSet(测试版本)
    2.x.x:
    SparkSQL+DataFrame+DataSet(正式版本)
    SparkSQL:还有其他的优化
    StructuredStreaming(DataSet)
  2. 什么是SparkSQL?

    spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

  3. SparkSQL的作用

    提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

    DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

  4. 运行原理

    将 Spark SQL 转化为 RDD, 然后提交到集群执行

  5. 特点

    1. 容易整合
    2. 统一的数据访问方式
    3. 兼容 Hive
    4. 标准的数据连接
  6. spark-sql

    spark-sql是一个Spark专属的SQL命令行交互工具,在使用spark-sql之前要把hive-site.xml 拷贝到Spark/Conf下,spark-sql和spark-shell用法一样,但是在引入外部依赖的时候,spark-sql需要用–jars和–driver-class-path同时引入依赖才不会报错

  7. 持久化

    在spark-sql中的持久化Table命令是: cache table xxx,清除持久化 uncache table xxx

    spark-SQL中的cache和uncache都是eager的,立即执行的

    考点:RDD和SparkSQL的cache有什么区别?

    • RDD中的cache是lazy的 spark-SQL中的cache是eager的
  8. 遗留问题

    –files/–jars 传进去的东西清不掉

SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

  1. 为用户提供一个统一的切入点使用Spark 各项功能

  2. 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

  3. 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

  4. 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

RDD和DataFrame的存储内容比较

DataFrame的read和write

json

  1. 数据的读取[DataFrameReader]

    object rdd2df {
    def main(args: Array[String]): Unit = {
    val in = "tunan-spark-sql/data/people.json"
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
    // 读取json数据
    val df: DataFrame = spark.read.format("json").load(in)
    // 使用$"" 导入隐式转换
    import spark.implicits._
    // 可以使用UDF
    df.select($"name",$"age").show(2,false)
    // 不可以使用UDF 适合大部分场景
    df.select("name","age").show()
    // 不推介,写着复杂
    df.select(df("name"),df("age")).show(2)
    }
    }

    select方法用于选择要输出的列,推介使用 $”col” 和 “col” 的方法

    1. 使用select可以选取打印的列,空值为null
    2. show()默认打印20条数据,可以指定条数
    3. truncate默认为true,截取长度,可以设置为false

    select方法有三种不同的写法,fliter也有

    df.select(df("name"),df("age")).filter('name === "Andy").show()	//推介使用
    df.select(df("name"),df("age")).filter(df("name") === "Andy").show()
    df.select(df("name"),df("age")).filter("name = 'Andy'").show()

    printSchema()方法可以查看数据的Schema信息

    df.printSchema()
    ------------------------------------------------
    root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)
  2. 数据的存储[DataFrameWriter]

    val selectDf: DataFrame = df.select($"name", $"age")
    // 写出json数据
    selectDf.write.format("json").mode("overwrite").save(out)

    这里需要知道的一个概念是Save Modes

    Save操作可以选择使用SaveMode,它指定目标如果存在,如何处理现有数据。重要的是要认识到,这些保存模式不利用任何锁定,也不是原子性的。此外,在执行覆盖时,在写入新数据之前将删除数据。

    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) 在将DataFrame保存到数据源时,如果数据已经存在,则会抛出error。
    SaveMode.Append "append" 在将DataFrame保存到数据源时,如果数据/表已经存在,则DataFrame的内容将被append到现有数据中。
    SaveMode.Overwrite "overwrite" overwrite模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则现有数据将被DataFrame的内容覆盖。
    SaveMode.Ignore "ignore" ignore模式意味着在将DataFrame保存到数据源时,如果数据已经存在,则save操作不保存DataFrame的内容,也不更改现有数据。这类似于SQL中的CREATE TABLE IF NOT EXISTS

text

  1. 数据的读取

    object text2df {
    def main(args: Array[String]): Unit = {
    val in = "tunan-spark-sql/data/people.txt"
    val out = "tunan-spark-sql/out"
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    import spark.implicits._

    //DataFrame不能直接split,且调用map返回的是一个Dataset
    val df: DataFrame = spark.read.format("text").load(in)
    val mapDF: Dataset[(String, String)] = df.map(row => {
    val words = row.toString().split(",")
    (words(0), words(1))
    })
    mapDF.show()

    //DataFrame转换为RDD后,再toDF,返回的是一个DataFrame
    val mapRDD2DF: DataFrame = df.rdd.map(row => {
    val words = row.toString().split(",")
    (words(0), words(1))
    }).toDF()
    mapRDD2DF.show()

    //使用textFile方法读取文本文件直接返回的是一个Dataset
    val ds: Dataset[String] = spark.read.textFile(in)
    val mapDs: Dataset[(String, String)] = ds.map(row => {
    val words = row.split(",")
    (words(0), words(1))
    })
    mapDs.show()
    }
    }

    文本数据读进来的一行在一个字段里面,所以要使用map算子,在map中split

    1. 直接read.format()读进来的是DataFrame,map中不能直接split
    2. DataFrame通过.rdd的方式转换成RDD,map中也不能直接split
    3. 通过read.textFile()的方式读进来的是Dataset,map中可以split
  2. 数据的存储

    val df: DataFrame = spark.read.format("text").load(in)
    val mapDF = df.map(row => {
    val words = row.toString().split(",")
    // 拼接成一列
    words(0) +","+words(1)
    })

    mapDF.write.format("text").mode("overwrite").save(out)

    文本数据写出去的时候

    1. 不支持int类型,如果存在int类型,会报错,解决办法是toString,转换成字符串
    2. 只能作为一列输出,如果是多列,会报错,解决办法是拼接起来,组成一列

    文本数据压缩输出,只要是Spark支持的压缩的格式,都可以指定

    mapDF.write
    .format("text")
    // 添加压缩操作
    .option("compression","gzip")
    .mode("overwrite")
    .save(out)

csv

  1. 数据的读取

    object csv2df {
    def main(args: Array[String]): Unit = {
    val in = "tunan-spark-sql/data/people.csv"
    val out = "tunan-spark-sql/out"

    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    val df: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
    .option("sep", ";")
    .option("interSchema","true")
    .load(in)
    df.show()
    }
    }

    csv读取数据注意使用几个参数

    1. 指定表头:option("header", "true")
    2. 指定分隔符:option("sep", ";")
    3. 类型自动推测:option("interSchema","true")

jdbc

在操作jdbc之前要导入两个依赖,一个是mysql-jdbc,用来连接mysql,一个是config,用来解决硬编码的问题

依赖:

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

application.conf文件

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop/listener?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="root"
db.default.source="dws_ad_phone_type_dist"
db.default.target="dws_ad_phone_type_dist_1"
db.default.db="access_dw"

# Connection Pool settings
db.default.poolInitialSize=10
db.default.poolMaxSize=20
db.default.connectionTimeoutMillis=1000
  1. 数据的读取

    object mysql2df {

    def main(args: Array[String]): Unit = {
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    //获取配置文件中的值,db.default开头
    val conf = ConfigFactory.load()
    val driver = conf.getString("db.default.driver")
    val url = conf.getString("db.default.url")
    val user = conf.getString("db.default.user")
    val password = conf.getString("db.default.password")
    val source = conf.getString("db.default.source")
    val target = conf.getString("db.default.target")
    val db = conf.getString("db.default.db")

    //读取数据库的内容
    val df: DataFrame = spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", s"$db.$source") //库名.源表
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    .load()
    //使用DataFrame创建临时表提供spark.sql查询
    df.createOrReplaceTempView("phone_type_dist")

    //spark.sql写SQL返回一个DataFrame
    val sqlDF: DataFrame = spark.sql("select * from phone_type_dist where phoneSystemType = 'IOS'")
    }
    }
    • 使用df.createOrReplaceTempView()方法创建一个DataFrame数据生成的临时表,提供spark.sql()使用SQL操作数据,返回的也是一个DataFrame
  2. 数据的存储

    //接着上面返回的sqlDF: DataFrame
    sqlDF.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", s"$db.$target") //库名.目标表
    .option("user", user)
    .option("password", password)
    .option("driver",driver)
    .save()

SparkSQL做统计分析

  1. 数据

  2. 需求:求每个国家的每个域名的访问流量排名前2

  3. SQL实现

    object GroupTopN {

    def main(args: Array[String]): Unit = {
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    //读取数据
    val ds = spark.read.textFile(in)

    import spark.implicits._
    //为生成需要的表格做准备
    val df: DataFrame = ds.map(row => {
    val words = row.split(",")
    (words(3), words(12), words(15).toLong)
    }).toDF("country", "domain", "traffic")

    df.createOrReplaceTempView("access")

    // 每个国家的域名流量前2
    val topNSQL="""select
    | *
    |from (
    | select
    | t.*,row_number() over(partition by country order by sum_traffic desc) r
    | from
    | (
    | select country,domain,sum(traffic) as sum_traffic from access group by country,domain
    | ) t
    | ) rt
    |where rt.r <=2 """.stripMargin

    spark.sql(topNSQL).show()
    }
    }
  4. 如果只要求traffic的降序,可以使用API直接写出来

    分组,求和,别名,降序

    //traffic降序排序
    import org.apache.spark.sql.functions._
    df.groupBy("country","domain").agg(sum("traffic").as("sum_traffic")).sort($"sum_traffic".desc).show()

    注意看源码中案例仿写

  5. 结果展示

    +----------------+-----------------+-----------+---+
    | country| domain|sum_traffic| r|
    +----------------+-----------------+-----------+---+
    | 中国| www.bilibili.com| 24265886| 1|
    | 中国|www.ruozedata.com| 4187637| 2|
    | 利比亚| www.bilibili.com| 22816| 1|
    | 利比亚| ruoze.ke.qq.com| 15970| 2|
    | 加纳| www.bilibili.com| 138659| 1|
    | 加纳|www.ruozedata.com| 17988| 2|
    | 利比里亚| www.bilibili.com| 20593| 1|
    | 利比里亚| ruoze.ke.qq.com| 7466| 2|
    +----------------+-----------------+-----------+---+

UDF函数

  1. 数据

    大狗	小破车,渣团,热刺,我纯
    桶子 利物浦
    二娃 南大王,西班牙人
  2. 需求:求出每个人的爱好的个数

  3. SQLs实现

    object LoveLength {

    def main(args: Array[String]): Unit = {

    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    //读取文本内容
    val ds: Dataset[String] = spark.read.textFile(in)

    //文本转换成DF
    import spark.implicits._
    val df: DataFrame = ds.map(row => {
    val words = row.split("\t")
    (words(0), words(1))
    }).toDF("name", "love")

    //创建UDF
    spark.udf.register("length", (love: String) => {
    love.split(",").length
    })

    //DF创建临时表
    df.createOrReplaceTempView("udf_love")

    //在sql中使用UDF函数
    spark.sql("select name,love,length(love) as love_length from udf_love").show()
    }
    }
  4. 上面是使用SQL的解决方案,还可以使用API的方法

    //自定义的udf需要返回值
    val loveLengthUDF: UserDefinedFunction = spark.udf.register("length", (love: String) => {
    love.split(",").length
    })
    //df.select中传入UDF函数
    df.select($"name",$"love",loveLengthUDF($"love")).show()
  5. 结果展示

    +----+---------------------+----------------+
    |大狗|小破车,渣团,热刺,我纯 | 4|
    |桶子| 利物浦 | 1|
    |二娃| 南大王,西班牙人 | 2|
    +----+---------------------+----------------+

存储格式的转换

Spark读text文件进行清洗,清洗完以后直接以我们想要的列式存储格式输出,如果按以前的方式要经过很多复杂的步骤

用Spark的时候只需要在df.write.format("orc").mode().save()中指定格式即可,如orc,现在就很方便了,想转成什么格式,只要format支持就ok

object text2orc {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()

val in = "tunan-spark-sql/data/people.txt"
val out = "tunan-spark-sql/out"

val df = spark.read.textFile(in)

import spark.implicits._

//对文本文件做处理
df.map(row => {
val words = row.split(",")
(words(0),words(1))
})
.toDF("name","age") //这一步解决了数据没有表头的问题
.write
.mode("overwrite") //save mode
.format("orc") //save format
.save(out) //save path
}
}

20200416更新: df.write.format(“…”).option(“compression”,”…”) ==> 存储格式+压缩格式

Author: Tunan
Link: http://yerias.github.io/2019/10/11/spark/11/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.