How Spark launches a job

Prerequisites
After the AM has finished resumeDriver(), the executors are already allocated and registered, and the user code keeps running. When execution reaches an action, job creation and submission begins.
Detailed steps
When execution reaches an action, the RDD DAG built by the preceding transformations already exists.
Narrow dependency: each partition of the parent RDD is used by only one partition of the child RDD (e.g. map, filter).
Wide dependency: each partition of the parent RDD may be used by multiple partitions of the child RDD (e.g. reduceByKey).
Note that narrow vs. wide is defined at partition granularity and is an intrinsic property of the transformation, determined by how the operator computes its output: if producing one output record may require more than one record from the parent RDD, i.e. data has to be aggregated, the operation is a wide dependency. (A small sketch after this step list shows how to inspect the dependency type of an RDD.)
All actions eventually call SparkContext.runJob().
SparkContext submits the job to the DAGScheduler
sc.runJob cleans and validates its arguments
It calls DAGScheduler.runJob() -> submitJob()
A unique jobId is generated
A JobSubmitted event is posted to the DAGScheduler event loop
The DAGScheduler splits the job into stages
Starting from the final RDD, walk the existing RDD dependency chain upward
Splitting rule: whenever a wide (shuffle) dependency is encountered, cut off a new stage
RDDs connected only by narrow dependencies are placed in the same stage
All stages are created recursively, forming the stage DAG
Create the ActiveJob and submit the stages
An ActiveJob object is created from the finalStage
submitStage is called to start scheduling
Submission is recursive: parent stages are submitted before child stages, which preserves the topological order between stages
Stages are broken down into tasks
One task is created for each partition of the stage that still needs to be computed
ShuffleMapStage -> ShuffleMapTask
ResultStage -> ResultTask
The tasks are wrapped into a TaskSet
The TaskScheduler schedules the tasks onto executors
A TaskSetManager is created to manage the TaskSet
Tasks are assigned according to the scheduling policy and data locality
Tasks are serialized and sent to the executors for execution
Executors run the tasks and return results
An executor runs a task, computing one RDD partition
A ShuffleMapTask writes shuffle data
A ResultTask produces the final result
The results are returned to the driver
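As a quick check of the narrow/wide distinction described above, the dependency type of an RDD can be inspected directly. This is only a sketch and assumes an already running SparkContext named sc:

import org.apache.spark.{OneToOneDependency, ShuffleDependency}

// Sketch: inspecting dependency types on a toy lineage.
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped  = pairs.map { case (k, v) => (k, v * 2) }     // narrow
val reduced = mapped.reduceByKey(_ + _)                   // wide (shuffle)

// map's dependency on its parent is one-to-one (narrow)
println(mapped.dependencies.head.isInstanceOf[OneToOneDependency[_]])        // true
// reduceByKey's dependency on its parent is a ShuffleDependency (wide)
println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true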
Source code walkthrough
We are now at the moment right after the AM has executed resumeDriver(): the executors are allocated, and the user code continues to run.
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(line => line.split(","))
val rdd3 = rdd2.filter(arr => arr.length > 2)
val rdd4 = rdd3.map(arr => (arr(0), 1))
val rdd5 = rdd4.reduceByKey(_ + _)
val result = rdd5.count()
RDD
Spark implements lazy execution as follows: when a transformation is called, the computation is not actually performed; instead the operation path is recorded, and all the information each operation needs is stored in the new RDD itself. (A small sketch after the list below illustrates this.) The four core pieces of information are:
prev: a reference to the parent RDD
deps: the dependency type of the operation that produced this RDD
func: the function this operation will run (its closure is cleaned)
partitions: the partition information
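A small sketch of this laziness, assuming a SparkContext sc: calling transformations only builds RDD objects and records lineage; nothing is read or computed until an action runs.

// Sketch: transformations only record lineage; no data is read yet.
val lines = sc.textFile("data.txt")      // returns immediately; the file is not opened here
val words = lines.map(_.split(","))      // just wraps `lines` in a new RDD object
println(words.getClass.getSimpleName)    // MapPartitionsRDD: the operation was recorded, not executed
// nothing is actually computed until an action such as words.count() runs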
RDD source code
The key methods a subclass implements:
getPartitions
getDependencies
compute
If a subclass extends RDD through the constructor that takes only the parent RDD, the subclass is a narrow dependency: that auxiliary constructor wraps the parent in a OneToOneDependency.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  private[spark] def conf = sc.conf

  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getDependencies: Seq[Dependency[_]] = deps

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  @transient val partitioner: Option[Partitioner] = None
}
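To make the contract concrete, here is a minimal custom RDD sketch (not from Spark's source; the class and its behavior are hypothetical) that reuses the parent-only constructor and therefore expresses a narrow, one-to-one dependency:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch: a hypothetical narrow-dependency RDD that doubles every element.
// Using the parent-only constructor means the dependency is a OneToOneDependency.
class DoubledRDD(prev: RDD[Int]) extends RDD[Int](prev) {

  // narrow dependency: simply reuse the parent's partitions
  override protected def getPartitions: Array[Partition] = firstParent[Int].partitions

  // compute one partition by pulling the parent's iterator for the same split
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    firstParent[Int].iterator(split, context).map(_ * 2)
}

In practice you would never write this class; prev.map(_ * 2) builds a MapPartitionsRDD that does exactly the same thing, as shown next.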
Narrow-dependency RDD implementation: MapPartitionsRDD
It extends RDD through the constructor that only specifies the parent RDD, so this is a narrow-dependency RDD.
getPartitions: because the dependency is narrow, the parent RDD's partitions are reused as-is.
compute: applies f to the parent's iterator for the same split; f receives the TaskContext, the partition index, and the parent's iterator.
partitioner: reuses the parent RDD's partitioner, but only when preservesPartitioning is true; otherwise None.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}
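A side effect worth noting (sketch, assuming a SparkContext sc): because map passes preservesPartitioning = false while filter passes true, a map can silently drop a partitioner that filter would keep.

import org.apache.spark.HashPartitioner

// Sketch: preservesPartitioning seen from the outside.
val byKey = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(4))
println(byKey.partitioner)                    // Some(...) -- the HashPartitioner
println(byKey.filter(_._2 > 0).partitioner)   // still Some(...): filter preserves the partitioner
println(byKey.map(identity).partitioner)      // None: map does not, since it may change the keys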
Wide-dependency RDD implementation: ShuffledRDD
getDependencies: returns a List containing a ShuffleDependency; for a wide-dependency RDD, deps is a List[ShuffleDependency].
partitioner: a shuffle is involved, so the partitioner to be used after the shuffle must be passed in.
getPartitions: builds one ShuffledRDDPartition for every partition of the target partitioner (part.numPartitions in total); the partitions no longer come from the parent.
compute: asks the ShuffleManager for a reader covering exactly this partition's index and reads the shuffled data written by the map side.
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

  override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

  override val partitioner = Some(part)

  override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
  }

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
    SparkEnv.get.shuffleManager.getReader(
      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
}
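A quick check of the partition bookkeeping above (sketch, assuming a SparkContext sc): the number of shuffle output partitions comes from the partitioner handed to ShuffledRDD, not from the parent.

import org.apache.spark.HashPartitioner

// Sketch: the number of shuffle output partitions follows the target partitioner.
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 8)
val reduced = pairs.reduceByKey(new HashPartitioner(4), _ + _)
println(pairs.getNumPartitions)     // 8, from the parent
println(reduced.getNumPartitions)   // 4, from the HashPartitioner handed to ShuffledRDD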
Under the hood, each transformation simply creates an RDD of the corresponding type and plugs in a different func.
map and filter are both narrow operations, so both create the narrow-dependency MapPartitionsRDD; the sc.clean call cleans the closure of the user function.
The only difference is the iterator operation: map applies iter.map, filter applies iter.filter.
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
reduceByKey is a wide operation, so it ends up creating a ShuffledRDD, and a partitioner has to be supplied.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
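Note the first branch in combineByKeyWithClassTag: if the RDD is already partitioned by exactly the same partitioner, no ShuffledRDD is created and the aggregation runs without a shuffle. A sketch, assuming a SparkContext sc:

import org.apache.spark.HashPartitioner

// Sketch: reduceByKey skips the shuffle when the data is already partitioned by the same partitioner.
val part = new HashPartitioner(4)
val prePartitioned = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(part)

val withShuffle = prePartitioned.map(identity).reduceByKey(part, _ + _) // map dropped the partitioner -> shuffle
val noShuffle   = prePartitioned.reduceByKey(part, _ + _)               // same partitioner -> mapPartitions branch

println(withShuffle.getClass.getSimpleName)  // ShuffledRDD
println(noShuffle.getClass.getSimpleName)    // MapPartitionsRDD, no new shuffle stage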
Summary
Putting it together, the user code
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(line => line.split(","))
val rdd3 = rdd2.filter(arr => arr.length > 2)
val rdd4 = rdd3.map(arr => (arr(0), 1))
val rdd5 = rdd4.reduceByKey(_ + _)
val result = rdd5.count()
ends up forming the RDD DAG below:
HadoopRDD rdd1 <- MapPartitionsRDD rdd2 <- MapPartitionsRDD rdd3 <- MapPartitionsRDD rdd4 <- ShuffledRDD rdd5 (finalRDD)
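You can see this lineage directly with RDD.toDebugString, which prints the recorded dependency chain and indents across the shuffle boundary. This is only a sketch; the exact output format varies by Spark version:

// Sketch: printing the recorded lineage of the final RDD.
println(rdd5.toDebugString)
// Output is roughly of this shape:
// (..) ShuffledRDD[..] at reduceByKey ...
//  +-(..) MapPartitionsRDD[..] at map ...
//     |   MapPartitionsRDD[..] at filter ...
//     |   MapPartitionsRDD[..] at map ...
//     |   data.txt HadoopRDD[..] at textFile ...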
The final count() is an action, which takes us into the next phase.
2. The action triggers job submission
RDD.action() calls SparkContext.runJob()
Every action ultimately ends up calling SparkContext.runJob(rdd, func, …).
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
runJob has many overloads, matching the different argument shapes that the various actions pass in.
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, rdd.partitions.indices)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  val callSite = getCallSite()
  val cleanedFunc = clean(func)
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
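Since every action bottoms out here, runJob can also be called directly as a kind of hand-rolled action; count() is essentially this with Utils.getIteratorSize as the per-partition function. A sketch, assuming a SparkContext sc:

// Sketch: calling SparkContext.runJob directly, like a hand-rolled action.
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// run a function over every partition; runJob returns one result element per partition
val perPartitionSizes: Array[Long] = sc.runJob(rdd, (it: Iterator[Int]) => it.size.toLong)
println(perPartitionSizes.toSeq)   // e.g. List(25, 25, 25, 25)
println(perPartitionSizes.sum)     // 100, which is exactly what count() computes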
DAGScheduler.runJob()
Call chain: DAGScheduler.runJob -> DAGScheduler.submitJob -> eventProcessLoop.post(JobSubmitted)
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
    case scala.util.Failure(exception) =>
      throw exception
  }
}
submitJob()
When partitions is empty, submitJob short-circuits: it directly posts the job-start and job-end events.
When partitions is non-empty, i.e. there is data to compute, the job is submitted by posting a JobSubmitted event.
A unique jobId is generated.
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    // short-circuit for an empty job: post the job start and job end events and return (body elided in this excerpt)
  }

  // the full source creates the JobWaiter here (elided in this excerpt); it is what gets returned below
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    JobArtifactSet.getActiveOrDefault(sc),
    Utils.cloneProperties(properties)))
  waiter
}
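submitJob itself is non-blocking: it returns a JobWaiter, and it is runJob that blocks on it. The async RDD actions expose this directly; a sketch, assuming a SparkContext sc:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.spark.FutureAction

// Sketch: countAsync goes through submitJob and returns a FutureAction instead of blocking.
val rdd = sc.parallelize(1 to 1000, numSlices = 4)
val future: FutureAction[Long] = rdd.countAsync()   // job is submitted; the call returns immediately
// ... the driver can do other work here ...
val n = Await.result(future, Duration.Inf)          // block only when the result is actually needed
println(n)                                          // 1000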
3. The DAGScheduler event loop handles JobSubmitted
Event dispatch
DAGSchedulerEventProcessLoop is the DAGScheduler's event-loop processor: when an event arrives, onReceive dispatches it, and a JobSubmitted event is routed to the handleJobSubmitted method.
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } catch {
      case ex: ShuffleStatusNotFoundException =>
        dagScheduler.handleShuffleStatusNotFoundException(ex)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, artifacts, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, artifacts, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, artifacts, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, artifacts, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case StageFailed(stageId, reason, exception) =>
      dagScheduler.handleStageFailed(stageId, reason, exception)

    // ... remaining event cases elided in this excerpt
  }
}
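DAGSchedulerEventProcessLoop inherits from Spark's internal EventLoop, which is essentially a blocking queue drained by a single daemon thread. A simplified, self-contained sketch of that pattern (my own minimal version, not Spark's code):

import java.util.concurrent.LinkedBlockingDeque

// Sketch: the event-loop pattern behind DAGSchedulerEventProcessLoop, reduced to its core.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()

  private val thread = new Thread(name) {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val event = queue.take()   // block until an event is posted
        onReceive(event)           // dispatch, like doOnReceive's pattern match
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)   // what eventProcessLoop.post(JobSubmitted(...)) amounts to
  def onReceive(event: E): Unit                 // subclasses pattern-match on the event type here
}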
Stage splitting and job creation begin. The core steps:
createResultStage: splits out the stages and yields the finalStage (the ResultStage); the stage DAG is also built in this step.
setActiveJob: an ActiveJob is created and attached to the finalStage.
submitStage: submitting the finalStage is what submits the job.
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    artifacts: JobArtifactSet,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } // the catch block that handles stage-creation failures is elided in this excerpt
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties)
  clearCacheLocs()

  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)).toImmutableArraySeq
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, Utils.cloneProperties(properties)))
  submitStage(finalStage)
}
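The listenerBus.post(SparkListenerJobStart(...)) above is what listeners observe. As an aside (a sketch; the listener class and its log lines are my own), you can register a SparkListener to watch jobs and their stages being submitted:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted}

// Sketch: observing handleJobSubmitted's effects from the outside via the listener bus.
class JobWatcher extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    println(s"stage ${stageSubmitted.stageInfo.stageId}: ${stageSubmitted.stageInfo.name}")
}

sc.addSparkListener(new JobWatcher)   // register before running the job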
4. Stage splitting (the core logic)
This step walks the whole RDD DAG upward, starting from the finalRDD:
getShuffleDependenciesAndResourceProfiles is called to find the finalRDD's nearest shuffle (wide) dependencies;
getOrCreateParentStages then creates (or reuses) one ShuffleMapStage for each of those ShuffleDependencies.
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite, resourceProfile.id)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val resourceProfiles = new HashSet[ResourceProfile]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      Option(toVisit.getResourceProfile()).foreach(resourceProfiles += _)
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  (parents, resourceProfiles)
}
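For the running example, this traversal starting from rdd5 finds exactly one ShuffleDependency (the one created by reduceByKey) and stops there, so the job ends up with one ShuffleMapStage plus the final ResultStage. The sketch below replays the same "nearest shuffle dependencies" walk over a toy graph, using my own simplified data types rather than Spark's:

import scala.collection.mutable

// Sketch: the "nearest shuffle dependencies" walk on a toy dependency graph.
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep
final case class Shuffle(parent: Node) extends Dep
final case class Node(name: String, deps: List[Dep])

def nearestShuffleDeps(start: Node): Set[Shuffle] = {
  val found   = mutable.Set[Shuffle]()
  val visited = mutable.Set[Node]()
  val waiting = mutable.ListBuffer(start)
  while (waiting.nonEmpty) {
    val node = waiting.remove(0)
    if (visited.add(node)) {
      node.deps.foreach {
        case s: Shuffle => found += s          // stop here: deeper shuffles belong to parent stages
        case n: Narrow  => waiting.prepend(n.parent)
      }
    }
  }
  found.toSet
}

// rdd1 <- rdd2 <- rdd3 <- rdd4 <-(shuffle)- rdd5
val rdd1 = Node("rdd1", Nil)
val rdd2 = Node("rdd2", List(Narrow(rdd1)))
val rdd3 = Node("rdd3", List(Narrow(rdd2)))
val rdd4 = Node("rdd4", List(Narrow(rdd3)))
val rdd5 = Node("rdd5", List(Shuffle(rdd4)))
println(nearestShuffleDeps(rdd5))   // one Shuffle dependency: the reduceByKey boundary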