Spark Source Code: Reading the spark-shell Script

This article analyzes how the spark-shell startup script in the Spark source code works, stepping through it layer by layer to see the overall flow of how a job gets submitted.

  1. Reading the spark-shell script (a short sketch of its stty/trap pattern follows the listing)

    # Initialize cygwin=false
    cygwin=false

    # Detect whether the script is running under Cygwin
    case "$(uname)" in
      CYGWIN*) cygwin=true;;
    esac

    # Enter POSIX mode for the shell
    set -o posix

    # If SPARK_HOME has not been set, resolve it via find-spark-home
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi

    export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]
    Scala REPL options:
      -I <file>    preload <file>, enforcing line-by-line interpretation"

    # Scala does not use the Java classpath by default; this option tells it to
    SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

    function main() {
      # Check whether we are on Cygwin
      if $cygwin; then
        # Turn off echo and make reads return after at least 1 character,
        # so that jline works correctly with the Cygwin terminal
        stty -icanon min 1 -echo > /dev/null 2>&1
        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
        # Launch spark-submit with org.apache.spark.repl.Main as the main class,
        # set the application name, and forward all arguments
        "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
        # Turn echo back on
        stty icanon echo > /dev/null 2>&1
      else
        export SPARK_SUBMIT_OPTS
        # Launch spark-submit with org.apache.spark.repl.Main as the main class,
        # set the application name, and forward all arguments
        "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
      fi
    }

    exit_status=127
    saved_stty=""

    # restore stty settings (echo in particular)
    function restoreSttySettings() {
      stty $saved_stty
      saved_stty=""
    }

    # Restore the terminal settings (if they were saved) and exit
    function onExit() {
      if [[ "$saved_stty" != "" ]]; then
        restoreSttySettings
      fi
      exit $exit_status
    }

    # Run onExit when an INT (interrupt) signal is caught
    trap onExit INT

    # Save the current terminal settings
    saved_stty=$(stty -g 2>/dev/null)
    # Clear saved_stty on error so we don't later try to restore invalid settings
    if [[ ! $? ]]; then
      saved_stty=""
    fi

    # Call main, forwarding all arguments
    main "$@"

    exit_status=$?
    onExit
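
The bookkeeping around main is all about the terminal: the script saves the current stty settings, traps INT, and restores the settings on the way out so that a Ctrl-C inside the REPL does not leave your terminal in a broken state. This is a generic shell pattern rather than anything Spark-specific; here is a minimal, self-contained sketch of the same idea, in which a sleep stands in for the real spark-submit call:

    #!/usr/bin/env bash
    # Sketch of the save/trap/restore pattern used by spark-shell.
    # The `sleep` below is only a placeholder for the spark-submit invocation.

    exit_status=127
    saved_stty=""

    restore_stty() {
      # Put back exactly the settings captured by `stty -g`
      [[ -n "$saved_stty" ]] && stty "$saved_stty"
      saved_stty=""
    }

    on_exit() {
      restore_stty
      exit "$exit_status"
    }

    # Ctrl-C must not leave the terminal with echo disabled
    trap on_exit INT

    # Save the current settings; clear the variable if saving failed
    saved_stty=$(stty -g 2>/dev/null) || saved_stty=""

    sleep 30      # placeholder for: spark-submit --class ... "$@"
    exit_status=$?
    on_exit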
  2. spark-shell launches spark-submit, so we read that script next (a note on its argument forwarding follows the listing)

    # If SPARK_HOME has not been set, resolve it via find-spark-home
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi

    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0

    # Replace this shell process with spark-class, passing
    # org.apache.spark.deploy.SparkSubmit as the first argument and forwarding
    # everything spark-shell passed in
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  3. spark-submit in turn launches spark-class; we continue there (a sketch of its NUL-delimited command parsing follows the listing)

    # Check/resolve SPARK_HOME
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi

    # Load Spark-related environment variables
    . "${SPARK_HOME}"/bin/load-spark-env.sh

    # Find bin/java and assign it to the RUNNER variable
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi

    # Locate the directory containing the Spark jars
    if [ -d "${SPARK_HOME}/jars" ]; then
      SPARK_JARS_DIR="${SPARK_HOME}/jars"
    else
      SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
    fi

    if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
      echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
      echo "You need to build Spark with the target \"package\" before running this program." 1>&2
      exit 1
    else
      LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
    fi

    # Add the launcher build directory to the classpath if requested
    if [ -n "$SPARK_PREPEND_CLASSES" ]; then
      LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
    fi

    # For tests
    if [[ -n "$SPARK_TESTING" ]]; then
      unset YARN_CONF_DIR
      unset HADOOP_CONF_DIR
    fi

    # Run the Java launcher (org.apache.spark.launcher.Main), which prints the
    # command to execute as NUL-separated arguments, then append its exit code
    build_command() {
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }

    # Turn off posix mode since it does not allow process substitution
    set +o posix

    CMD=()
    # Read the NUL-separated arguments produced by build_command into the CMD array
    while IFS= read -d '' -r ARG; do
      CMD+=("$ARG")
    # build_command is invoked here through process substitution
    done < <(build_command "$@")
    COUNT=${#CMD[@]}
    LAST=$((COUNT - 1))
    # The last element is the launcher's exit code
    LAUNCHER_EXIT_CODE=${CMD[$LAST]}

    # If the last element is not a number, the JVM printed error text to stdout
    # and the command could not be parsed; show the output and fail
    if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
      echo "${CMD[@]}" | head -n-1 1>&2
      exit 1
    fi

    if [ $LAUNCHER_EXIT_CODE != 0 ]; then
      exit $LAUNCHER_EXIT_CODE
    fi

    # Drop the exit code and replace this process with the command the launcher built
    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"

As you can see, although we ran spark-shell, what ends up being executed is the org.apache.spark.launcher.Main class, which builds the final command that spark-class then execs; in other words, at the lowest level spark-shell is launched through java.

Their execution flow looks roughly like this:

spark-shell {
  spark-submit {
    spark-class {
      build_command() {
        "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      }
    }
  }
}
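
If you want to see the concrete java command the launcher produces on your own installation, Spark supports the SPARK_PRINT_LAUNCH_COMMAND environment variable, which makes the launch command get printed before the shell starts (where exactly it is printed has shifted between Spark versions, so verify against the version you are reading):

    # Print the command built by org.apache.spark.launcher.Main, then start the REPL as usual
    SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-shell --master "local[2]"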
Author: Tunan
Link: http://yerias.github.io/2019/10/13/spark/13/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless otherwise stated.