def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    this.appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    // ... (omitted)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext()
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
  }
}
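After submission, the client keeps polling the ResourceManager for the application's state (the "monitor" half of the comment above). A minimal sketch of such a polling loop, assuming a started YarnClient and the appId obtained above; the interval and terminal-state handling here are illustrative, not Spark's actual monitorApplication logic:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll the RM until the application reaches a terminal state.
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (state != YarnApplicationState.FINISHED &&
         state != YarnApplicationState.FAILED &&
         state != YarnApplicationState.KILLED) {
    Thread.sleep(1000)  // illustrative interval; Spark's client uses spark.yarn.report.interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  state
}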
logInfo("Waiting for spark context initialization...")
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
  // Wait for the SparkContext in the user thread to finish initializing,
  // with a timeout of AM_MAX_WAIT_TIME ms.
  // When the user thread creates the SparkContext, it calls wait() once initialization
  // is done, which is why it has to be resumed later; this is how the user thread and
  // the AM thread synchronize.
  val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTime, TimeUnit.MILLISECONDS))
  if (sc != null) {
    val rpcEnv = sc.env.rpcEnv

    val userConf = sc.getConf
    val host = userConf.get(DRIVER_HOST_ADDRESS)
    val port = userConf.get(DRIVER_PORT)
    // Once sc is initialized, register the AM with the RM
    registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
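The rendezvous through sparkContextPromise is an ordinary Promise/Future handshake between two threads, plus a wait()/notify() pair on the promise's monitor. Below is a minimal, self-contained sketch of the same pattern using plain scala.concurrent types; Await.result stands in for Spark's ThreadUtils.awaitResult, and all names are illustrative:

import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

val contextPromise = Promise[String]()       // stands in for sparkContextPromise

// "User" thread: publishes the context, then parks itself until it is resumed.
val userThread = new Thread(() => {
  val ctx = "SparkContext"                   // stands in for the real SparkContext
  contextPromise.synchronized {
    contextPromise.success(ctx)              // completes the future awaited by the AM thread
    contextPromise.wait()                    // parked until the AM thread calls notify()
  }
})
userThread.start()

// "AM" thread: waits (with a timeout) for the context, does its setup, then resumes the user thread.
val sc = Await.result(contextPromise.future, Duration(10000, TimeUnit.MILLISECONDS))
// ... registerAM / createAllocator would happen here ...
contextPromise.synchronized { contextPromise.notify() }  // resume the user thread
userThread.join()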
private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // ...
  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  // log...
  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  // Request resources once
  allocator.allocateResources()
  // ... metricsSystem
  // Start a background thread that keeps calling allocator.allocateResources(),
  // retrying with exponential backoff
  reporterThread = launchReporterThread()
}
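launchReporterThread is essentially a loop around allocator.allocateResources() that backs off when heartbeats keep failing. A simplified sketch of that shape (intervals, failure limits and error handling are illustrative, not Spark's exact implementation):

// Simplified reporter loop: heartbeat the allocator, backing off exponentially on repeated failures.
def reporterLoop(allocate: () => Unit,
                 baseIntervalMs: Long = 200L,
                 maxIntervalMs: Long = 3000L,
                 maxConsecutiveFailures: Int = 5): Unit = {
  var failures = 0
  var intervalMs = baseIntervalMs
  while (failures < maxConsecutiveFailures) {
    try {
      allocate()                        // e.g. allocator.allocateResources()
      failures = 0
      intervalMs = baseIntervalMs       // successful heartbeat: reset the interval
    } catch {
      case _: InterruptedException => return
      case _: Exception =>
        failures += 1
        intervalMs = math.min(intervalMs * 2, maxIntervalMs)  // exponential backoff
    }
    Thread.sleep(intervalMs)
  }
  // Too many consecutive failures: the real AM would fail the application at this point.
}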
def updateResourceRequests(): Unit = synchronized {
  // 1. Compute how many executors are missing per resource profile
  val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
    val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
    val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
    val running = getOrUpdateRunningExecutorForRPId(rpId).size
    (rpId, targetNum - pending - running - starting)
  }.toMap

  missingPerProfile.foreach { case (rpId, missing) =>
    val hostToLocalTaskCount =
      hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
    val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
    val numPendingAllocate = pendingAllocate.size
    // Split the pending container request into three groups: locality matched list,
    // locality unmatched list and non-locality list. Take the locality matched container
    // request into consideration of container placement, treat as allocated containers.
    // For locality unmatched and locality free container requests, cancel these container
    // requests, since required locality preference has been changed, recalculating using
    // container placement strategy.
    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
      hostToLocalTaskCount, pendingAllocate)

    if (missing > 0) {
      val resource = rpIdToYarnResource.get(rpId)

      // cancel "stale" requests for locations that are no longer needed
      staleRequests.foreach { stale =>
        amClient.removeContainerRequest(stale)
      }
      val cancelledContainers = staleRequests.size

      // consider the number of new containers and cancelled stale containers available
      val availableContainers = missing + cancelledContainers

      // to maximize locality, include requests with no locality preference that can be cancelled
      val potentialContainers = availableContainers + anyHostRequests.size

      val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
      val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
        allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))

      // 2. Build the container requests
      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
      containerLocalityPreferences.foreach {
        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
          newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId)
        case _ =>
      }

      if (availableContainers >= newLocalityRequests.size) {
        // more containers are available than needed for locality, fill in requests for any host
        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
          newLocalityRequests += createContainerRequest(resource, null, null, rpId)
        }
      } else {
        val numToCancel = newLocalityRequests.size - availableContainers
        // cancel some requests without locality preferences to schedule more local containers
        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
          amClient.removeContainerRequest(nonLocal)
        }
      }

      // 3. Add the container requests to the AMRMClient
      newLocalityRequests.foreach { request =>
        amClient.addContainerRequest(request)
      }
    }
  }
}
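Step 1 is plain bookkeeping per resource profile: missing = target - pending - running - starting. A tiny worked example with illustrative numbers:

// Illustrative accounting for one resource profile.
val target   = 10  // desired executors for this profile
val pending  = 2   // container requests already sent to the RM, not yet allocated
val running  = 5   // executors already running
val starting = 1   // containers allocated, executors still launching
val missing  = target - pending - running - starting  // = 2 more container requests to issue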
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // 1. Container matching logic

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match remaining by rack. YARN's RackResolver can swallow thread interrupts, which could
  // cause this code to miss interrupts from the AM, so use a separate thread to perform
  // the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }
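The three passes above form a locality cascade: satisfy a node-local request first, then a rack-local one, and finally a request with no locality preference (ANY_HOST). A simplified, self-contained model of that cascade over plain data (this is not YARN's AMRMClient matching, just the shape of the algorithm):

import scala.collection.mutable.ArrayBuffer

// Simplified model: an outstanding request is keyed by the location it asked for ("*" = any host).
case class Request(location: String)
case class AllocatedContainer(host: String, rack: String)

def matchByLocality(
    containers: Seq[AllocatedContainer],
    outstanding: ArrayBuffer[Request]): (Seq[(AllocatedContainer, Request)], Seq[AllocatedContainer]) = {
  val matched   = ArrayBuffer.empty[(AllocatedContainer, Request)]
  val unmatched = ArrayBuffer.empty[AllocatedContainer]
  for (c <- containers) {
    // Prefer a request for this exact host, then for its rack, then a location-free request.
    val hit = Seq(c.host, c.rack, "*").iterator
      .map(loc => outstanding.find(_.location == loc))
      .collectFirst { case Some(r) => r }
    hit match {
      case Some(r) =>
        outstanding -= r                  // consume the matched request
        matched += ((c, r))
      case None =>
        unmatched += c                    // no matching request: would be released back to YARN
    }
  }
  (matched.toSeq, unmatched.toSeq)
}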
/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
  for (container <- containersToUse) {
    // Generate the executor ID
    val rpId = getResourceProfileIdFromPriority(container.getPriority)
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
    val rp = rpIdToResourceProfile(rpId)
    val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
    val containerMem = rp.executorResources.get(ResourceProfile.MEMORY)
      .map(_.amount).getOrElse(defaultResources.executorMemoryMiB).toInt

    val defaultCores = defaultResources.cores.get
    val containerCores = rp.getExecutorCores.getOrElse(defaultCores)
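If the container's ResourceProfile does not override memory or cores, the values fall back to the application-wide defaults derived from spark.executor.memory and spark.executor.cores. A tiny illustration of that getOrElse fallback (the numbers are hypothetical):

// Hypothetical profile that overrides neither memory nor cores,
// with app defaults spark.executor.memory=4g and spark.executor.cores=2.
val profileMemoryMiB: Option[Long] = None
val profileCores: Option[Int]      = None
val containerMem   = profileMemoryMiB.getOrElse(4096L).toInt  // -> 4096 MiB
val containerCores = profileCores.getOrElse(2)                // -> 2 cores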
private def prepareCommand(): List[String] = {
  // Extra options for the JVM
  val javaOpts = ListBuffer[String]()

  // Set the JVM memory
  val executorMemoryString = s"${executorMemory}m"
  javaOpts += "-Xmx" + executorMemoryString

  // Set extra Java options for the executor, if defined
  sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
    val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
    javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
  }

  // Set the library path through a command prefix to append to the existing value of the
  // env variable.
  val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
    Client.createLibraryPathPrefix(libPath, sparkConf)
  }

  javaOpts += "-Djava.io.tmpdir=" +
    new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

  // Certain configs need to be passed here because they are needed before the Executor
  // registers with the Scheduler and transfers the spark configs. Since the Executor backend
  // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
  // authentication settings.
  sparkConf.getAll
    .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
    .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

  // For log4j configuration to reference
  javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

  YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++
    Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
      "--driver-url", masterAddress,
      "--executor-id", executorId,
      "--hostname", hostname,
      "--cores", executorCores.toString,
      "--app-id", appId,
      "--resourceProfileId", resourceProfileId.toString) ++
    Seq(s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
      s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

  // TODO: it would be nicer to just make sure there are no null commands here
  commands.map(s => if (s == null) "null" else s).toList
}
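For reference, the command Spark asks YARN to run in each container looks roughly like the following; the {{...}} and &lt;LOG_DIR&gt; placeholders are substituted by the NodeManager at launch time, and every concrete value below is illustrative:

{{JAVA_HOME}}/bin/java -server -Xmx4096m \
  -Djava.io.tmpdir={{PWD}}/tmp \
  -Dspark.yarn.app.container.log.dir=&lt;LOG_DIR&gt; \
  org.apache.spark.executor.YarnCoarseGrainedExecutorBackend \
  --driver-url spark://CoarseGrainedScheduler@driver-host:43215 \
  --executor-id 1 --hostname worker-01 --cores 2 \
  --app-id application_1700000000000_0001 --resourceProfileId 0 \
  1>&lt;LOG_DIR&gt;/stdout 2>&lt;LOG_DIR&gt;/stderr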