The Overall Architecture of Spark on YARN

YARN

Key Roles

  • ResourceManager (RM): the global resource manager
    • ApplicationManager: the application manager (a sub-component of the RM)
      • Accepts submission requests from the Client
      • Assigns an appId to each application
      • Picks an NM on which to launch the AM
      • Manages the AM's lifecycle
    • Scheduler: the resource scheduler (a sub-component of the RM); allocates resources according to the scheduling policy and answers the AM's resource requests
  • NodeManager (NM): the node manager; one NM runs on each machine and acts as that machine's agent in the cluster
    • Manages the resources of a single node
    • Reports node status and resource usage to the RM
    • Accepts requests from the AM
  • Container
    • The basic unit of resource allocation in YARN
    • Corresponds to a bundle of resources such as CPU and memory
    • Runs the ApplicationMaster (AM) or an Executor (see the sketch after this list)
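
A minimal sketch of what such a resource bundle looks like through the YARN API (org.apache.hadoop.yarn.api.records.Resource); the concrete numbers are purely illustrative:

object ContainerResourceSketch {
  import org.apache.hadoop.yarn.api.records.Resource
  import org.apache.hadoop.yarn.util.Records

  // Build a container "capability": the memory / vcore bundle that a container carries.
  def exampleCapability(): Resource = {
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemorySize(2048) // 2 GiB of memory (illustrative value)
    capability.setVirtualCores(2)  // 2 virtual cores (illustrative value)
    capability
  }
}

The same Records.newRecord(classOf[Resource]) pattern shows up later when the Client builds the AM's resource request.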

The Spark on YARN Execution Flow

ApplicationMaster (not to be confused with the ApplicationManager above): every app gets its own AM, which acts as that app's agent inside the YARN cluster; an app running on YARN requests resources from the RM (Scheduler) through its AM.

  1. Submit the AM
    1. The Client prepares the AM's runtime environment and submits the AM to the ApplicationManager
    2. The RM's ApplicationManager
      1. Assigns an appId
      2. Validates the resource request
      3. Picks an NM to launch the AM
    3. The NM launches the AM
      1. Downloads the jars and configuration files
      2. Starts the AM's main entry point (via java org.apache.spark.deploy.yarn.ApplicationMaster)
  2. The AM requests resources
    1. The AM registers itself with the RM
      1. The AM calls amClient.registerApplicationMaster()
      2. It tells the RM its own host and port
    2. The AM requests Executor containers
      1. The AM periodically calls amClient.allocate() to request Containers from the Scheduler
      2. A request contains: the number of containers, the resources per container (CPU, memory), and locality preferences
    3. The Scheduler allocates containers (the resources to run on)
      1. Allocates resources according to the current scheduling policy
      2. Returns the list of allocated containers to the AM
  3. Launch the Executors
    1. The AM launches the Executors
      1. The AM sends a launch command to the NM via nmClient.startContainer() (running java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend)
    2. The Executors connect to the Driver and run tasks
      1. Each Executor connects to the Driver over RPC
      2. The Driver's TaskScheduler assigns Tasks to the Executors
      3. The Executors return Task execution results to the Driver

The overall loop is: the Client puts the AM onto YARN to run, the AM keeps requesting Executors until all of them have been allocated, and the Executors establish a connection to the Driver over RPC.

Source Code Walkthrough

The source code is certainly the most precise reference, but it is not the clearest one. Once you have a basic grasp of the overall logic and functionality, pinning down the details through the source is the smarter approach: code can never answer "why", only "what" and "how".

Submitting the AM

The core responsibility of this part is to submit the AM to YARN to run.

Entry Point: org.apache.spark.deploy.yarn.Client.scala. This file defines the application that SparkSubmit actually runs (YarnClusterApplication); the entry point is that class's start() method.

// Since we are running on a YARN cluster, drop the generic local configs;
// in YARN cluster mode the YARN distributed cache is used instead
conf.remove(JARS)
conf.remove(FILES)
conf.remove(ARCHIVES)

// Focus on the constructor and the run() method
new Client(new ClientArguments(args), conf, null).run()

Why remove these configs here? To avoid distributing the same files twice.
In SparkSubmit, because the cluster manager is YARN, spark.jars / spark.files / spark.archives have already been copied into the corresponding spark.yarn.dist.* YARN configs. Since we know we are in YARN mode, these generic configs must be removed, and Client distributes files using only the YARN-specific configs.
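
The remapping itself happens in SparkSubmit. Below is a minimal sketch of the idea — the spark.yarn.dist.* keys are real Spark configs, but the code is only illustrative and not the actual SparkSubmit implementation:

object YarnDistConfSketch {
  import org.apache.spark.SparkConf

  // Merge the generic distribution configs into the YARN-specific ones; this is
  // why Client can later drop the generic keys without losing anything.
  def remapToYarnDistConfs(conf: SparkConf): Unit = {
    val mapping = Seq(
      "spark.jars"     -> "spark.yarn.dist.jars",
      "spark.files"    -> "spark.yarn.dist.files",
      "spark.archives" -> "spark.yarn.dist.archives")
    mapping.foreach { case (genericKey, yarnKey) =>
      conf.getOption(genericKey).foreach { value =>
        // append to whatever is already in the YARN-specific key
        val merged = (conf.getOption(yarnKey).toSeq :+ value).mkString(",")
        conf.set(yarnKey, merged)
      }
    }
  }
}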

The Client constructor

What matters here:

  • In cluster mode the AM and the Driver are one and the same, both running inside the cluster, so the AM has to be sized according to the Driver settings
  • In client mode the AM and the Driver are separate: the AM only requests resources from the ApplicationManager while the Driver runs on the Client machine, so the in-cluster AM can be configured lightly
private val yarnClient = YarnClient.createYarnClient
private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val amMemoryOverheadFactor = if (isClusterMode) {
  sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
} else {
  AM_MEMORY_OVERHEAD_FACTOR
}

// AM related configurations
private val amMemory = if (isClusterMode) {
  sparkConf.get(DRIVER_MEMORY).toInt
} else {
  sparkConf.get(AM_MEMORY).toInt
}

private val driverMinimumMemoryOverhead =
  if (isClusterMode) {
    sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD)
  } else {
    384L
  }

private val amMemoryOverhead = {
  val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
  sparkConf.get(amMemoryOverheadEntry).getOrElse(
    math.max((amMemoryOverheadFactor * amMemory).toLong,
      driverMinimumMemoryOverhead)).toInt
}

private val amCores = if (isClusterMode) {
  sparkConf.get(DRIVER_CORES)
} else {
  sparkConf.get(AM_CORES)
}

// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)

private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
  math.max((executorMemoryOvereadFactor * executorMemory).toLong,
    minMemoryOverhead)).toInt

Client.run()

Submits an application (namely the AM) to the RM.

def run(): Unit = {
  submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport()
    val state = report.getYarnApplicationState
    // log
    // if failed/killed, throw SparkException
  } else {
    // get YarnAppReport
    // if failed/killed/error, throw SparkException
  }
}

Submitting the app to YARN (key step)

  • Ask the RM's ApplicationManager for a new application
  • Compute the app's staging directory on YARN: (the STAGING_DIR config, or the user's Hadoop home directory) + appId
  • Create the AM container launch context
  • Create an ApplicationSubmissionContext from the container context, the appId and related information
  • Submit this ApplicationSubmissionContext to the RM and monitor the application's state through launcherBackend
def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)

  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    this.appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    // omitted

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext()
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
  } catch {
    case e: Throwable =>
      // staging-dir cleanup omitted in this excerpt
      throw e
  }
}

Preparing the ContainerLaunchContext; this step sets up the environment, the Java options and the launch command.

  • The key point is the choice of amClass
    • Cluster mode -> org.apache.spark.deploy.yarn.ApplicationMaster
    • Client mode -> org.apache.spark.deploy.yarn.ExecutorLauncher
    • The main method of whichever class is chosen is the entry point that will be executed on YARN
private def createContainerLaunchContext(): ContainerLaunchContext = {
  val pySparkArchives =
    if (sparkConf.get(IS_PYTHON_APP)) {
      findPySparkArchives()
    } else {
      Nil
    }

  // set up the launch environment
  val launchEnv = setupLaunchEnv(stagingDirPath, pySparkArchives)
  val localResources = prepareLocalResources(stagingDirPath, pySparkArchives)

  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  amContainer.setLocalResources(localResources.asJava)
  amContainer.setEnvironment(launchEnv.asJava)

  val javaOpts = ListBuffer[String]()

  javaOpts += s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"

  // Add Xmx for AM memory
  javaOpts += "-Xmx" + amMemory + "m"

  val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
  javaOpts += "-Djava.io.tmpdir=" + tmpDir

  // Include driver-specific java options if we are launching a driver
  if (isClusterMode) {
    sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
      javaOpts ++= Utils.splitCommandString(opts)
        .map(Utils.substituteAppId(_, this.appId.toString))
        .map(YarnSparkHadoopUtil.escapeForShell)
    }
    val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
      sys.props.get("spark.driver.libraryPath")).flatten
    if (libraryPaths.nonEmpty) {
      prefixEnv = Some(createLibraryPathPrefix(libraryPaths.mkString(File.pathSeparator),
        sparkConf))
    }
  } else {
    // Validate and include yarn am specific java options in yarn-client mode.
    sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
      if (opts.contains("-Dspark")) {
        val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')."
        throw new SparkException(msg)
      }
      if (opts.contains("-Xmx")) {
        val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " +
          s"(was '$opts'). Use spark.yarn.am.memory instead."
        throw new SparkException(msg)
      }
      javaOpts ++= Utils.splitCommandString(opts)
        .map(Utils.substituteAppId(_, this.appId.toString))
        .map(YarnSparkHadoopUtil.escapeForShell)
    }
    sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
      prefixEnv = Some(createLibraryPathPrefix(paths, sparkConf))
    }
  }

  val userClass =
    if (isClusterMode) {
      Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
    } else {
      Nil
    }
  val userJar =
    if (args.userJar != null) {
      Seq("--jar", args.userJar)
    } else {
      Nil
    }
  val primaryPyFile =
    if (isClusterMode && args.primaryPyFile != null) {
      Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
    } else {
      Nil
    }
  val primaryRFile =
    if (args.primaryRFile != null) {
      Seq("--primary-r-file", args.primaryRFile)
    } else {
      Nil
    }
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
  }
  val userArgs = args.userArgs.flatMap { arg =>
    Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
  }
  val amArgs =
    Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
      Seq("--dist-cache-conf",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

  // Command for the ApplicationMaster
  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++ amArgs ++
    Seq(
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

  val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  amContainer.setCommands(printableCommands.asJava)

  // ... (security token setup omitted)
  amContainer
}

After the containerContext has been set up, the final ApplicationSubmissionContext is created.
The core of this step is copying the amContainer context and the related settings onto newApp.getApplicationSubmissionContext.

def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {

  val componentName = if (isClusterMode) {
    config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
  } else {
    config.YARN_AM_RESOURCE_TYPES_PREFIX
  }
  val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
  val amResources = yarnAMResources ++
    getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
  logDebug(s"AM resources: $amResources")
  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
  appContext.setQueue(sparkConf.get(QUEUE_NAME))
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType(sparkConf.get(APPLICATION_TYPE))

  sparkConf.get(APPLICATION_TAGS).foreach { tags =>
    appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
  }
  sparkConf.get(MAX_APP_ATTEMPTS) match {
    case Some(v) => appContext.setMaxAppAttempts(v)
    case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
      "Cluster's default value will be used.")
  }

  sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
    appContext.setAttemptFailuresValidityInterval(interval)
  }

  val capability = Records.newRecord(classOf[Resource])
  capability.setMemorySize(amMemory + amMemoryOverhead)
  capability.setVirtualCores(amCores)
  if (amResources.nonEmpty) {
    ResourceRequestHelper.setResourceRequests(amResources, capability)
  }
  logDebug(s"Created resource capability for AM request: $capability")

  sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
    case Some(expr) =>
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      amRequest.setNumContainers(1)
      amRequest.setNodeLabelExpression(expr)
      appContext.setAMContainerResourceRequests(Collections.singletonList(amRequest))
    case None =>
      appContext.setResource(capability)
  }

  appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

  sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>
    appContext.setPriority(Priority.newInstance(appPriority))
  }
  appContext
}

At this point everything that needs to be configured for the submission has been set up.
Next, yarnClient.submitApplication(appContext) inside submitApplication() hands the application over to the ApplicationManager.

The AM Requests Resources

For now we only follow the ApplicationMaster path, i.e. cluster mode.

Entry Point: org.apache.spark.deploy.yarn.ApplicationMaster.scala
The main method mostly deals with the environment and configuration now that the process runs on the cluster; the real logic starts in master.run(), which branches on the deploy mode:

  • Cluster mode: runDriver()
  • Client mode: runExecutorLauncher()
def main(args: Array[String]): Unit = {
  // parse the command-line arguments
  // load the Spark configuration
  // set system properties
  // ...

  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  // obtain delegation tokens (for accessing HDFS etc.)
  // ...

  // the key entry point: master.run()
  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}

final def run(): Int = {
  try {
    // ...
    val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
    val stagingDirFs = stagingDirPath.getFileSystem(yarnConf)
    // ...
    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
    // ...
  } catch {
    // error handling / finish(...) omitted in this excerpt
    case e: Exception => logError("Uncaught exception: ", e)
  }
  exitCode
}

runDriver() (key step)

  1. startUserApplication(): starts a thread whose overridden run method invokes the user program's main method; the thread is named "Driver", so this effectively starts the Driver thread
    1. While running the user code, the user (Driver) thread calls wait right after it finishes creating the SparkContext (sc for short) and blocks until the Executors are ready; this prevents the Driver from trying to run tasks before the Executors have been allocated (see the sketch after this list)
  2. Waits for the user thread to finish initializing the sc, then registers the AM with the RM
  3. Calls createAllocator(...), through which the AM requests resources from the RM to create Containers
  4. Once the Containers have been requested, resumes the Driver thread so it can continue running the user's job
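
How the user (Driver) thread and the AM thread synchronize — a simplified sketch of the sparkContextPromise handshake; the structure mirrors sparkContextInitialized()/resumeDriver() in ApplicationMaster, but the code below is condensed and illustrative rather than the exact source:

object DriverAmHandshakeSketch {
  import scala.concurrent.Promise
  import org.apache.spark.SparkContext

  private val sparkContextPromise = Promise[SparkContext]()

  // Called on the user (Driver) thread once the SparkContext is fully constructed
  // (through the YARN cluster scheduler's post-start hook).
  def sparkContextInitialized(sc: SparkContext): Unit = {
    sparkContextPromise.synchronized {
      sparkContextPromise.success(sc) // runDriver() awaits this future
      sparkContextPromise.wait()      // park the user thread until resumeDriver()
    }
  }

  // Called from runDriver() after createAllocator() has set everything up.
  def resumeDriver(): Unit = {
    sparkContextPromise.synchronized {
      sparkContextPromise.notify()    // let the user thread continue with its job
    }
  }
}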
private def runDriver(): Unit = {

  // start the Driver thread
  userClassThread = startUserApplication()

  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    // Wait until the SparkContext in the user thread has been initialized (timeout: AM_MAX_WAIT_TIME ms).
    // When the user thread creates the SparkContext it calls wait() right after initialization,
    // which is why we have to resume it later: this is how the user thread and the AM thread synchronize.
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))

    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      // once sc is initialized, register the AM with the RM
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // this starts a background thread that periodically requests Executors from the RM
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf())
    } else {
      throw new IllegalStateException("User did not initialize spark context!")
    }
    // resume the user thread
    resumeDriver()
    // wait for the user thread to finish
    userClassThread.join()
  } finally {
    resumeDriver()
  }
}

Requesting containers (key step)

  1. Create the allocator
  2. Make a first attempt to allocate resources
  3. Start a background thread that keeps issuing allocation requests
    1. The retry interval backs off exponentially, starting from the initial allocation interval (0.2 s by default) up to the heartbeat interval (3 s by default)
    2. This background thread does more than request resources: it also acts as the application's heartbeat thread; if the RM does not receive a heartbeat from the AM within the liveness expiry interval, the application is considered lost and cleaned up
private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // ...
  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  // log...

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
  // request resources once
  allocator.allocateResources()
  // ... metricsSystem
  // start a background thread that keeps calling allocator.allocateResources(), with exponential backoff between retries
  reporterThread = launchReporterThread()
}

launchReporterThread() starts a thread whose run() executes the allocationThreadImpl() method (see the sketch below):

  1. The thread keeps requesting resources, with an exponentially backed-off retry interval
  2. Each wait between retries happens inside a synchronized block as a wait plus, if woken early, a sleep up to the minimum retry interval (the 0.2 s policy)
    1. While waiting, if the Driver needs new Executors it calls notifyAll to trigger the next allocation sooner, but the minimum retry interval is still respected (sleep + wait >= 0.2 s): the time before the wait is recorded, and if the elapsed wait is shorter than required, the thread knows it was woken early and sleeps for the remainder
    2. The minimum sleep keeps the RM from being overloaded: consecutive requests are at least 0.2 s apart
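
For reference, a simplified sketch of what launchReporterThread() roughly looks like in ApplicationMaster (condensed from memory of the source, not the exact code; allocationThreadImpl() is the loop shown next):

private def launchReporterThread(): Thread = {
  val t = new Thread {
    override def run(): Unit = {
      try {
        allocationThreadImpl() // the allocation / heartbeat loop shown below
      } finally {
        allocator.stop()
      }
    }
  }
  t.setDaemon(true)
  t.setName("Reporter") // the AM's heartbeat / allocation thread
  t.start()
  t
}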

private def allocationThreadImpl(): Unit = {
  // The number of failures in a row until the allocation thread gives up.
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  var failureCount = 0
  while (!finished) {
    try {
      // the maximum number of failed executors has been reached
      if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
          s"Max number of executor failures ($maxNumExecutorFailures) reached")
      } else if (allocator.isAllNodeExcluded) {
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
          "Due to executor failures all available nodes are excluded")
      } else {
        logDebug("Sending progress")
        // request resources
        allocator.allocateResources()
      }
      failureCount = 0
    } catch {
      // failure counting and giving up after reporterMaxFailures omitted in this excerpt
      case e: Throwable => failureCount += 1
    }
    try {
      val numPendingAllocate = allocator.getNumContainersPendingAllocate
      var sleepStartNs = 0L
      var sleepInterval = 200L // ms
      allocatorLock.synchronized {
        sleepInterval =
          // if there are containers pending allocation, compute the backed-off retry interval
          if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
            val currentAllocationInterval =
              math.min(heartbeatInterval, nextAllocationInterval)
            nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
            currentAllocationInterval
          } else {
            nextAllocationInterval = initialAllocationInterval
            heartbeatInterval
          }
        sleepStartNs = System.nanoTime()
        // interruptible wait
        allocatorLock.wait(sleepInterval)
      }
      // if woken up early, fall back to sleep for a non-interruptible wait
      val sleepDuration = System.nanoTime() - sleepStartNs
      if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
        val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
        if (toSleep > 0) {
          Thread.sleep(toSleep)
        }
      }
    } catch {
      case e: InterruptedException => // ignore and loop again
    }
  }
}

Requesting resources: allocateResources()

  1. allocateResources() assembles the allocation requests
    1. Compute how many Executors are missing
    2. Create the Container requests
    3. Add the container requests to the AMRMClient
  2. amClient.allocate(progressIndicator) sends the requests to the RM
  3. Executors are then launched in the Containers that have already been allocated

def allocateResources(): Unit = synchronized {
  // add the requests
  updateResourceRequests()

  val progressIndicator = 0.1f
  // send the requests
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  // ... health tracking
  if (allocatedContainers.size > 0) {
    // this is where Executors get launched inside the Containers
    handleAllocatedContainers(allocatedContainers.asScala.toSeq)
  }
}

def updateResourceRequests(): Unit = synchronized {
  // 1. compute how many Executors are missing
  val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
    val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
    val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
    val running = getOrUpdateRunningExecutorForRPId(rpId).size
    (rpId, targetNum - pending - running - starting) // executors still missing for this profile
  }.toMap

  missingPerProfile.foreach { case (rpId, missing) =>
    val hostToLocalTaskCount =
      hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
    val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
    val numPendingAllocate = pendingAllocate.size
    // Split the pending container request into three groups: locality matched list,
    // locality unmatched list and non-locality list. Take the locality matched container
    // request into consideration of container placement, treat as allocated containers.
    // For locality unmatched and locality free container requests, cancel these container
    // requests, since required locality preference has been changed, recalculating using
    // container placement strategy.
    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
      hostToLocalTaskCount, pendingAllocate)

    if (missing > 0) {
      val resource = rpIdToYarnResource.get(rpId)
      // cancel "stale" requests for locations that are no longer needed
      staleRequests.foreach { stale =>
        amClient.removeContainerRequest(stale)
      }
      val cancelledContainers = staleRequests.size
      // consider the number of new containers and cancelled stale containers available
      val availableContainers = missing + cancelledContainers

      // to maximize locality, include requests with no locality preference that can be cancelled
      val potentialContainers = availableContainers + anyHostRequests.size

      val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
      val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
        allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))
      // 2. create the container requests
      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
      containerLocalityPreferences.foreach {
        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
          newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId)
        case _ =>
      }

      if (availableContainers >= newLocalityRequests.size) {
        // more containers are available than needed for locality, fill in requests for any host
        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
          newLocalityRequests += createContainerRequest(resource, null, null, rpId)
        }
      } else {
        val numToCancel = newLocalityRequests.size - availableContainers
        // cancel some requests without locality preferences to schedule more local containers
        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
          amClient.removeContainerRequest(nonLocal)
        }
      }

      // 3. add the container requests to the AMRMClient
      newLocalityRequests.foreach { request =>
        amClient.addContainerRequest(request)
      }
    }
  }
}

Launching Executors

handleAllocatedContainers in YarnAllocator ->

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
  // 1. container matching logic

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match the remaining containers by rack; rack resolution may block, so it runs
  // in a separate thread.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  // launch the Executors
  runAllocatedContainers(containersToUse)
}

Launching the Executor (key step)


/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
  for (container <- containersToUse) {
    // generate the Executor ID
    val rpId = getResourceProfileIdFromPriority(container.getPriority)
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString

    val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
    val rp = rpIdToResourceProfile(rpId)
    val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
    val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
      map(_.amount).getOrElse(defaultResources.executorMemoryMiB).toInt

    val defaultCores = defaultResources.cores.get
    val containerCores = rp.getExecutorCores.getOrElse(defaultCores)

    val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
    if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
      getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
      launchingExecutorContainerIds.add(containerId)
      if (launchContainers) {
        // launch asynchronously on the thread pool
        launcherPool.execute(() => {
          try {
            // create a new ExecutorRunnable !!!
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              containerMem,
              containerCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              rp.id
            ).run() // call run()
            updateInternalState(rpId, executorId, container)
          } catch {
            // failure handling (releasing the container, etc.) omitted in this excerpt
            case e: Throwable => logError(s"Failed to launch executor $executorId", e)
          }
        })
      }
      // ... (remaining branches omitted)
    }
  }
}

ExecutorRunnable launches the container (key step)

def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}

def startContainer(): java.util.Map[String, ByteBuffer] = {

  // 1. prepare the launch context
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    .asInstanceOf[ContainerLaunchContext]
  val env = prepareEnvironment().asJava

  ctx.setLocalResources(localResources.asJava)
  ctx.setEnvironment(env)

  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ctx.setTokens(ByteBuffer.wrap(dob.getData()))

  val commands = prepareCommand() // build the command that the container will execute
  ctx.setCommands(commands.asJava)

  // ...

  // Send the start request to the ContainerManager
  try {
    // launch the Executor inside the container !!!
    nmClient.startContainer(container.get, ctx)
  } catch {
    case ex: Exception =>
      throw new SparkException(s"Exception while starting container ${container.get.getId}" +
        s" on host $hostname", ex)
  }
}
The method that builds this command generates the JVM arguments and, at the end, names the `org.apache.spark.executor.YarnCoarseGrainedExecutorBackend` class that the Executor will run.
private def prepareCommand(): List[String] = {
  // Extra options for the JVM
  val javaOpts = ListBuffer[String]()

  // Set the JVM memory
  val executorMemoryString = s"${executorMemory}m"
  javaOpts += "-Xmx" + executorMemoryString

  // Set extra Java options for the executor, if defined
  sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
    val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
    javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
  }

  // Set the library path through a command prefix to append to the existing value of the
  // env variable.
  val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
    Client.createLibraryPathPrefix(libPath, sparkConf)
  }

  javaOpts += "-Djava.io.tmpdir=" +
    new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

  // Certain configs need to be passed here because they are needed before the Executor
  // registers with the Scheduler and transfers the spark configs. Since the Executor backend
  // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
  // authentication settings.
  sparkConf.getAll
    .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
    .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

  // For log4j configuration to reference
  javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

  YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++
    Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
      "--driver-url", masterAddress,
      "--executor-id", executorId,
      "--hostname", hostname,
      "--cores", executorCores.toString,
      "--app-id", appId,
      "--resourceProfileId", resourceProfileId.toString) ++
    Seq(
      s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
      s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

  // TODO: it would be nicer to just make sure there are no null commands here
  commands.map(s => if (s == null) "null" else s).toList
}

After receiving the request, the NM executes a command along these lines:

java -Xmx2048m \
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend \
--driver-url spark://CoarseGrainedScheduler@driver:7077 \
--executor-id 1 \
--hostname worker-node-1 \
--cores 2 \
--app-id application_1234567890_0001 \
--resourceProfileId 0

After starting, the Executor connects to the Driver

private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {

  def main(args: Array[String]): Unit = {
    // parse the arguments and create the RPC environment
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores,
        env, arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))

    // connect to the Driver
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
    System.exit(0)
  }

}
YarnCoarseGrainedExecutorBackend.main()

├─ parse the arguments
├─ create the RPC environment
├─ connect to the Driver
└─ register the Executor with the Driver

The Driver's CoarseGrainedSchedulerBackend accepts the registration

└─ the TaskScheduler starts assigning Tasks to the Executor
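
To make the last two arrows concrete, here is a toy model of the registration handshake. The message names mirror the real ones in CoarseGrainedClusterMessages (RegisterExecutor, LaunchTask, StatusUpdate), but the shapes and the flow below are deliberately simplified and are not Spark's actual API:

object RegistrationFlowSketch {
  sealed trait Msg
  case class RegisterExecutor(executorId: String, hostname: String, cores: Int) extends Msg
  case object RegisteredExecutor extends Msg
  case class LaunchTask(taskId: Long) extends Msg
  case class StatusUpdate(taskId: Long, result: String) extends Msg

  def main(args: Array[String]): Unit = {
    // 1. Executor side: after main() sets up the RPC environment, it asks the
    //    Driver's CoarseGrainedSchedulerBackend endpoint to register it.
    val registration: Msg = RegisterExecutor("1", "worker-node-1", 2)
    // 2. Driver side: records the executor and acknowledges the registration.
    val ack: Msg = RegisteredExecutor
    // 3. The Driver's TaskScheduler then pushes tasks, and the Executor reports
    //    each result back with a StatusUpdate.
    val launched = Seq(LaunchTask(0L), LaunchTask(1L))
    val reported = launched.map { case LaunchTask(id) => StatusUpdate(id, s"task $id finished") }
    println(s"$registration -> $ack; results: $reported")
  }
}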