最近被拜托讲Spark,然后就对自己现有的知识进行了一下整理,讲讲自己的理解吧。

Spark是UC Berkeley的AMPLab的BDAS (Berkeley Data Analytics Stack)[1]的一环。BDAS是AMPLab为大数据设计的一套数据处理框架(超级厉害!非常炫酷!),如下图所示。可以看出他们在下一盘很大的棋啊。从应用层到数据处理接口到数据处理框架到存储到资源虚拟化,无所不包。我们关注的Spark就是这个框架中最中间的那个部分,Spark Core(好像TCP/IP五层模型里的IP层有木有!)。这个部分上承MLlib、GraphX等各种数据处理接口,下接Succinct、Alluxio、HDFS等各种数据存储系统。是这其中非常重要的一个环节。

Berkeley Data Analytics Stack[1]

###1 RDD

代码位置:rdd

了解了Spark的位置,我们大概就可以知道Spark作为一个数据处理引擎,起到了承上启下的作用,通用性和速度对它来说是非常重要的。快速也是他区别于其他分布式数据处理引擎例如MapReduce和Dryad的重要方面(当然这两个引擎也非常了不起)。那么为什么Spark这么快呢?答案是弹性分布式数据集 (RDD, Resilient Distributed Datasets)[2]。

弹性分布式数据集(下称RDD)是对分布式内存的一个抽象。看!因为存在内存里,所以快!每一个RDD都由以下五个部分组成。

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

A list of partitions是说数据被划分成一个一个的部分(partitions)。A function for computing each split是说每一个部分(partitions)都可以用一个方法来进行操作,这个方法叫做computer()。A list of dependencies是说这个RDD到其它RDD之间的依赖。至此可以看出RDD就是一个用总体的眼光来看数据划分和每个数据上可以有的动作的集合。各种RDD之间可以以一定的规律相互转化,这些转化形成了RDD之间的依赖。Spark中的各种操作就是通过组合各种不同的RDD来完成的[3]。

在Spark中,RDD之间的依赖分为窄依赖(Narrow Dependencies)和宽依赖(Wide Dependencies)两种[2],如下图。窄依赖类似于MapReduce当中的map操作,一个父RDD只会有唯一的子RDD;宽依赖类似于MapReduce当中的shuffle操作,一个父RDD可以有多个孩子(这个爹好累)。

Spark中的依赖[2]

###2 Scheduler

代码位置:scheduler

####2.1 DAG Scheduler

与MapReduce一样,Spark也会把job先组织成有向无环图(DAG)。那么这个DAG是怎么生成的呢?在Spark里,对RDD的操作分为两种,RDD的转化(transformation)和action(具体的解释可以看[3])。一个action发起了一个DAG的生成,有点DAG终结者的味道。当一个spark job遇到了一个action操作,DAGScheduler会递归往前找最后一个RDD的祖先RDD们,直到找到所有RDD。例如我们来看一个WordCount的DAG(请叫我灵魂画手)。

DAG of WordCount

同一列的圈圈属于同一个RDD的不同partitions。整个WordCount被一个shuffledRDD划分为两个部分,称为两个stage。在往前找祖先节点的时候,每遇到一个宽依赖,都把这个依赖的两边划分为两个不同的Stage。即,从上一个ShuffledRDD的开头(或整个job的开头)到另一个ShuffledRDD的开头为一个stage。我们要求一个Stage全部完成才能够开始下一个Stage,不然子RDD只能接收到一部分数据,没办法计算嘛。最后一个Stage称为ResultStage,其它都称为ShuffleMapStage。

