DAGScheduler分析

基本介绍

DAGScheduler通过计算将DAG中一系列RDD划分到不同的Stage,然后构建这些Stage之间的关系,最后将每个Stage按照Partition切分为多个Task,并以Task集合(即TaskSet)的形式提交给底层的TaskScheduler
DAGScheduler依赖的一些组件: DAGSchedulerEventProcessLoop, JobListenerActiveJob

JobListener与JobWaiter

JobListener用于监听作业中每个Task执行成功或失败,JobWaiter继承实现了JobListener, 用于等待整个Job执行完毕,然后调用给定的处理函数对返回结果进行处理并最终确定作业的成功或失败

源码实现的一些思考:

  • 通过Scala的异步编程 Future/Promise 实现了任务状态的检测监听, 这块还不太理解,等后面系统学习一下Scala再回顾一下
  • JobWaiter中的定义了一个cancel()方法取消Job执行实际上调用了 DAGScheduler 的cancelJob()方法
  • JobWaiter 中继承 JobListener 实现的taskSucceeded()方法中为了保证线程安全用到了synchronized给对象加锁, 联想到RDD中的stateLock,也用到了synchronized加锁

ActiveJob

ActiveJob表示已经激活的Job, 即DAGScheduler接收处理的Job

DAGSchedulerEventProcessLoop

DAGSchedulerEventProcessLoopDAGScheduler内部的事件循环处理器, 用于处理DAGSchedulerEvent类型的事件,实现原理类似于LiveListenerBus, 有4个方法实现:

  • onReceive

  • doOnReceive

  • onError

  • onStop

    其中doOnReceive()方法中定义了具体事件类型的跳转:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
    case SlaveLost(_, true) => true
    case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
    }

DAGScheduler与Job的提交

简要流程概括, 这里以执行count算子为例:

  • 首先执行count算子会创建调用SparkContext的runJob()方法

    1
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
  • 然后接着传递调用DAGScheduler的runJob()方法

    1
    2
    3
    4
    5
    def runJob[T, U: ClassTag](...): Unit = {
    ....
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    ....
    }
  • DAGScheduler通过执行submitJob()方法提交Job

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    def runJob[T, U](...): Unit = {
    val start = System.nanoTime
    // 执行submitJob()方法
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
    logInfo("Job %d finished: %s, took %f s".format(...))
    case scala.util.Failure(exception) =>
    logInfo("Job %d failed: %s, took %f s".format(...))
    ...
    }
    }
  • submitJob()生成一个JobWaiter的实例监听Job的执行情况, 向DAGSchedulerEventProcessLoop发送JobSubmitted事件

    1
    2
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(....))
  • 当eventProcessLoop对象投递了JobSubmitted事件之后,对象内的eventThread线程实例对事件进行处理,不断从事件队列中取出事件,调用onReceive函数处理事件,当匹配到JobSubmitted事件后,调用DAGScheduler的handleJobSubmitted函数并传入jobid、rdd等参数来处理Job

  • handleJobSubmitted执行过程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private[scheduler] def handleJobSubmitted(...) {
    var finalStage: ResultStage = null
    try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
    ....
    }
    .....

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

    .....

    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
    }

    由源码分析可以概括执行过程如下

    1. 首先调用createResultStage方法创建ResultStage, 这里就开始了Stage的构建过程, 详见下节Stage的构建过程
    2. 创建ActiveJob
    3. 将JobId与刚创建的ActiveJob的对应关系放入jobIdToActiveJob中
    4. 将刚创建的ActiveJob放入activeJobs集合中
    5. 使ResultStage的_activeJob属性持有刚创建的ActiveJob
    6. 获取当前Job的所有Stage对应的StageInfo(即数组stageInfos)
    7. 向listenerBus(LiveListenerBus)投递SparkListenerJobStart事件,进而引发所有关注此事件的监听器执行相应的操作
    8. 调用submitStage方法提交Stage
  • DAGScheduler完成任务提交后,在判断哪些Partition需要计算,就会为Partition生成Task,然后封装成TaskSet,提交至TaskScheduler。等待TaskScheduler最终向集群提交这些Task,监听这些Task的状态

构建Stage

前面我们看到handleJobSubmitted执行过程中调用createResultStage方法创建ResultStage

1
2
3
4
5
6
7
8
9
10
11
private def createResultStage(....): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

根据源码分析,详细的处理步骤如下:

  • 调用getOrCreateParentStages方法获取所有父Stage的列表,父Stage主要是宽依赖对应的Stage

  • getOrCreateParentStages处理步骤:

    1
    2
    3
    4
    5
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
    }
    1. 调用DAGScheduler的getShuffleDependencies方法获取当前给到的RDD的所有ShuffleDependency的序列, 逐个访问其依赖的非Shuffle的RDD,获取所有非Shuffle的RDD的ShuffleDependency依赖
    2. 调用DAGScheduler的getOrCreateShuffleMapStage方法为每一个ShuffleDependency获取或者创建对应的ShuffleMapStage, 并返回得到的ShuffleMapStage列表
  • getOrCreateShuffleMapStage方法处理步骤:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
    stage

    case None =>
    getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach{ dep =>
    if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
    createShuffleMapStage(dep, firstJobId)
    }
    }

    createShuffleMapStage(shuffleDep, firstJobId)
    }
    }
    1. 如果已经创建了ShuffleDependency对应的ShuffleMapStage,则直接返回此ShuffleMapStage
    2. 否则调用getMissingAncestorShuffleDependencies方法为所有还未创建过Stage的祖先ShufflleDependency创建ShuffleMapStage并注册
    3. 最后为当前ShuffleDependency创建ShuffleMapStage并注册
  • 生成Stage的身份标识id, 并创建ResultStage

  • 将ResultStage注册到stageIdToStage中

  • 调用updateJobIdStageIdMaps方法更新Job的身份标识与ResultStage及其所有祖先的映射关系

提交ResultStage

handleJobSubmitted中处理Job提交的最后一步是调用submitStage方法提交ResultStage
注意:所有parent Stage都计算完成,才能提交
submitStage方法中有三种调用逻辑:
- submitMissingTasks(stage,jobId.get):如果所有parent stage已经完成,则提交stage所包含的task
- submitStage(parent):有parent stage未完成,则递归提交
- abortStage:无效stage,直接停止

提交还未计算的Task

提交Task的入口是submitMissingTask方法,简单总结一下执行过程:

  • 首先调用stage.findMissingPartitions()方法找到还没有完成计算的分区的索引

  • 获取ActiveJob的properties, properties包含了当前Job的调度、group、描述等信息

  • 调用stage.makeNewStageAttempt()方法开始Stage的执行尝试,并向listenerBus投递SparkListenerStageSubmitted事件

  • 根据Stage类型进行序列化(序列化的时候会利用synchronized加锁)

    1
    2
    //For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    //For ResultTask, serialize and broadcast (rdd, func)
  • 根据Stage类型分别创建对应的ShuffleMapTask和ResultTask

  • 创建TaskSet, 并调用TaskScheduler的submitTasks方法提交此TaskSet

    1
    2
    taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

Task执行结果的处理

参考链接