Spark JDBC谓词下推

探索

前段时间,我必须从 mysql 表中读取数据,对这些数据进行一些操作,并将结果存储在磁盘上。

显而易见的选择是使用 spark,因为我已经在其他东西上使用它,它似乎超级容易实现。

这或多或少是我必须做的事情 (为了简单起见,我删除了做操作的部分):

spark.read.format("jdbc").
option("url""jdbc:mysql://dbhost/sbschhema").
option("dbtable""mytable").
option("user""myuser").
option("password""mypassword").
load().write.parquet("/data/out")

看起来不错,但是没有完全奏效。要么是超级慢,要么是根据table的大小完全打满。

调整 spark 和集群属性有一点帮助,但并没有解决问题。

由于我使用的是 aws emr,sqoop是 emr 支持的应用程序的一部分,因此我的尝试是有意义的。

sqoop import 
--verbose
--connect jdbc:mysql://dbhost/sbschhema
--username myuser
--table opportunity
--password mypassword
--m 20
--as-parquetfile
--target-dir /data/out

sqoop 几乎立刻就表现得很好,你所需要做的就是根据数据的大小设置map的数量,它的工作非常完美。

由于 spark 和 sqoop 都基于 hadoop mapreduce框架,因此 spark 显然可以至少和 sqoop 一样好工作,因此我只需要了解如何做到这一点。我决定仔细看看spark做了什么,看看能不能用 spark 模仿。

通过打开 sqoop 的详细标志,您可以获得更多详细信息。我发现,sqoop 正在将输入拆分到不同的map,这是有意义的,这毕竟是减少数据量,spark 做同样的事情。但在这样做之前,sqoop 做了一些spark没有做的聪明的事情。

它首先获取主键 (除非您为他提供另一个键来拆分数据),然后检查其最小值和最大值。然后,它允许其每个map查询数据,但键具有不同的边界,以便在map之间平均拆分行。

例如,如果键最大值为 100,并且有5个map,则第一个map的查询将如下所示:

SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20;

第二个map的查询将如下所示:

SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40;

这完全有道理。spark 无法正常工作,因为它不知道如何在map之间拆分数据。

因此,现在是用 spark 实现同样逻辑的时候了。这意味着我必须对我的代码执行这些操作,才能使 spark 正常工作。

  1. 获取表的主键。
  2. 查找键的最小值和最大值。
  3. 使用这些值执行 spark。

这是我最后得到的代码:

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://aliyun:3306")
.option("dbtable", "(select avg(scores), class from db.test_table where id > 0 and id < 100 group by class) as result")
.option("user", "power")
.option("password", "mee")
.option("fetchsize", "30")
.load()
.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)}

效果非常好。

言论:

  1. numPartitions 我为 spark 设置的只是一个值,我发现根据行数给出了很好的结果。这可以更改,因为数据的大小也会受到当然的列大小和数据类型的影响。
  2. 最后的重新分区操作是为了避免有小文件。

配置

spark jdbc 读取数据的时候,需要重点掌握两个配置:

  • dbtable

    读取或写入的 JDBC 表。请注意,在读取路径中使用它时,可以使用在 FROM SQL 查询子句中有效的任何东西。例如,除了完整表之外,您还可以在括号中使用子查询。不允许同时指定dbtablequery选项。

    (select c1, c2 from t1) as t
  • query

    用于将数据读入Spark的查询。指定的查询将加括号,并在FROM子句中用作子查询。Spark还将为subquery子句分配一个别名。例如,spark将向 JDBC 源发出以下形式的查询。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

使用此选项时,有以下两个限制。

  1. 不允许同时指定dbtablequery选项。不允许同时指定querypartitionColumn选项。

  2. partitionColumn需要指定时,可以使用dbtable代替指定子查询,并且可以使用作为一部分提供的子查询别名来限定分区列。 例子:

    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()

注意:dbtable可以指定分区字段、分区数量、以及分区字段的最大最小值,query不可以指定,除此之外query会默认指定别名,在写sql时要同时指定库名和表名。

聚合下推

通过源码可以知道,目前Spark JDBC Datasource在拉取数据形成JDBCRDD时,只把查询字段列表,表名和Filter下推到了数据库一侧:

如果我的sql还有聚合查询,那么聚合查询其实是在Spark端执行的。即先经过过滤把所需字段的所有数据抽取出来形成RDD,在RDD上再执行聚合操作。那能不能把全部SQL都下推到数据库侧呢?

答案是可以的。看代码逻辑,只要将table变量构成一个子句即可,子句的逻辑就是我要查询的sql逻辑,比如select avg(scores), class from db.test_table where id > 0 and id < 100 group by class,将其构造成一个子句就是:(select avg(scores), class from db.test_table where id > 0 and id < 100 group by class) as result。将该子句带入dbtable 配置中就可以实现SQL逻辑的全部下推。这样spark就可以仅仅作为一个proxy存在。

option("dbtable", "(select avg(scores), class from db.test_table where id > 0 and id < 100 group by class) as result")

注意:子句中表名前必须加带库名。

Author: Tunan
Link: http://yerias.github.io/2021/06/21/spark/43/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.