当DAG生成好了之后,就可以从第一个Stage开始计算了。我们首先来看一看一个Stage中涉及到的一些概念。下图画了一个Stage。一横行表示一个RDD,可以看到这个RDD被划分成为了4个部分,一竖列表示RDD之间的转换,可以看到窄依赖的子RDD继承了父RDD的partitions,非常和谐。一竖列的转化可以称为一个Task。一个Task的内部是pipeline的,也就是说,一个task可以自顾自地连续执行,不需要顾及其它Task的感受,依次调用经过RDD的compute函数就可以了。这些task的集合被称为一个TaskSet。在调度的时候,一个TaskSet是被一起丢给调度器的。

A Stage

####2.2 Task Scheduling

一个TaskSet被丢给的调度器就是一个Task Scheduling了。这是一个Task粒度的调度器,负责分配Task给不同的Executor。Task的调度器使用了[4]中的方法,这是一篇Spark作者在10年的EuroSys上发的文章。中心思想是尽量把Task放在RDD prefer的地方(这些地方就是数据所在的地方)。对于宽依赖,还要做一个数据的持久化(MOF),有助于快速从错误中恢复。现有的大部分关于Spark的优化工作都集中在Task Scheduling这个部分。例如,把task交给哪个worker节点以找到一个排队较短的worker节点啊(Sparrow[5]),对慢节点的优化啊(Hopper[6]),把task的调度和数据的调度放在一起考虑啊(Iridium[7]),让同一个stage的task尽量完成时间接近啊([8])等等。

###4 Executor

代码位置:executor

当每一个Task都被分配给一个Executor以后,就可以在这个Executor上执行了,Executor调用task.run()来执行一个Task。task.run()调用相应Task(ShuffleMapTask或ResultTask,都继承Task)的runTask()方法来真正运行。这里是RDD的compute函数真正执行的地方。由于被分配给Executor的是一个Task,因此就不存在什么依赖问题,所有数据像流一样依次通过一系列函数。因此Spark采用的是pipeline的执行方式。也就是说,所有的compute函数一起开始执行,数据只要依次通过这些compute函数就能得到这个task的最终计算结果。接下来,我们分别看一下这两种不同的Task是怎么run的。

无论如何,一个Task要做的第一件事情就是把接收到的rdd或是func反序列化。然后依次去执行RDD的compute就行了。

####4.1 ShuffleMapTask

代码位置:scheduler/ShuffleMapTask.scala

作为ShuffleMapTask,其作用是进行本阶段的计算,并把中间结果写入本地,等待下一个Stage的Task来读取。因此,接下来作为ShuffleMapTask需要创建一个ShuffleWriter,然后计算结果写入本地文件。这里的写法有两种(即有两种writer可以选择),一种是HashShuffleWriter,一种是SortShuffleWriter。选择哪一种是可以根据用户的任务特性自行设置的,设置方式到处都有就不再赘述,总的来说就是如果你的任务需要排序,用Sort会表现比较好,否则用Hash以避免不需要的排序。我们来看一下两种Writer有什么不同。

#####4.1.1 HashShuffleWriter

代码位置:shuffle/hash/HashShuffleWriter.scala

#####4.1.2 SortShuffleWriter

代码位置:shuffle/sort/SortShuffleWriter.scala

####4.2 ResultTask

代码位置:scheduler/ResultTask.scala

###Reference
[1] AMPLab-software
[2] Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 2-2.
[3] TRANSFORMATIONS AND ACTIONS
[4] Delay Scheduling
[5] Ousterhout K, Wendell P, Zaharia M, et al. Sparrow: distributed, low latency scheduling[C]//Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013: 69-84.
[6] Ren X, Ananthanarayanan G, Wierman A, et al. Hopper: Decentralized Speculation-aware Cluster Scheduling at Scale[C]//Proceedings of the 2015 ACM Conference on Special Interest Group on Data Communication. ACM, 2015: 379-392.
[7] Low Latency Geo-distributed Data Analytics
[8] GB/T 7714
Hung C C, Golubchik L, Yu M. Scheduling jobs across geo-distributed datacenters[C]//Proceedings of the Sixth ACM Symposium on Cloud Computing. ACM, 2015