只关注spark application通过spark-submit shell脚本启动的情况
以问题为导向探究

  1. ✅ spark-submit 脚本做了什么?
  2. ✅ Launcher 层的作用是什么?为什么需要它?
  3. ✅ 参数是如何从 Shell 传递到 Java 的?
  4. ✅ 为什么使用 NULL 分隔符?
  5. ✅ prepareSubmitEnvironment() 返回的 4 个值分别是什么?
  6. ✅ Client 和 Cluster 模式的 childMainClass 有什么不同?
  7. ✅ 不同集群管理器(YARN/K8s/Standalone)的启动有什么区别?
  8. ✅ app.start() 之后发生了什么?
  9. ✅ 配置的优先级是怎样的?
  10. ✅ 为什么某些模式组合不支持(如 LOCAL + CLUSTER)?

How spark command is parsed

shell脚本入口阶段

找到SPARK_HOME, JAVA_HOME, 将launcher入口类和submit入口类添加到执行路径中, 补全java命令, 执行命令

整体的流程

entry point

file: bin/spark-submit

  • 提交命令
1
2
3
spark-submit \
--master yarn \
--class MyApp my.jar arg1 arg2
  • 源码
1
2
3
4
5
6
7
# 找到SPARK_HOME的路径
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi

# 执行bin./spark-class脚本, 将传递给spark-submit的参数添加上org.apache.spark.deploy.SparkSubmit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  1. 找到并设置SPARK_HOME
  2. 调用exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    1. 通过SPARK_HOME的路径加载spark-class脚本, 同时在命令行参数中添加org.apache.spark.deploy.SparkSubmit, 这个类是spark提交任务的入口类, 也就是后面java要执行的类

为执行java Launcher做准备和设置

