
Spark Version: 1.0.1


// Starting point of this walk-through: an RDD created from a text file via the SparkContext `sc`.
// NOTE(review): the file bundled with Spark is named README.md; on a case-sensitive filesystem
// "readme.md" may not match it — confirm the intended path.
val textFile = sc.textFile("readme.md")



// RDD.filter (quoted from Spark 1.0.1): builds a FilteredRDD over this RDD.
// The predicate is first passed through sc.clean (SparkContext.clean, whose signature
// appears below), which returns a function of the same type F.
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))



private[spark] def clean[F <: AnyRef](f: F): F = {



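然后我们继续跟读代码(SparkContext.clean 内部调用的是 ClosureCleaner.clean)。可以看到这段代码里有一条注释:TODO: cache outerClasses / innerClasses / accessedFields。这条 TODO 的意思是:目前每次清理闭包时都要重新分析闭包的 outerClasses、innerClasses 和 accessedFields,将来可以把这些分析结果缓存(cache)起来,避免重复计算。它是一个性能上的待办项,和 GC 无关。ClosureCleaner 本身的作用是找出闭包函数真正访问到的外部类字段,去掉用不到的外部引用。这样闭包被序列化、发送到集群上时,就不会把多余(甚至无法序列化)的外部对象一起带上。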



// RDD.count (quoted from Spark 1.0.1): sc.runJob applies Utils.getIteratorSize to each
// partition's iterator and returns one result per partition (see the runJob overloads
// below); .sum adds those per-partition results into a single Long.
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum



* Run a job on all partitions in an RDD and return the results in an array.
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.size, false)


* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
): Array[U] = {
    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)


* Run a function on a given set of partitions in an RDD and return the results as an array. The
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
* than shipping it out to the cluster, for short actions like first().
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean
): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)


* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark. The allowLocal
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
        throw new SparkException("SparkContext has been shutdown")
    val callSite = getCallSit //这是一个函数调用链的记录,不用太在意
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
        resultHandler, localProperties.get) //看这里看这里
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")



def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null)
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
        case JobSucceeded => {}
        case JobFailed(exception: Exception) =>
            logInfo("Failed to run " + callSite)
            throw exception


* Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
* can be used to block until the the job finishes executing or can be used to cancel the job.
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] =
    // Check to make sure we are not launching a task on a partition that does not exist.
    //这里的partition是在runJob的时候就传进来了,用0 until rdd.partition.size所以会存在的
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
            "Total number of partitions: " + maxPartitions)
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
        return new JobWaiter[U](this, jobId, 0, resultHandler)

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessActor ! JobSubmitted(
        jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)

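这里有一个问题:为什么当 partitions.size == 0 时,直接 return new JobWaiter[U](this, jobId, 0, resultHandler)?这里的 resultHandler 在之前被传入的值是 (index, res) => results(index) = res,与用户指定的 func 并没有什么关系呀。
答案是:分区数为 0 时根本没有 task 需要运行,用户的 func 一次都不会被调用,resultHandler 也不会被调用。一个 totalTasks 为 0 的 JobWaiter 在创建时就已经处于完成状态,它的结果直接就是 JobSucceeded,所以 awaitResult() 会立即返回。此时 results 是一个长度为 0 的空数组,count 的结果就是 0。需要注意的是,0 until rdd.partitions.size 并不能保证一定有 partition:如果 RDD 本身一个分区都没有(比如空 RDD),这个范围就是空的,就会走到这个分支。不过在我们跟踪的这个读 readme.md 的例子里,RDD 是有分区的,所以这个分支不会被走到。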




* An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
* results to the given handler function.



* The main event loop of the DAG scheduler.
def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
            listener, properties)


private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    properties: Properties = null)
    var finalStage: Stage = null
    try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    } catch {
        case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    if (finalStage != null) {
        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
        logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
            job.jobId, callSite, partitions.length, allowLocal))
        logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
            // Compute very short actions like first() or take() with no parent stages locally.
            listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
        } else {
            jobIdToActiveJob(jobId) = job //这是一个HashMap
            activeJobs += job
            resultStageToJob(finalStage) = job //这是一个HashMap
            listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,properties))



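然后判断一下:如果允许本地执行(allowLocal 为 true)、final stage 没有 parent stage,并且只需要计算一个分区(像 first()、take() 这样很小的 action),就可以直接在 driver 本地计算(runLocally(job));否则就调用 submitStage(finalStage) 把 stage 提交出去。上面的代码片段在两个分支里都省略了这两处调用。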




// Broken out for easier testing in DAGSchedulerSuite.
protected def runLocallyWithinThread(job: ActiveJob) {
    var jobResult: JobResult = JobSucceeded
    try {
        val rdd = job.finalStage.rdd
        val split = rdd.partitions(job.partitions(0))
        val taskContext =
             new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
        try {
            val result = job.func(taskContext, rdd.iterator(split, taskContext))
            job.listener.taskSucceeded(0, result)
        } finally {
    } catch {
        case e: Exception =>
            val exception = new SparkDriverExecutionException(e)
            jobResult = JobFailed(exception)
    } finally {
        val s = job.finalStage
        stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
        stageIdToStage -= s.id     // but that won't get cleaned up via the normal paths through
        stageToInfos -= s          // completion events or stage abort
        jobIdToStageIds -= job.jobId
        listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))

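可以看到 jobResult 一开始就被设为 JobSucceeded,如果后面 catch 到异常,再改成 JobFailed。计算完成后,结果通过 job.listener.taskSucceeded(0, result) 交给 listener,也就是前面 submitJob 里创建、随 JobSubmitted 一路传下来的 JobWaiter。finally 里还会清理为这个本地 job 填充过的数据结构,并发送 SparkListenerJobEnd 事件。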



/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing == Nil) {
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                submitMissingTasks(stage, jobId.get)
                runningStages += stage
            } else {
                for (parent <- missing) {
                    submitStage(parent) //递归啦!递归啦!递归啦!!!
                waitingStages += stage
    } else {
        abortStage(stage, "No active job for stage " + stage.id)

=================================找missing stage分割线===============================


private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage] //放missing的Stage的
    val visited = new HashSet[RDD[_]]
    def visit(rdd: RDD[_]) {
        if (!visited(rdd)) {
            visited += rdd
            if (getCacheLocs(rdd).contains(Nil)) {
                for (dep <- rdd.dependencies) {
                    dep match {
                        case shufDep: ShuffleDependency[_,_] =>
                             val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                            if (!mapStage.isAvailable) {
                                 missing += mapStage
                        case narrowDep: NarrowDependency[_] =>

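就是顺着依赖链往上找所有 missing 的 Stage,加到 missing 集合中返回。只有当某个 RDD 还有没被缓存的分区时(getCacheLocs(rdd).contains(Nil)),才会继续往上看它的依赖。这里注意有一个宽依赖和窄依赖的判断。如果是窄依赖,直接递归访问上一个 RDD,并不会新建一个 Stage;只有需要 shuffle 的宽依赖才会对应一个新的 Stage。对于宽依赖,会调用 getShuffleMapStage 拿到对应的 shuffle map stage,如果它还不可用(!mapStage.isAvailable),就放到 missing 集合中。在 getShuffleMapStage 中,会调用 newOrUsedStage,这个函数是专门为 shuffle stage 准备的。那么它做了什么事情呢?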

 * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
 * provided jobId.  If a stage for the shuffleId existed previously so that the shuffleId is
 * present in the MapOutputTracker, then the number and location of available outputs are
 * recovered from the MapOutputTracker
private def newOrUsedStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: ShuffleDependency[_,_],
    jobId: Int,
    callSite: Option[String] = None)
  : Stage =
  //注意到如果shuffle是会new Stage的。
  val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)


=================================missing stage找完啦===============================

让我们回到 submitStage。如果 missing 不为 Nil,说明这个 stage 还有父 stage 的输出没有准备好。此时对每个缺失的父 stage 递归调用 submitStage(parent),并把当前 stage 放进 waitingStages 等待。递归下去,直到某个 stage 的 missing == Nil,说明它的所有父 stage 都已经可用。也就是说,沿着 shuffle 边界划分出来的 Stage 已经一路找到了源头。这个时候就可以对它调用 submitMissingTasks 了。

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
    var tasks = ArrayBuffer[Task[_]]()
    if (stage.isShuffleMap) {
        for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
            val locs = getPreferredLocs(stage.rdd, p)
            tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    } else {
        // This is a final stage; figure out its job's missing partitions
        val job = resultStageToJob(stage)
        for (id <- 0 until job.numPartitions if !job.finished(id)) {
            val partition = job.partitions(id)
            val locs = getPreferredLocs(stage.rdd, partition)
            tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    val properties = if (jobIdToActiveJob.contains(jobId)) {
    } else {
        // this stage will be assigned to "default" pool

    // must be run listener before possible NotSerializableException
    // should be "StageSubmitted" first and then "JobEnded"
    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

    if (tasks.size > 0) {
        // Preemptively serialize a task to make sure it can be serialized. We are catching this
        // exception here because it would be fairly hard to catch the non-serializable exception
        // down the road, where we have several different implementations for local scheduler and
        // cluster schedulers.
        try {
        } catch {
            case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString)
            runningStages -= stage

        logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
        myPending ++= tasks
        logDebug("New pending tasks: " + myPending)
        //看这里 看这里
          new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
          stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
    } else {
        logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
        runningStages -= stage

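这个函数首先判断是不是一个 shuffle map stage:
- 如果是,对每个还没有输出的 partition(stage.outputLocs(p) == Nil)new 一个 ShuffleMapTask;
- 否则(这是 final stage),对 job 中每个还没完成的 partition new 一个 ResultTask。

这里有一个小设计:每个 partition 都可以通过 getPreferredLocs 选择自己偏好(prefer)的机器。先看一下这个,再回到主线。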

===============================可以选择partition prefer的machine哟===============================


 * Synchronized method that might be called from other threads.
 * @param rdd whose partitions are to be looked at
 * @param partition to lookup locality information for
 * @return list of machines that are preferred by the partition
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (!cached.isEmpty) {
    return cached
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (!rddPrefs.isEmpty) {
    return rddPrefs.map(host => TaskLocation(host))
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocs(n.rdd, inPart)
        if (locs != Nil) {
          return locs
    case _ =>




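然后会先试着把一个 task 序列化一下,提前确认 task 是可以被序列化的。如果抛出 NotSerializableException,就在这里 abortStage。代码注释里解释了原因:到了后面,本地和集群有多种不同的 scheduler 实现,就很难再捕获这个异常了。接着调用 taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))。OK,终于看完 stage 阶段了,来到了 task 阶段。看 TaskScheduler.scala 会发现它只是一个 trait,具体实现都在 TaskSchedulerImpl.scala 里。以下是这个类的一段注释。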



Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
It can also work with a local setup by using a LocalBackend and setting isLocal to true.
It handles common logic, like determining a scheduling order across jobs, waking up to launch
speculative tasks, etc.

Clients should first call initialize() and start(), then submit task sets through the
runTasks method.

THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
threads, so it needs locks in public API methods to maintain its state. In addition, some
SchedulerBackends synchronize on themselves when they want to send events here, and then
acquire a lock on us, so we need to make sure that we don't try to lock the backend while
we are holding a lock on ourselves.


突然想插播一下:backend 是什么时候被设置到 TaskSchedulerImpl 里的?这段代码在 SparkContext.scala 里。其中有一个 createTaskScheduler 函数,它的作用是 “Creates a task scheduler based on a given master URL. Extracted for testing.”。这个函数是在创建 SparkContext 时(也就是 SparkContext 的构造过程中)被调用的,而不是在 master 启动的时候。函数里会根据 master URL 判断集群的运行方式,比如 local、mesos、yarn 等。我们平时用的应该是 Spark 自带的 standalone 模式,master URL 形如 spark://host:port,所以走的是 SPARK_REGEX 这个分支。在这里会有

// Standalone mode (master URL matched by SPARK_REGEX in SparkContext.createTaskScheduler):
// the scheduler backend used is a SparkDeploySchedulerBackend.
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)

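可以看到这里使用的 backend 是 SparkDeploySchedulerBackend,位于 org.apache.spark.scheduler.cluster 包中。它创建之后会通过 scheduler.initialize(backend) 交给 TaskSchedulerImpl。它的代码里有一句注释 “The endpoint for executors to talk to us”,指的是 driver 端的 Akka 地址(driverUrl)。worker 上启动的 executor 会通过这个 endpoint 和 driver 通信,比如注册自己、汇报 task 状态,而不是向 master 报告任务完成。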





override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized { 
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
    hasReceivedTask = true



// Make fake resource offers on all executors
def makeOffers() {
    executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))

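注意,makeOffers 把每个 executor 当前的空闲核数包装成 WorkerOffer,交给 scheduler.resourceOffers 去分配 task,再把分配结果交给 launchTasks。上面代码第二行开头的 launchTasks(scheduler.resourceOffers( 在摘录中被截掉了。从这里开始,就要向 executors launch tasks 了。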

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => //这里面存的是TaskSetManager
        try { 
          var msg = "Serialized task %s:%d was %d bytes which " +
            "exceeds spark.akka.frameSize (%d bytes). " +
            "Consider using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
    else {
      freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
      executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

首先,每个 task 都要先序列化,序列化了才能发出去。

如果序列化后的大小超过了 Akka 帧大小的上限,也就是那个 if(serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes),处理方式是:
- 找到这个 task 所属的 TaskSetManager。activeTaskSets.get(taskSetId) 返回的是一个 Option,所以最多只有一个;
- 构造一条错误信息 msg,执行 taskSet.abort(msg),提示用户对比较大的值使用广播变量。

如果没问题,就先把这个 task 需要用到的 cpu 核数(scheduler.CPUS_PER_TASK)从对应 executor 的 freeCores 中减去。这说明 driver 端的 backend 记录了每个 executor 还剩多少空闲核,至于具体怎么分配,可以再研究一下。然后执行 executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))。

executorActor 是 CoarseGrainedSchedulerBackend 中的一个 HashMap[String, ActorRef],因此从 executorActor(task.executorId) 得到的实际上是一个 ActorRef,这是 Akka 的数据结构。那么这个消息发到哪里去了呢?答案是 executor 包里的 CoarseGrainedExecutorBackend.scala。它是 worker 上启动的 executor 进程,具体可以参考这篇文章。所以其实就相当于把序列化以后的 Task 通过 Akka 发送给了 worker 上的 executor。顺便说一句,我们就这么从 scheduler 包来到了 executor 包。


override def receive = {
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
    } else {
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)//看这里,看这里


def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)


override def run(){
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    val value = task.run(taskId.toInt)

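首先调用 Task.deserializeWithDependencies,把收到的数据拆成 task 依赖的 file、jar 以及 task 本身的字节。然后用 updateDependencies(taskFiles, taskJars) 把这些依赖都拉取到本地。接着用当前线程的 context class loader 把 task 反序列化出来。如果一切正常,就调用 val value = task.run(taskId.toInt)。值得一提的是,这里还会对 Java 的 GC 时间进行测量,上面的代码片段没有列出这部分。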



