This article analyzes the logic of the spark-shell startup script in the Spark source code, stepping from spark-shell deeper and deeper into the overall job-submission flow.
Reading the spark-shell script
Initialize cygwin to false
cygwin=false
Check whether the system is running under Cygwin
case "$(uname)" in
CYGWIN*) cygwin=true;;
esac
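For reference, uname prints the kernel name, which is what the case pattern above matches against; the values below are illustrative:
uname    # prints "Linux" on Linux, "Darwin" on macOS, and something like "CYGWIN_NT-10.0" under Cygwin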
Set the shell to POSIX mode
set -o posix
Check whether the SPARK_HOME environment variable is set; if not, source find-spark-home to determine it
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
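find-spark-home works out SPARK_HOME from the location of the script itself. A minimal sketch of the idea (the real script also handles other layouts, such as pip installs):
# Derive SPARK_HOME as the parent of the bin/ directory containing this script,
# then export it for the scripts invoked later in the chain.
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "$(dirname "$0")/.." && pwd)"
fi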
export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]
Scala REPL options:
-I <file> preload <file>, enforcing line-by-line interpretation"
By default Scala does not pick up the Java classpath, so scala.usejavacp is set here to make the REPL use it
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
function main() {
Check whether we are running under Cygwin
if $cygwin; then
# disable canonical mode and echo, and require at least 1 character per read
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
# launch spark-submit to run org.apache.spark.repl.Main, setting the application name and passing through the arguments
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
# re-enable echo
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
# launch spark-submit to run org.apache.spark.repl.Main, setting the application name and passing through the arguments
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}
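Everything passed on the spark-shell command line therefore ends up as arguments to spark-submit. For example (init.scala is a hypothetical file; --master local[2] is just one possible choice):
# Start a local REPL with two threads and preload a Scala file,
# using the -I option shown in _SPARK_CMD_USAGE above.
./bin/spark-shell --master "local[2]" -I init.scala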
exit_status=127
saved_stty=""
restore stty settings (echo in particular)
function restoreSttySettings() {
stty $saved_stty
saved_stty=""
}
Decide whether the terminal settings need to be restored before exiting
function onExit() {
if [[ "$saved_stty" != "" ]]; then
restoreSttySettings
fi
exit $exit_status
}
Trap the INT (interrupt) signal and run onExit when it is received
trap onExit INT
Save the current terminal configuration
saved_stty=$(stty -g 2>/dev/null)
If saving the settings failed, clear saved_stty so that onExit does not later try to restore the wrong settings
if [[ ! $? ]]; then
saved_stty=""
fi
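As a standalone illustration of this save/restore pattern (a sketch, not part of the Spark scripts):
saved=$(stty -g)            # capture the current settings in a re-loadable form
stty -icanon min 1 -echo    # raw-ish mode: no line buffering, no echo
# ... interactive work happens here ...
stty "$saved"               # restore exactly what was saved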
Call main, passing along all of the arguments
main "$@"
exit_status=$?
onExit
The spark-shell script above hands everything to spark-submit, so that script is next.
Reading the spark-submit script
Check whether ${SPARK_HOME} is set
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
Disable randomized string hashing in Python 3.3+
export PYTHONHASHSEED=0
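Since Python 3.3, string hashes are randomized per interpreter process unless PYTHONHASHSEED is set; pinning it keeps hash-based operations consistent across PySpark worker processes. A quick illustration from the shell:
# Without the variable these two commands would usually print different values;
# with the seed pinned they always agree.
PYTHONHASHSEED=0 python3 -c "print(hash('spark'))"
PYTHONHASHSEED=0 python3 -c "print(hash('spark'))"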
Launch spark-class with org.apache.spark.deploy.SparkSubmit as the first argument, forwarding all of the spark-shell arguments after it
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"在spark-submit中又启动了spark-class,继续解读
Check / set SPARK_HOME
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
Configure environment variables by sourcing load-spark-env.sh (this also determines SPARK_SCALA_VERSION, used below)
. "${SPARK_HOME}"/bin/load-spark-env.sh
Locate bin/java and assign it to the RUNNER variable
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
Locate the directory containing the Spark jars
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
If requested via SPARK_PREPEND_CLASSES, prepend the launcher build directory to the classpath
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
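Note that the trailing /* in LAUNCH_CLASSPATH is a JVM classpath wildcard: the quoting around -cp's value keeps the shell from globbing it, and java itself expands it to every jar in the directory. A tiny check (the directory is a placeholder):
# Quoted, the literal string reaches the JVM; unquoted, the shell would expand the
# glob into many arguments and everything after the first jar would be misparsed.
java -cp "/opt/spark/jars/*" -version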
When testing, unset the Hadoop and YARN configuration directories
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
build_command wraps the command that does the real work: org.apache.spark.launcher.Main assembles the final launch command and prints it as NUL-separated arguments, and printf then appends the launcher's exit code
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
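To see what the launcher actually emits, you can run the same java command by hand and turn the NUL separators into newlines for display; a sketch, assuming SPARK_HOME points at an installed distribution:
# Prints the fully assembled spark-submit command, one argument per line.
# The real script keeps the NULs and reads them back with `read -d ''`.
java -Xmx128m -cp "${SPARK_HOME}/jars/*" org.apache.spark.launcher.Main \
  org.apache.spark.deploy.SparkSubmit \
  --class org.apache.spark.repl.Main --name "Spark shell" | tr '\0' '\n'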
Turn off POSIX mode, because it does not allow process substitution
set +o posix
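Process substitution is the <(...) construct used in the loop below; a quick standalone illustration (not Spark-specific):
# <(command) lets the command's output be read as if it were a file, so the
# while loop runs in the current shell and the variables it sets survive the loop.
while IFS= read -r line; do
  echo "got: $line"
done < <(printf 'a\nb\n')
This is also why the CMD array built below is still populated after the loop finishes, which a pipe into while would not guarantee.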
CMD=()
Read the NUL-delimited arguments in a loop, appending each one to the CMD array
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
build_command is executed here, via process substitution
done < <(build_command "$@")
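The last element that lands in CMD is the launcher's exit code (the %d printed by build_command). The script checks that it is numeric, propagates a non-zero code, strips it off, and finally execs the remaining words as the real command: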
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
As you can see, even though we ran spark-shell, what actually runs underneath is a java process: org.apache.spark.launcher.Main first builds the launch command, and that command is then exec'd. In other words, spark-shell is ultimately started by java.
Their execution flow is roughly as follows:
spark-shell -> spark-submit -> spark-class -> java org.apache.spark.launcher.Main (builds the command) -> exec org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main