file: bin/spark-class

  1. 加载load-spark-env.sh
    1. 尝试加载用户的spark-env.sh (如果存在)
    2. ${SPARK_HOME}/conf/spark-env.sh加载(把这个文件中的配置全都加载到环境变量中)
  2. 设置RUNNER (java的路径JAVA_HOME, 也是后面用于启动整个spark的java版本)
  3. 设置SPARK_JARS_DIR
  4. 设置LAUNCH_CLASSPATH = ${SPARK_JARS_DIR}/*
  5. build_command组装命令: "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"

最后要执行的命令

1
2
3
4
5
java -Xmx128m $SPARK_LAUNCHER_OPTS -cp ${SPARK_JARS_DIR}/*  \
org.apache.spark.launcher.Main \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--class MyApp my.jar arg1 arg2

Launcher阶段

构建最后要执行的用于启动spark application的java命令行, 输出给spark-class

这一步做的事情处理更加复杂的对参数的整合处理, 最后得到一个完整的目标命令, 之所以放在java中运行, 是因为这部分的处理颇为繁复和复杂, 放在java里更好写

file: launcher/src/main/…/spark/launcher.Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public static void main(String[] argsArray) throws Exception {
List<String> cmd;
// spark-submit case, we just focus on this
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
// build full java command in spark-submit case
try {
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
// now we have prepared parsed command class instances in builder
cmd = buildCommand(builder, env, printLaunchCommand);
} catch //...
} else //...

List<String> bashCmd = prepareBashCommand(cmd, env);
// this output will catch by spark-class shell
for (String c : bashCmd) {
System.out.print(c.replaceFirst("\r$",""));
System.out.print('\0');
}
}


private static List<String> buildCommand(
AbstractCommandBuilder builder,
Map<String, String> env,
boolean printLaunchCommand) {
List<String> cmd = builder.buildCommand(env);
return cmd;
}

// builder's method
public List<String> buildCommand(Map<String, String> env) {
// is Remote...

if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
return buildPySparkShellCommand(env);
} else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
return buildSparkRCommand(env);
} else {
// our case will entry this one
return buildSparkSubmitCommand(env);
}
}

private List<String> buildSparkSubmitCommand(Map<String, String> env) {
// Load the properties file and check whether spark-submit will be running the app's driver
// or just launching a cluster app. When running the driver, the JVM's argument will be
// modified to cover the driver's configuration.
Map<String, String> config = getEffectiveConfig();
boolean isClientMode = isClientMode(config);
String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
String defaultExtraClassPath = config.get(SparkLauncher.DRIVER_DEFAULT_EXTRA_CLASS_PATH);
if (extraClassPath == null || extraClassPath.trim().isEmpty()) {
extraClassPath = defaultExtraClassPath;
} else {
extraClassPath += File.pathSeparator + defaultExtraClassPath;
}

List<String> cmd = buildJavaCommand(extraClassPath);
// Take Thrift/Connect Server as daemon
if (isThriftServer(mainClass) || isConnectServer(mainClass)) {
addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
}
addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

// We don't want the client to specify Xmx. These have to be set by their corresponding
// memory flag --driver-memory or configuration entry spark.driver.memory
String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
checkJavaOptions(driverDefaultJavaOptions);
String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
checkJavaOptions(driverExtraJavaOptions);

if (isClientMode) {
// Figuring out where the memory value come from is a little tricky due to precedence.
// Precedence is observed in the following order: // - explicit configuration (setConf()), which also covers --driver-memory cli argument. // - properties file. // - SPARK_DRIVER_MEMORY env variable // - SPARK_MEM env variable // - default value (1g) // Take Thrift/Connect Server as daemon
String tsMemory =
isThriftServer(mainClass) || isConnectServer(mainClass) ?
System.getenv("SPARK_DAEMON_MEMORY") : null;
String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xmx" + memory);
addOptionString(cmd, driverDefaultJavaOptions);
addOptionString(cmd, driverExtraJavaOptions);
mergeEnvPathList(env, getLibPathEnvName(),
config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}

// SPARK-36796: Always add some JVM runtime default options to submit command
addOptionString(cmd, JavaModuleOptions.defaultModuleOptions());
addOptionString(cmd, "-Dderby.connection.requireAuthentication=false");
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
}

从Main中能看到整体的调用的链路: Root -> main

  • SparkSubmitCommandBuilder: 构造方法 -> 将命令行的k-v解析并注入到OptionParser对象中, 保存在builder中
  • buildCommand:
    • builder.buildCommand:
      • buildSparkSubmitCommand:
        • 这里是逻辑的关键点, 会将spark的配置转化成JVM的配置
        • 有内存的设置, 注入默认参数值
  • 最后将命令从cmd/env中提取出来, 并且打印到stdout中供spark-class捕获

这里值得关注的点有两个

  1. memory的注入的优先级
    • explicit configuration (setConf()), which also covers –driver-memory cli argument.
    • properties file.
    • SPARK_DRIVER_MEMORY env variable
    • SPARK_MEM env variable
    • default value (1g)
  2. 传递给spark-class的命令以\0作为分隔符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
SparkSubmitCommandBuilder(List<String> args) {  
// not entry this switch statement if use single spark-submit
if (args.size() > 0) {
switch (args.get(0)) {
case "pyspark-shell-main":
//...
case "sparkr-shell-main":
//...
case "run-example":
//...
}

OptionParser parser = new OptionParser(true);
parser.parse(submitArgs);
// now we have prepared parsed command class instances in builder
// for Main to resolve it
} else //...
}


// parse是一个模版方法, 其中的关键逻辑是在子类中重写的handle方法
// this method parse cmd command k-v to class field
protected boolean handle(String opt, String value) {
switch (opt) {
case MASTER -> master = value;
case REMOTE -> {
isRemote = true;
remote = value;
}
case DEPLOY_MODE -> deployMode = value;
case PROPERTIES_FILE -> propertiesFile = value;
case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true;
case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
case DRIVER_DEFAULT_CLASS_PATH ->
conf.put(SparkLauncher.DRIVER_DEFAULT_EXTRA_CLASS_PATH, value);
case DRIVER_CLASS_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
case CONF -> {
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
conf.put(setConf[0], setConf[1]);
}
case //...
}
return true;
}

这里是SparkSubmitCommandBuilder的调用链条

  • constructor
    • parser.parser: 外包在handle的while循环
      • handle: 将k-v转换成对象中的字段的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r _ARG; do
ARG=${_ARG//$'\r'}
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")

CMD=("${CMD[@]:0:$LAST}") # last is exit code, remove it
exec "${CMD[@]}"

最后spark-class将Launcher的输出以\0为分割, 以数组的形式加载到CMD中, 最后exec

正式启动SparkApplication应用

entry point: org.apache.spark.deploy.SparkSubmit.scala

对于应用的启动, spark会针对不同的维度做不同的处理, 所以要理解下面的不同的启动application的路径, 必须的前置知识是对于spark不同的运行的模式的理解

spark的运行模式

Spark从集群管理器部署模式两个维度划分运行模式

分类

从集群管理器来看

  • LOCAL: 单机模式, 单JVM进程, 但是所有组件都在一个进程中
  • STANDALONE: Spark自带的集群管理器, 适合小规模集群
  • YARN: Hadoop Yarn集群
  • k8s: 容器化部署

从部署模式来看

  • Client: Driver机是本地(spark-submit进程), 可以看到实时输出, 方便调试
  • Cluster: 将Driver机交给集群管理, 更加稳定可靠, 但是无法直接获取Driver机输出

不同的集群管理器, 决定了spark不同的资源申请方式, 不同的部署方式, 决定了Driver的启动位置

下面的代码都是SparkSubmit.scala: prepareSubmitEnvironment中的

对于分类的识别

通过--master参数设置
cluster mode识别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val clusterManager: Int = args.maybeMaster match {  
case Some(v) =>
assert(args.maybeRemote.isEmpty)
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if SparkMasterRegex.isK8s(m) => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, k8s, or local")
-1
}
case None => LOCAL // default master or remote mode.
}

通过master参数的内容进行匹配, 默认是LOCAL模式

通过spark.submit.deployMode conf来设置
deploy mode识别:

1
2
3
4
5
6
7
val deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ =>
error("Deploy mode must be either client or cluster")
-1
}

默认是CLIENT模式

快速失败

快速失败, 处理不接受的(cluster, deployMode)组合方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(clusterManager, deployMode) match { 
// CLUSTER & STANDALONE & Py/R
case (STANDALONE, CLUSTER) if args.isPython =>
error("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")

// LOCAL & CLUSTER
case (LOCAL, CLUSTER) =>
error("Cluster deploy mode is not compatible with master \"local\"")

// CLUSTER & SHELL
case (_, CLUSTER) if isShell(args.primaryResource) =>
error("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case (_, CLUSTER) if isConnectServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Connect server.")
case _ =>
}

应用启动

怎么将控制权交给用户代码

根据运行模式, 指定不同的mainClass

  • 如果deployMode == CLIENT
    • 运行的主类是用户代码的mainClass
    • 用户的资源直接使用本地的资源(因为是client模式, app是直接运行在用户机上的)
  • 如果使用的是YarnCluster
    • 运行的主类是YARN_CLUSTER_SUBMIT_CLASS: org.apache.spark.deploy.yarn.YarnClusterApplication
      • 用户的代码是python代码
        • 将用户的代码和资源添加到–primary-py-file
        • –class org.apache.spark.deploy.PythonRunner
      • 如果用户的代码是Java/Scala代码
        • 将资源添加到–jar后
        • 用–class指定用户的mainClass
  • 如果使用的是k8sCluster
    • 运行的主类是KUBERNETES_CLUSTER_SUBMIT_CLASS: org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
      • 和上面一样的处理, 如果是python, 则要运行的mainClass是"--main-class", "org.apache.spark.deploy.PythonRunner"
      • 如果是Java/Scala: --main-class args.mainClass
      • 通过--arg添加上args

在指定完主类以后, 会在runMain中运行app.start()来启动对应的SparkApplication子类

问题解答

  1. ✅ spark-submit 脚本做了什么?
    1. 找到SPARK_HOME, 然后在SPARK_HOME/bin找到并启动spark-class脚本
  2. ✅ Launcher 层的作用是什么?为什么需要它?
    1. 作用: 用于生成最终的完整的适应不同平台的java命令用于启动Spark, 之所以放在java代码而不是bash中, 是因为这里的功能较为复杂, 放在Java中更好处理, 对于Driver机的memory的指定也是在这一步进行的
    2. 我们需要解析用户传入的参数, 来将它们解析成Spark能够方便解析的样式
  3. ✅ 参数是如何从 Shell 传递到 Java 的?
    1. 在启动JVM的时候作为命令行参数传入
  4. ✅ 为什么使用 NULL 分隔符?
    1. 这个分隔符在bash中是完全没有意义的字符, 能够保证在所有的bash和环境中都能正确解析
  5. ✅ prepareSubmitEnvironment() 返回的 4 个值分别是什么?
    1. childMainClass: 要start执行的SparkApplication, 会在这个app里面执行用户的代码
    2. childClassPath: 用户要执行的代码所需要的资源/jar文件路径
    3. childArgs: 用户代码需要的命令行参数
    4. sparkConf: spark的配置
  6. ✅ Client 和 Cluster 模式的 childMainClass 有什么不同?
    1. Client的会直接赋值成用户代码的MainClass, 在app.start的时候, 实际上启动的是JavaMainApplication来包装用户的MainClass并通过反射invoke运行
    2. Cluster会根据用户的集群的类型, 使用不同的向集群提交任务的包装类, 在启动application的时候, 实际上启动的是这些集群启动包装类
  7. ✅ 不同集群管理器(YARN/K8s/Standalone)的启动有什么区别?
    1. 会需要使用不同的main-class/classpath/args的配置的参数, 同时会使用不同的SparkApplication
  8. ✅ app.start() 之后发生了什么?
    1. 这个方法是阻塞调用的, 会等待SparkApplication中的main方法运行完毕
      1. CLIENT模式下: 反射invoke用户的main函数, 所以在执行到这一行代码的时候, 会阻塞等待用户的main方法执行完.
      2. CLUSTER模式下: 运行集群包装类, 在运行完毕以后, 向集群提交任务以后会包装类就运行完毕了
    2. 执行finally代码块: 在k8s环境下运行的spark-submit, 主动调用stop来清理这个submit进程的资源
  9. ✅ 配置的优先级是怎样的?
    1. 命令行参数
    2. 用户通过--properties指定的properties文件
    3. spark-defaults.conf文件
    4. 环境变量
    5. 默认值
    6. 采用灵活度和更加明确的配置方式优先级更高
  10. ✅ 为什么某些模式组合不支持(如 LOCAL + CLUSTER)?
    1. 因为这些deployMode和clusterMode是冲突的, LOCAL意味着没有集群, 这个时候不能选择CLUSTER deployMode