Spark术语&Spark提交&YARN上的提交模式&窄依赖&宽依赖

目录

  1. Spark术语
  2. Spark提交
  3. YARN上提交模式
  4. 宽依赖
  5. 窄依赖

术语

下表总结了关于集群概念的术语:

Term Meaning
Application Spark上的应用程序。由一个driver program和集群上的executors组成。
Application jar 一个包含用户的Spark应用程序的jar包
Driver program 运行应用程序main()函数并创建SparkContext的进程
Cluster manager 用于获取集群资源的外部服务(例如,standalone manager、Mesos、YARN)
Deploy mode 区别driver process在何处运行。在“cluster”模式下,框架启动集群内部的驱动程序。在“client”模式下,提交者启动集群外部的驱动程序。
Worker node 可以在集群中运行application的任何节点
Executor 在Worker node上被application启动的进程,它运行任务并将数据保存在内存或磁盘存储器中。每个application都有自己的Executor。
Task 将被发送给一个执行者的工作单元
Job 由多个任务组成的并行计算,这些任务在响应一个Spark操作时产生(如保存、收集)
Stage 每个作业被分成更小的任务集,称为阶段,这些阶段相互依赖(类似于MapReduce中的map和reduce阶段)

Spark提交

注意:在使用Spark提交之前,一定要在环境变量中配置HADOOP_CONF_DIR,否则hadoop的环境引不进来

export HADOOP_CONF_DIR=XXX

Spark支持的部署模式:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

一些常用的选项是:

  • --class:您的应用程序的入口点(例如org.apache.spark.examples.SparkPi
  • --master:集群的主URL(例如spark://23.195.26.187:7077
  • --deploy-mode:将驱动程序部署在工作节点(cluster)上还是作为外部客户端(client)本地部署(默认值:client
  • --conf:键值格式的任意Spark配置属性。对于包含空格的值,将“ key = value”用引号引起来(如图所示)。
  • application-jar:jar包的路径,包括您的应用程序和所有依赖项。URL必须在群集内部全局可见,例如,hdfs://路径或file://所有节点上都存在的路径。
  • application-arguments:参数传递给您的主类的main方法(如果有)

其他常用的选项:

  • --num-executors:executors的数量

  • --executor-memory:每个executor的内存数量

  • --total-executor-cores 100:executor的总的core数

  • --jars:指定需要依赖的jar包,多个jar包逗号分隔,application中直接引用

  • --files:需要依赖的文件,在application中使用SparkFiles.get(“file”)取出,同时需要放在resources目录下

注意:local模式默认读写HDFS数据 读本地要加file://

提交模式

cliet模式

  1. Driver运行在Client
  2. AM职责就是去YARN上申请资源
  3. Driver会和请求到的container/executor进行通信
  4. Driver是不能退出的

Client模式

Client模式控制台能看到日志

cluster模式

  1. Driver运行位置在AM

  2. Client提交上去了 它退出对整个作业没影响

  3. AM(申请资源)+Driver(调度DAG,分发任务)

    Cluster模式

控制台不能看到日志,不支持Spark-shell(Spark-SQL) ,交互性操作的都不能

窄依赖

  1. 一个父RDD的分区至多被一个子RDD的某个分区使用一次
  2. 一个父RDD的分区和一个子RDD的分区是唯一映射 典型的map
  3. 多个父RDD的分区和一个子RDD的分区是唯一映射 典型的union

窄依赖

在窄依赖中有个特殊的join是不经过shuffle 的

这个特殊的join的存在有三个条件:

  1. RDD1的分区数 = RDD2的分区数
  2. RDD1的分区数 = Join的分区数
  3. RDD2的分区数 = Join的分区数

我们看一个案例:

/**
* rdd1、rdd2、join 三者的分区数相同,不经过shuffle
*/
val rdd1 = sc.parallelize(List(("香蕉",20), ("苹果",50), ("菠萝",30), ("猕猴桃", 50)),2)
val rdd2 = sc.parallelize(List(("草莓",90), ("苹果",25), ("菠萝",25), ("猕猴桃", 30), ("西瓜", 45)),2)

val rdd3 = rdd1.reduceByKey(_ + _)
val rdd4 = rdd2.reduceByKey(_ + _)

val joinRDD = rdd3.join(rdd4,2)
joinRDD.collect()

再看Application的DAG图,从两个 reduceByKey 到 join 是一个 stage 中的,说明没有产生 shuffle

特殊的Jion

宽依赖

  1. 一个父RDD的分区会被子RDD的分区使用多次

宽依赖

除了前面那种是三个条件满足的,其他的 join 都是宽依赖

我们使RDD1的分区数和RDD2的分区数相等,但是 join的分区数不相等

/**
* rdd1、rdd2的分区数不同,但是和join的分区数不同,会经过shuffle
*/
val rdd1 = sc.parallelize(List(("香蕉",20), ("苹果",50), ("菠萝",30), ("猕猴桃", 50)),2)
val rdd2 = sc.parallelize(List(("草莓",90), ("苹果",25), ("菠萝",25), ("猕猴桃", 30), ("西瓜", 45)),2)

val rdd3 = rdd1.reduceByKey(_ + _)
val rdd4 = rdd2.reduceByKey(_ + _)

val joinRDD = rdd3.join(rdd4,4)
joinRDD.collect()

我们看DAG图,产生了stage,也就是经过了shuffle

特殊的Jion宽依赖

Author: Tunan
Link: http://yerias.github.io/2019/10/06/spark/6/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.