探索
前段时间,我必须从 mysql 表中读取数据,对这些数据进行一些操作,并将结果存储在磁盘上。
显而易见的选择是使用 spark,因为我已经在其他东西上使用它,它似乎超级容易实现。
这或多或少是我必须做的事情 (为了简单起见,我删除了做操作的部分):
spark.read.format("jdbc"). |
看起来不错,但是没有完全奏效。要么是超级慢,要么是根据table的大小完全打满。
调整 spark 和集群属性有一点帮助,但并没有解决问题。
由于我使用的是 aws emr,sqoop是 emr 支持的应用程序的一部分,因此我的尝试是有意义的。
sqoop import |
sqoop 几乎立刻就表现得很好,你所需要做的就是根据数据的大小设置map的数量,它的工作非常完美。
由于 spark 和 sqoop 都基于 hadoop mapreduce框架,因此 spark 显然可以至少和 sqoop 一样好工作,因此我只需要了解如何做到这一点。我决定仔细看看spark做了什么,看看能不能用 spark 模仿。
通过打开 sqoop 的详细标志,您可以获得更多详细信息。我发现,sqoop 正在将输入拆分到不同的map,这是有意义的,这毕竟是减少数据量,spark 做同样的事情。但在这样做之前,sqoop 做了一些spark没有做的聪明的事情。
它首先获取主键 (除非您为他提供另一个键来拆分数据),然后检查其最小值和最大值。然后,它允许其每个map查询数据,但键具有不同的边界,以便在map之间平均拆分行。
例如,如果键最大值为 100,并且有5个map,则第一个map的查询将如下所示:
SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20; |
第二个map的查询将如下所示:
SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40; |
这完全有道理。spark 无法正常工作,因为它不知道如何在map之间拆分数据。
因此,现在是用 spark 实现同样逻辑的时候了。这意味着我必须对我的代码执行这些操作,才能使 spark 正常工作。
- 获取表的主键。
- 查找键的最小值和最大值。
- 使用这些值执行 spark。
这是我最后得到的代码:
spark.read |
效果非常好。
言论:
numPartitions
我为 spark 设置的只是一个值,我发现根据行数给出了很好的结果。这可以更改,因为数据的大小也会受到当然的列大小和数据类型的影响。- 最后的重新分区操作是为了避免有小文件。
配置
spark jdbc 读取数据的时候,需要重点掌握两个配置:
dbtable
读取或写入的 JDBC 表。请注意,在读取路径中使用它时,可以使用在 FROM SQL 查询子句中有效的任何东西。例如,除了完整表之外,您还可以在括号中使用子查询。不允许同时指定
dbtable
和query
选项。(select c1, c2 from t1) as t
query
用于将数据读入Spark的查询。指定的查询将加括号,并在
FROM
子句中用作子查询。Spark还将为subquery子句分配一个别名。例如,spark将向 JDBC 源发出以下形式的查询。
SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
使用此选项时,有以下两个限制。
不允许同时指定
dbtable
和query
选项。不允许同时指定query
和partitionColumn
选项。当
partitionColumn
需要指定时,可以使用dbtable
代替指定子查询,并且可以使用作为一部分提供的子查询别名来限定分区列。 例子:spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()
注意:dbtable可以指定分区字段、分区数量、以及分区字段的最大最小值,query不可以指定,除此之外query会默认指定别名,在写sql时要同时指定库名和表名。
聚合下推
通过源码可以知道,目前Spark JDBC Datasource在拉取数据形成JDBCRDD时,只把查询字段列表,表名和Filter下推到了数据库一侧:
如果我的sql还有聚合查询,那么聚合查询其实是在Spark端执行的。即先经过过滤把所需字段的所有数据抽取出来形成RDD,在RDD上再执行聚合操作。那能不能把全部SQL都下推到数据库侧呢?
答案是可以的。看代码逻辑,只要将table变量构成一个子句即可,子句的逻辑就是我要查询的sql逻辑,比如select avg(scores), class from db.test_table where id > 0 and id < 100 group by class
,将其构造成一个子句就是:(select avg(scores), class from db.test_table where id > 0 and id < 100 group by class) as result
。将该子句带入dbtable 配置中就可以实现SQL逻辑的全部下推。这样spark就可以仅仅作为一个proxy存在。
option("dbtable", "(select avg(scores), class from db.test_table where id > 0 and id < 100 group by class) as result") |
注意:子句中表名前必须加带库名。