How Spark Launches a Job

Prerequisites

After the AM has finished resumeDriver(), the Executors have been allocated and registered,
and the user code resumes. When execution reaches an action, job creation and submission begin.

Detailed Steps

  1. Execution reaches an action; at this point the transformations have already formed the RDD DAG
    • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD, e.g. map, filter
    • Wide dependency: each partition of the parent RDD may be used by multiple partitions of the child RDD, e.g. reduceByKey
    • Note that the narrow/wide distinction is defined at partition granularity and is an intrinsic property of a given transformation, determined by how the operator computes its output: if producing one output record requires more than one record from the parent RDD, i.e. the operation aggregates data, then it is a wide dependency (see the sketch after these steps)
    • All actions eventually call SparkContext.runJob()
  2. SparkContext submits the job to the DAGScheduler
    • sc.runJob cleans and validates the arguments
    • It calls DAGScheduler.runJob() -> submitJob()
    • A unique JobId is generated
    • A JobSubmitted event is posted to the DAGScheduler event loop
  3. The DAGScheduler splits the job into Stages
    • Starting from the final RDD, it walks the existing RDD dependency chain upwards
    • Split rule: whenever a wide dependency is encountered, a new Stage is cut
    • RDDs linked by narrow dependencies are placed in the same Stage
    • All Stages are created recursively, forming the Stage DAG
  4. An ActiveJob is created and the Stages are submitted
    • An ActiveJob object is created from the finalStage
    • submitStage is called to start the scheduling
    • A recursive strategy submits parent Stages before child Stages, preserving the topological order between Stages
  5. Stages are broken down into Tasks
    • One Task is created for every partition of the Stage that still needs to be computed
    • ShuffleMapStage -> ShuffleMapTask
    • ResultStage -> ResultTask
    • The Tasks are wrapped into a TaskSet
  6. The TaskScheduler schedules the Tasks onto Executors
    • A TaskSetManager is created to manage the TaskSet
    • Tasks are assigned according to the scheduling policy and data locality
    • Tasks are serialized and sent to the Executors for execution
  7. Executors run the Tasks and return results
    • Each Executor runs a Task, computing an RDD partition
    • ShuffleMapTasks write shuffle data
    • ResultTasks produce the final results
    • The results are returned to the Driver
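
A quick way to see the narrow vs. wide distinction from step 1 is to inspect rdd.dependencies. This is a minimal sketch that assumes a running SparkContext named sc; it is not part of the job-submission path itself.

// map keeps a OneToOneDependency (narrow) on its parent ...
val mapped = sc.parallelize(1 to 10).map(_ * 2)
println(mapped.dependencies.head.getClass.getSimpleName)   // OneToOneDependency

// ... while reduceByKey introduces a ShuffleDependency (wide)
val reduced = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).reduceByKey(_ + _)
println(reduced.dependencies.head.getClass.getSimpleName)  // ShuffleDependency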

Source Code Walkthrough

We pick up at the moment the AM has finished resumeDriver(): the Executors are already allocated, and the user code continues to run.

// user code
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(line => line.split(","))
val rdd3 = rdd2.filter(arr => arr.length > 2)
val rdd4 = rdd3.map(arr => (arr(0), 1))
val rdd5 = rdd4.reduceByKey(_ + _)
val result = rdd5.count()

1. The user's transformations form the RDD DAG

RDD

Spark implements lazy execution by not actually materializing the new RDD when a transformation runs; instead, the operation is recorded as part of the lineage, and everything each operation needs is stored in the RDD itself (see the short sketch after this list).
There are four core pieces of information:

  • prev: a reference to the parent RDD
  • deps: the dependency type of the operation this RDD represents
  • func: the function this operation will run (with its closure cleaned)
  • partitions: the partition information
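
A short sketch of what lazy execution looks like from user code, assuming a running SparkContext named sc: the transformation returns immediately and only records lineage; nothing is read or computed until the action runs.

// returns immediately: only a new RDD recording prev/deps/func/partitions is created
val lengths = sc.textFile("data.txt").map(_.length)

// only the action actually triggers reading the file and running the map
val total = lengths.reduce(_ + _)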

RDD source code

  • Methods that must be overridden
    • getPartitions
    • getDependencies
    • compute
  • If a subclass is built with the constructor that only takes the parent RDD, that subclass is a narrow dependency (a minimal custom-RDD sketch follows the RDD source below)
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // Constructor used by narrow-dependency RDDs: only the parent RDD is passed in,
  // and the dependency is automatically set to a (narrow) OneToOneDependency
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  private[spark] def conf = sc.conf

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  // Compute the data of a given partition (implemented by every concrete RDD)
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // Return the set of partitions in this RDD
  protected def getPartitions: Array[Partition]

  // Return the dependencies of this RDD
  protected def getDependencies: Seq[Dependency[_]] = deps

  // Optional for subclasses: specify placement preferences (data-locality hints)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  // Optional for subclasses: a custom partitioner
  @transient val partitioner: Option[Partitioner] = None

  // ... other code
}
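
To make the required overrides and the parent-only constructor concrete, here is a minimal, hypothetical narrow-dependency RDD (a simplified sketch, not Spark source; DoubledRDD is an invented name):

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Doubles every element of its parent RDD.
// Using the parent-only constructor means the dependency defaults to OneToOneDependency (narrow).
class DoubledRDD(prev: RDD[Int]) extends RDD[Int](prev) {

  // narrow dependency: simply reuse the parent's partitions
  override protected def getPartitions: Array[Partition] = prev.partitions

  // compute one partition by pulling the parent's iterator for the same partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    prev.iterator(split, context).map(_ * 2)
}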

A narrow-dependency RDD implementation: MapPartitionsRDD

  • It extends RDD through the parent-only constructor, so it is a narrow-dependency RDD
  • getPartitions: since the dependency is narrow, the parent RDD's partitions are reused as-is
  • compute: applies f to the iterator of the corresponding parent partition (f receives the TaskContext, the partition index, and that iterator)
  • partitioner: reuses the parent RDD's partitioner when preservesPartitioning is true, otherwise None
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
  // ...
}

A wide-dependency RDD implementation: ShuffledRDD

  • getDependencies: returns a List[ShuffleDependency]; for a wide-dependency RDD, deps is a List[ShuffleDependency]
  • partitioner: a shuffle is involved, so the partitioner used after the shuffle has to be passed in
  • getPartitions: creates part.numPartitions ShuffledRDDPartition placeholders, one per post-shuffle partition
  • compute: obtains a shuffle reader from the ShuffleManager and reads the shuffled data for this partition
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

  override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

  override val partitioner = Some(part)

  override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
  }
  // ...

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
    SparkEnv.get.shuffleManager.getReader(
      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
  // ...
}

transformation -> RDD DAG

Behind the scenes, each transformation simply creates an RDD of the corresponding type and plugs in a different func.

  • map and filter are both narrow operations, so both create the narrow-dependency RDD type MapPartitionsRDD; the clean call here cleans the closure of the user function
  • The difference is that map applies map to the partition iterator, while filter applies filter
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

reduceByKey is a wide-dependency operation; it ultimately creates a ShuffledRDD and requires a partitioner to be specified.

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  // ...
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
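
The self.partitioner == Some(partitioner) branch above is worth noting: if the parent RDD is already partitioned with the same partitioner, no ShuffledRDD is created and the aggregation stays narrow. A small sketch, again assuming a running SparkContext named sc:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(4)
// partitionBy shuffles once and records `part` as the RDD's partitioner
val pre = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).partitionBy(part)

// same partitioner -> the mapPartitions branch is taken, no new shuffle
val agg = pre.reduceByKey(part, _ + _)
println(agg.dependencies.head.getClass.getSimpleName)  // OneToOneDependency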

Summary

To recap, the user code:

val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(line => line.split(","))
val rdd3 = rdd2.filter(arr => arr.length > 2)
val rdd4 = rdd3.map(arr => (arr(0), 1))
val rdd5 = rdd4.reduceByKey(_ + _)
val result = rdd5.count()

ends up forming the following RDD DAG:

HadoopRDD rdd1 <- MapPartitionsRDD rdd2 <- MapPartitionsRDD rdd3 <- MapPartitionsRDD rdd4 <- ShuffledRDD rdd5 (finalRDD)
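
To check this lineage at runtime, RDD.toDebugString prints the dependency chain, with a shuffle boundary shown as a separate indentation level:

// prints the lineage of rdd5; the narrow chain rdd1..rdd4 appears inside one block,
// and the ShuffleDependency introduced by reduceByKey starts a new one
println(rdd5.toDebugString)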

The final count() is an action, which takes us to the next phase.

2. The action triggers job submission

RDD actions call SparkContext.runJob()

Every action ultimately ends up calling SparkContext.runJob(rdd, func, ...).

// RDD.scala
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

runJob has many overloads, because different actions pass in different kinds of arguments.

// the runJob entry point used by count
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, rdd.partitions.indices)
}

// cleans the function closure and fills in the partitions
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

// builds the resultHandler; the returned values are collected into results
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

// the final core entry point: the key step is the call to dagScheduler.runJob
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  val callSite = getCallSite()
  val cleanedFunc = clean(func)
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

DAGScheduler.runJob()

Call chain: DAGScheduler.runJob -> DAGScheduler.submitJob -> eventProcessLoop.post(JobSubmitted)

  • runJob()
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // key code: submitJob()
  // then wait for completion and log the outcome
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      // success log
    case scala.util.Failure(exception) =>
      // failure log and stack trace
      throw exception
  }
}
  • submitJob()
    • Generates a unique jobId
    • If partitions is empty, it short-circuits and directly posts the job-start and job-end events
    • If partitions is non-empty, i.e. there is actual data, it submits the job by posting a JobSubmitted event to the DAGScheduler event loop
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.

  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    // fast path: post the job start/end events and return an already-completed waiter
  }

  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // the JobWaiter returned to runJob, which blocks on its completionFuture
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  // post this job to the DAGScheduler event loop
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    JobArtifactSet.getActiveOrDefault(sc),
    Utils.cloneProperties(properties)))
  waiter
}
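
Note that DAGScheduler.submitJob does not block: it returns the JobWaiter, and it is DAGScheduler.runJob that waits on waiter.completionFuture. The same split is exposed to user code through SparkContext.submitJob, which returns a FutureAction instead of blocking. A rough sketch; the partition function and result handler here are illustrative, not taken from the Spark source:

// assumes the rdd5 from the earlier example (an RDD[(String, Int)])
val futureAction = sc.submitJob(
  rdd5,
  (it: Iterator[(String, Int)]) => it.size,                 // runs on each partition
  rdd5.partitions.indices,                                  // all partitions
  (index: Int, size: Int) => println(s"partition $index -> $size records"),
  ())                                                       // overall result once all partitions finish

// the handle can be polled or cancelled while the job is running
println(futureAction.isCompleted)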

3. The DAGScheduler event loop handles JobSubmitted

Event dispatch

DAGSchedulerEventProcessLoop is the DAGScheduler's event-loop processor. When an event is received, onReceive dispatches it; a JobSubmitted event is dispatched to the handleJobSubmitted method.

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } catch {
      case ex: ShuffleStatusNotFoundException =>
        dagScheduler.handleShuffleStatusNotFoundException(ex)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, artifacts, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, artifacts,
        properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, artifacts, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, artifacts,
        properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case StageFailed(stageId, reason, exception) =>
      dagScheduler.handleStageFailed(stageId, reason, exception)
  }
  // ...
}
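
The EventLoop this class extends is essentially a single daemon thread draining a blocking queue, which is why eventProcessLoop.post() in submitJob returns immediately. Below is a simplified sketch of that pattern, not the actual org.apache.spark.util.EventLoop implementation:

import java.util.concurrent.LinkedBlockingDeque

// Simplified sketch of the single-threaded event-loop pattern the DAGScheduler relies on.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) onReceive(eventQueue.take())   // blocks until an event arrives
      } catch {
        case _: InterruptedException => // stop() interrupts the thread to shut the loop down
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)  // callers never wait for processing
  protected def onReceive(event: E): Unit
}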

Stage splitting and Job creation begin

Core code

  • createResultStage: splits out the Stages and returns the finalStage (the ResultStage; the Stage DAG is also built in this step)
  • setActiveJob
  • submitStage: submitting the finalStage is what submits the job
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    artifacts: JobArtifactSet,
    properties: Properties): Unit = {
  // If this job belongs to a cancelled job group, skip running it

  var finalStage: ResultStage = null
  try {
    // key code
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } // catch block omitted
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties)
  clearCacheLocs()

  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)

  // submit the job
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos =
    stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)).toImmutableArraySeq
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  submitStage(finalStage)
}

4. Stage splitting (the core logic)

This step starts from the finalRDD and walks the whole RDD DAG upwards.

  • getShuffleDependenciesAndResourceProfiles is called to find the finalRDD's nearest wide (shuffle) dependencies
  • getOrCreateParentStages is called to create one ShuffleMapStage for each of those ShuffleDependencies
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // find the nearest shuffle dependencies (and resource profiles) reachable from the finalRDD
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  // recursively create all parent stages, walking backwards from the finalRDD
  val parents = getOrCreateParentStages(shuffleDeps, jobId)

  // create the ResultStage that produces the job's final result
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val resourceProfiles = new HashSet[ResourceProfile]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      Option(toVisit.getResourceProfile()).foreach(resourceProfiles += _)
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  (parents, resourceProfiles)
}
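
Applied to the example job, the traversal starts from rdd5 (the ShuffledRDD). Its only dependency is the ShuffleDependency on rdd4, so the walk stops there and exactly one ShuffleDependency is collected. getOrCreateParentStages then builds a ShuffleMapStage for it, and createResultStage wraps rdd5 into the ResultStage (for the first job of the application the stage ids come out as 0 and 1):

ShuffleMapStage 0: rdd1 (textFile) -> rdd2 (map) -> rdd3 (filter) -> rdd4 (map), all narrow, ends by writing shuffle output
ResultStage 1 (finalStage): rdd5 (reduceByKey), reads that shuffle output and computes count()

submitStage(finalStage) will therefore submit ShuffleMapStage 0 first and only submit ResultStage 1 once its shuffle map output is available.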