public static void main(String[] argsArray) throws Exception {
  List<String> cmd;
  // spark-submit is the only case we focus on here
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    // build the full java command in the spark-submit case
    try {
      AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
      // at this point the builder holds the parsed options as its fields
      cmd = buildCommand(builder, env, printLaunchCommand);
    } catch // ...
  } else // ...
  List<String> bashCmd = prepareBashCommand(cmd, env);
  // this output is captured by the spark-class shell script
  for (String c : bashCmd) {
    System.out.print(c.replaceFirst("\r$", ""));
    System.out.print('\0');
  }
}
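Each token is terminated by a NUL byte rather than a space or newline because the arguments themselves may contain spaces or newlines; NUL cannot occur inside them, so the shell on the other end can split the stream unambiguously. Below is a minimal, self-contained sketch of that framing; the class name and the sample command are made up for illustration and are not Spark code.

import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of Spark.
public class NulFramingDemo {
  public static void main(String[] args) {
    // A made-up command whose last value contains a space.
    List<String> cmd = Arrays.asList(
        "/usr/bin/java", "-Xmx1g",
        "org.apache.spark.deploy.SparkSubmit",
        "--class", "com.example.MyApp",
        "--conf", "spark.app.name=my app");

    // Producer side: same framing as the loop above, token + '\0'.
    StringBuilder wire = new StringBuilder();
    for (String c : cmd) {
      wire.append(c).append('\0');
    }

    // Consumer side: splitting on NUL recovers every token intact,
    // including the one with an embedded space.
    String[] recovered = wire.toString().split("\0");
    System.out.println(Arrays.toString(recovered));
  }
}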
// builder's method
public List<String> buildCommand(Map<String, String> env) {
  // is Remote...
  if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
    return buildPySparkShellCommand(env);
  } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
    return buildSparkRCommand(env);
  } else {
    // a plain spark-submit takes this branch
    return buildSparkSubmitCommand(env);
  }
}
private List<String> buildSparkSubmitCommand(Map<String, String> env) {
  // Load the properties file and check whether spark-submit will be running the app's driver
  // or just launching a cluster app. When running the driver, the JVM's argument will be
  // modified to cover the driver's configuration.
  Map<String, String> config = getEffectiveConfig();
  boolean isClientMode = isClientMode(config);
  String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
  String defaultExtraClassPath = config.get(SparkLauncher.DRIVER_DEFAULT_EXTRA_CLASS_PATH);
  if (extraClassPath == null || extraClassPath.trim().isEmpty()) {
    extraClassPath = defaultExtraClassPath;
  } else {
    extraClassPath += File.pathSeparator + defaultExtraClassPath;
  }

  List<String> cmd = buildJavaCommand(extraClassPath);
  // Take Thrift/Connect Server as daemon
  if (isThriftServer(mainClass) || isConnectServer(mainClass)) {
    addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
  }
  addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

  // We don't want the client to specify Xmx. These have to be set by their corresponding
  // memory flag --driver-memory or configuration entry spark.driver.memory
  String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
  checkJavaOptions(driverDefaultJavaOptions);
  String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
  checkJavaOptions(driverExtraJavaOptions);

  if (isClientMode) {
    // Figuring out where the memory value come from is a little tricky due to precedence.
    // Precedence is observed in the following order:
    // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
    // - properties file.
    // - SPARK_DRIVER_MEMORY env variable
    // - SPARK_MEM env variable
    // - default value (1g)
    // Take Thrift/Connect Server as daemon
    String tsMemory = isThriftServer(mainClass) || isConnectServer(mainClass) ?
      System.getenv("SPARK_DAEMON_MEMORY") : null;
    String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
      System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
    cmd.add("-Xmx" + memory);
    addOptionString(cmd, driverDefaultJavaOptions);
    addOptionString(cmd, driverExtraJavaOptions);
    mergeEnvPathList(env, getLibPathEnvName(),
      config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
  }

  // SPARK-36796: Always add some JVM runtime default options to submit command
  addOptionString(cmd, JavaModuleOptions.defaultModuleOptions());
  addOptionString(cmd, "-Dderby.connection.requireAuthentication=false");
  cmd.add("org.apache.spark.deploy.SparkSubmit");
  cmd.addAll(buildSparkSubmitArgs());
  return cmd;
}
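The driver-memory precedence spelled out in the comment above comes down to firstNonEmpty() returning the first candidate that is neither null nor empty. The following is a small self-contained sketch of that behavior; the helper mirrors what the launcher's utility is expected to do, and the class name and values are made up.

// Hypothetical demo class, not Spark code.
public class DriverMemoryPrecedenceDemo {

  // Sketch of a first-non-empty helper; the launcher uses an equivalent utility method.
  static String firstNonEmpty(String... candidates) {
    for (String c : candidates) {
      if (c != null && !c.isEmpty()) {
        return c;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Example: nothing is configured except the SPARK_MEM environment variable,
    // so the driver JVM ends up with -Xmx2g.
    String memory = firstNonEmpty(
        null,   // not a Thrift/Connect daemon, so no SPARK_DAEMON_MEMORY
        null,   // spark.driver.memory not set via setConf()/--driver-memory/properties file
        null,   // SPARK_DRIVER_MEMORY unset
        "2g",   // SPARK_MEM
        "1g");  // DEFAULT_MEM
    System.out.println("-Xmx" + memory);  // prints -Xmx2g
  }
}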
SparkSubmitCommandBuilder(List<String> args) {
  // a plain spark-submit does not match any of these special first arguments
  if (args.size() > 0) {
    switch (args.get(0)) {
      case "pyspark-shell-main":
        // ...
      case "sparkr-shell-main":
        // ...
      case "run-example":
        // ...
    }
    OptionParser parser = new OptionParser(true);
    parser.parse(submitArgs);
    // the builder now holds the parsed options as fields,
    // ready for Main to resolve them into a command
  } else // ...
}
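parse() is a template method: the base parser owns the loop that walks the arguments and pairs each recognized option with its value, while the meaning of each option is decided by handle(), which the builder's inner OptionParser overrides (shown next). The real parse() also deals with --opt=value forms, unrecognized options, and the trailing application jar and arguments; the stripped-down sketch below, with hypothetical class names, only shows the shape.

import java.util.List;

// Hypothetical classes illustrating the template-method shape; not the real launcher API.
abstract class MiniOptionParser {
  // Template method: owns the "--opt value" walking loop and delegates to handle().
  final void parse(List<String> args) {
    for (int i = 0; i + 1 < args.size(); i += 2) {
      handle(args.get(i), args.get(i + 1));
    }
  }

  // Subclasses decide what each recognized option means.
  protected abstract boolean handle(String opt, String value);
}

class MiniSubmitParser extends MiniOptionParser {
  String master;
  String deployMode;

  @Override
  protected boolean handle(String opt, String value) {
    switch (opt) {
      case "--master" -> master = value;          // mirrors the MASTER branch below
      case "--deploy-mode" -> deployMode = value; // mirrors the DEPLOY_MODE branch
      default -> { }
    }
    return true;
  }
}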
// parse() is a template method; the key logic is the handle() method overridden in the subclass
// this method maps command-line key-value options onto class fields
protected boolean handle(String opt, String value) {
  switch (opt) {
    case MASTER -> master = value;
    case REMOTE -> {
      isRemote = true;
      remote = value;
    }
    case DEPLOY_MODE -> deployMode = value;
    case PROPERTIES_FILE -> propertiesFile = value;
    case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true;
    case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
    case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
    case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
    case DRIVER_DEFAULT_CLASS_PATH ->
      conf.put(SparkLauncher.DRIVER_DEFAULT_EXTRA_CLASS_PATH, value);
    case DRIVER_CLASS_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
    case CONF -> {
      checkArgument(value != null, "Missing argument to %s", CONF);
      String[] setConf = value.split("=", 2);
      checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
      conf.put(setConf[0], setConf[1]);
    }
    case // ...
  }
  return true;
}
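One detail worth noting in the CONF branch: the value is split on the first '=' only (split("=", 2)), so configuration values that themselves contain '=' survive intact. A tiny demo (hypothetical class name):

public class ConfSplitDemo {
  public static void main(String[] args) {
    // value as passed with: --conf spark.driver.extraJavaOptions=-Dfoo=bar
    String value = "spark.driver.extraJavaOptions=-Dfoo=bar";
    String[] setConf = value.split("=", 2);
    System.out.println(setConf[0]);  // spark.driver.extraJavaOptions
    System.out.println(setConf[1]);  // -Dfoo=bar  (the second '=' is preserved)
  }
}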
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r _ARG; do
  ARG=${_ARG//$'\r'}
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")
CMD=("${CMD[@]:0:$LAST}")  # last is exit code, remove it
exec "${CMD[@]}"
val clusterManager: Int = args.maybeMaster match {
  case Some(v) =>
    assert(args.maybeRemote.isEmpty)
    v match {
      case "yarn" => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if SparkMasterRegex.isK8s(m) => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, k8s, or local")
        -1
    }
  case None => LOCAL // default master or remote mode.
}
The cluster manager is resolved by matching on the contents of the master argument; LOCAL is the default.
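For a few representative --master values, the match above resolves as follows. The plain-Java sketch below is not Spark source: the class name is made up and SparkMasterRegex.isK8s is approximated with a startsWith("k8s") check.

import java.util.Arrays;

// Hypothetical demo class, not Spark source.
public class ClusterManagerDemo {

  static String resolve(String master) {
    if (master == null) return "LOCAL";                 // no --master: default, or remote mode
    if (master.equals("yarn")) return "YARN";
    if (master.startsWith("spark")) return "STANDALONE";
    if (master.startsWith("k8s")) return "KUBERNETES";  // approximation of SparkMasterRegex.isK8s
    if (master.startsWith("local")) return "LOCAL";
    throw new IllegalArgumentException(
        "Master must either be yarn or start with spark, k8s, or local");
  }

  public static void main(String[] args) {
    for (String m : Arrays.asList(
        "yarn", "spark://host:7077", "k8s://https://host:6443", "local[4]")) {
      System.out.println(m + " -> " + resolve(m));
    }
    System.out.println("(no --master) -> " + resolve(null));
  }
}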
The deploy mode is set via the spark.submit.deployMode conf, and is resolved as follows:
val deployMode: Int = args.deployMode match {
  case "client" | null => CLIENT
  case "cluster" => CLUSTER
  case _ =>
    error("Deploy mode must be either client or cluster")
    -1
}
(clusterManager, deployMode) match {
  // CLUSTER & STANDALONE & Py/R
  case (STANDALONE, CLUSTER) if args.isPython =>
    error("Cluster deploy mode is currently not supported for python " +
      "applications on standalone clusters.")
  case (STANDALONE, CLUSTER) if args.isR =>
    error("Cluster deploy mode is currently not supported for R " +
      "applications on standalone clusters.")
  // LOCAL & CLUSTER
  case (LOCAL, CLUSTER) =>
    error("Cluster deploy mode is not compatible with master \"local\"")
  // CLUSTER & SHELL
  case (_, CLUSTER) if isShell(args.primaryResource) =>
    error("Cluster deploy mode is not applicable to Spark shells.")
  case (_, CLUSTER) if isSqlShell(args.mainClass) =>
    error("Cluster deploy mode is not applicable to Spark SQL shell.")
  case (_, CLUSTER) if isThriftServer(args.mainClass) =>
    error("Cluster deploy mode is not applicable to Spark Thrift server.")
  case (_, CLUSTER) if isConnectServer(args.mainClass) =>
    error("Cluster deploy mode is not applicable to Spark Connect server.")
  case _ =>
}