今天是6月1号星期一,农历xxxx(艾玛我不知道)。传说中的小孩子过的一个节日。俗话说得好,每逢佳节倍思亲,等下去给娘亲打个电话就酱。
今天也是每周开组会的日子,机智的我就在刚才已经想好这周组会讲什么了,于是可以淡定地继续写日志了。因为不能絮絮叨叨好好写一篇纯有感而发的日志,所以只能在每篇日志开头絮絮叨叨。
顺便做个小结,一般小结都在最后做,但是因为太!重!要!所以开头说。这个任务执行的过程基本就是如下调用链,我们也会按照这个调用链往下讲:
TaskRunner.run->Task.run->Task.runTask->RDD.iterator->RDD.computeOrReadCheckpoint->RDD.compute
好了,还是接着上次的Spark任务调度,现在要在每个Worker上执行task了。上回 我们说到要在启动的Executor上执行任务了,就像这样:
1
2
3
4
5
6
7
8
9
override def run (){
...
val (taskFiles, taskJars, taskBytes) = Task .deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task [Any ]](taskBytes, Thread .currentThread.getContextClassLoader)
...
val value = task.run(taskId.toInt)
}
我们先看一下updateDependencies
做了什么事情。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private def updateDependencies (newFiles: HashMap [String , Long ], newJars: HashMap [String , Long ]) {
synchronized {
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1 L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils .fetchFile(name, new File (SparkFiles .getRootDirectory), conf, env.securityManager)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1 L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils .fetchFile(name, new File (SparkFiles .getRootDirectory), conf, env.securityManager)
currentJars(name) = timestamp
val localName = name.split("/" ).last
val url = new File (SparkFiles .getRootDirectory, localName).toURI.toURL
if (!urlClassLoader.getURLs.contains(url)) {
logInfo("Adding " + url + " to class loader" )
urlClassLoader.addURL(url)
}
}
}
}
注意到这里有一个时间戳的比较,如果当前本地的file或是jar包比依赖的file或jar的时间戳要旧,才会需要更新。
首先是对file的更新。执行了Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
,然后更新一下本地文件的时间戳。
然后是对jar的更新。前三行做了与更新file相同的事情,然后由于是jar包,需要添加到class loader中。
========================================关于Utils.fetchFile =========================================
首先看注释:
Download a file requested by the executor. Supports fetching the file in a variety of ways, including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
Throws SparkException if the target file already exists and has different contents than the requested file.
就是说根据URL的格式来判断是要读一个HTTP还是HDFS还是files on a standard filesystem。然后把这个文件获取出来。所以分了三个case,分别是”http|https|ftp”,”local file”,”HDFS file”。
首先是http|https|ftp
。就是建个连接,然后用FileOutputStream
把文件读下来。然后是local file
,就是执行Files.copy(sourceFile, targetFile)
。最后是HDFS file
,同样用FileOutputStream
把文件下下来。不管是哪种方式都会判断这个文件是否已经存在,如果存在就不需要下载/复制了。
文件读取结束之后,还会解压.tar.gz/.tgz/.tar等格式的文件,然后会赋给文件a+x
权限。考虑得真周到です。
========================================Utils.fetchFile结束 =========================================
更新完了这些依赖就开始val value = task.run(taskId.toInt)
了。这个函数长这样:
1
2
3
4
5
6
7
8
final def run (attemptId: Long ): T = {
context = new TaskContext (stageId, partitionId, attemptId, runningLocally = false )
taskThread = Thread .currentThread()
if (_killed) {
kill(interruptThread = false )
}
runTask(context)
}
首先new了一个TaskContext
。这个东西可以设置task完成之后调用什么函数 啊什么的,不过还么找到在哪设的。然后搞一个线程过来执行程序吧!这里注意到Task.scala其实是一个abstract class
。我们需要找到实现这个class的类才能找到runTask
的实现。在Spark Scheduling 阶段的介绍里我们已经说过,Task总共有两种类型——ShuffleMapTask和ResultTask。最后一个Task是ResultTask,其余的都是ShuffleMapTask。我们分别来看这两种Task都是怎么run的。
###1 仅包含ResultTask的情况(简单得多)###
1
2
3
4
5
6
7
8
override def runTask (context: TaskContext ): U = {
metrics = Some (context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
就是简单调用一下func(在Master新建ResultTask的时候传入的),然后调用一下context.executeOnCompleteCallbacks()
做一些额外的事情,然后结束。这个callback函数在某些RDD里面会定义。比如HadoopRDD
。
让我们回到最初的那个栗子,由于没有shuffle这个例子就是一个典型的ResultTask:
1
2
val textFile = sc.textFile("readme.md" )
textFile.filter(line=>line.contains("spark" )).count()
我们回顾一下在Spark Scheduling 中把这两行代码转化成了一个ResultTask。看count()
操作的定义可以知道,runTask中的func在这里就是Utils.getIteratorSize _
看下注释大概意思就是说对iterator中的元素计数。然后count()
操作会把这个结果在sum一下,得到所有iterator的元素个数的和,以实现最终的计数功能。
对于RDD的计算是一个递归调用的过程。在最终的runTask里是func(context, rdd.iterator(split, context))
,就递归到了rdd.iterator
这里的rdd是FilteredRDD,我们从FilteredRDD的compute函数开始看。
1
2
override def compute (split: Partition , context: TaskContext ) =
firstParent[T ].iterator(split, context).filter(f)
可以知道这里的f是line=>line.contains("spark")
。然后我们找到RDD.firstParent
的定义:
1
2
3
4
protected [spark] def firstParent [U : ClassTag ] = {
dependencies.head.rdd.asInstanceOf[RDD [U ]]
}
所以小结一下这里就是先执行MappedRDD.compute
然后再对计算结果进行.filter(f)
,这个f
就是line=>line.contains(spark)
。ok,搞清楚了就看MappedRDD.compute
1
2
override def compute (split: Partition , context: TaskContext ) =
firstParent[T ].iterator(split, context).map(f)
那么这个f
又是啥?在SparkContext.map()
里可以找到答案是pair => pair._2.toString
。OK于是往上遍历到了HadoopRDD
。
我们在这篇文章 中说到:HadoopRDD的compute()
就是提供了一个对HDFS中数据的迭代器。
所以这里就是对每个split的进行pair=>pair._2.toString送到下一步。也就是说,现在对所有都只取value,并转换成String。
所以到这里我们已经看完了所有计算过程:首先用HadoopRDD得到每个的迭代器,然后进行map操作,保留value,生成MapRDD,然后用filter操作过滤掉不含Spark的行,生成FilteredRDD,最后对每个iterator的结果进行求和,至此runJob结束,再在master上对所有iterator的结果进行求和(sum)。
至此完成了所有任务。
###2 包含ShuffleMapTask的情况###
同样的,我们来看一下ShuffleMapTask的runTask:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
override def runTask (context: TaskContext ): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
metrics = Some (context.taskMetrics)
val blockManager = SparkEnv .get.blockManager
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false
try {
val ser = Serializer .getSerializer(dep.serializer)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2 [Any , Any ]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}
var totalBytes = 0 L
var totalTime = 0 L
val compressedSizes: Array [Byte ] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker .compressSize(size)
}
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some (shuffleMetrics)
success = true
new MapStatus (blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
if (shuffle != null && shuffle.writers != null ) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
if (shuffle != null && shuffle.writers != null ) {
try {
shuffle.releaseWriters(success)
} catch {
case e: Exception => logError("Failed to release shuffle writers" , e)
}
}
context.executeOnCompleteCallbacks()
}
}
首先要获取BlockManager
。这里要注意,在整个Spark的执行过程当中,只有SparkEnv.scala
中有一个new BlockManager
。因此,在所有需要获取数据块的地方,只能从SparkEnv.get.BlockManager
拿到BlockManager
。然后获取了一个ShuffleBlockManager
。注意在当初new ShffleBlockManager
的时候传入的参数就是BlockManager.this
呢。然后要获取所有block的writer,把每个map output与它们的bucket联系起来。说到bucket,我们可以看一下spark shuffle的过程图。
我们可以看一下spark shuffle的过程:
每一个map task(这就是为什么这个Task叫ShuffleMapTask)结束之后都要把文件写到对应的bucket里,便于reduce来获取。然后就默默调用RDD.iterator
执行啦。
所以来看一下RDD.iterator
:
1
2
3
4
5
6
7
8
9
10
11
12
final def iterator (split: Partition , context: TaskContext ): Iterator [T ] = {
if (storageLevel != StorageLevel .NONE ) {
SparkEnv .get.cacheManager.getOrCompute(this , split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
这里很容易看就先判断一下是不是已经cache了啊,我们这里就假设么被cache,直接跳到RDD.computeOrReadCheckpoint
。
1
2
3
4
5
6
7
private [spark] def computeOrReadCheckpoint (split: Partition , context: TaskContext ): Iterator [T ] =
{
if (isCheckpointed) firstParent[T ].iterator(split, context) else compute(split, context)
}
如果么得checkpoint,直接调用RDD.compute()
。这个东西就仁者见仁智者见智了,不同的RDD都由不同的compute()
。如果没有猜错,应该是从这个task的最后一个RDD开始啦。
现在我们来举一个复杂一点的栗子(一个带Shuffle的栗子):
1
2
val textFile = sc.textFile("README.md" )
textFile.flatMap(line => line.split(" " )).map(word => (word, 1 )).reduceByKey((a, b) => a + b)
看到这里,我们应该很熟悉textFile操作了,会生成一个HadoopRDD然后再通过map操作变成MappedRDD。所以从flatMap看起就可以了:
1
2
3
4
5
6
def flatMap [U : ClassTag ](f: T => TraversableOnce [U ]): RDD [U ] =
new FlatMappedRDD (this , sc.clean(f))
然后又生成了一个MappedRDD。由于MappedRDD是类型的RDD,所以可以隐式转换成PairRDDFunctions
,执行reduceByKey
操作。(只有类型的RDD才可以隐式转换然后用这些shuffle方法,我们在这篇文章 中已经说明过了)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def reduceByKey (partitioner: Partitioner , func: (V , V ) => V ): RDD [(K , V )] = {
combineByKey[V ]((v: V ) => v, func, func, partitioner)
}
def combineByKey [C ](createCombiner: V => C ,
mergeValue: (C , V ) => C ,
mergeCombiners: (C , C ) => C ,
numPartitions: Int ): RDD [(K , C )] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner (numPartitions))
}
def combineByKey [C ](createCombiner: V => C ,
mergeValue: (C , V ) => C ,
mergeCombiners: (C , C ) => C ,
partitioner: Partitioner ,
mapSideCombine: Boolean = true ,
serializer: Serializer = null ): RDD [(K , C )] = {
require(mergeCombiners != null , "mergeCombiners must be defined" )
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException ("Cannot use map-side combining with array keys." )
}
if (partitioner.isInstanceOf[HashPartitioner ]) {
throw new SparkException ("Default partitioner cannot partition array keys." )
}
}
val aggregator = new Aggregator [K , V , C ](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some (partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator (context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true )
} else if (mapSideCombine) {
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true )
val partitioned = new ShuffledRDD [K , C , (K , C )](combined, partitioner)
.setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator (context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true )
} else {
val values = new ShuffledRDD [K , V , (K , V )](self, partitioner).setSerializer(serializer)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator (context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true )
}
}
可以看到mapSideCombine这种情况首先combine了一下,然后new ShuffledRDD,然后再combine一下。其它两种情况都是这种情况的子情况。ok,首先combine了一下使用了
1
2
3
self.mapPartitionsWithContext((context, iter)=>{
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true )
RDD.scala中mapPartitionsWithContext
方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
@DeveloperApi
def mapPartitionsWithContext [U : ClassTag ](
f: (TaskContext , Iterator [T ]) => Iterator [U ],
preservesPartitioning: Boolean = false ): RDD [U ] = {
val func = (context: TaskContext , index: Int , iter: Iterator [T ]) => f(context, iter)
new MapPartitionsRDD (this , sc.clean(func), preservesPartitioning)
}
然后new了一个ShuffledRDD,对这个RDD同样执行了上述方法。
这里要注意new ShuffledRDD的第一个参数传入的是上一个RDD,这对后面ShuffledRDD.getDependencies造成了很大的影响。以至于后面的ShuffleDependency.getRDD其实得到的是ShuffledRDD的上一个RDD。
也就是说reduceByKey分了三个阶段:
转化成MapPartitionsRDD
转化成ShuffledRDD
转化成MapPartitionsRDD
这事情非常符合我们的认知,就是在Map阶段会进行一次combine(其实就是reduce),然后shuffle,然后reduce。
OK,现在问题就转到了我们关心的partition是如何划分的?Shuffle是如何获取数据的?这些问题。
在spark scheduling 中我们已经说过Spark会根据宽窄依赖来划分Stage,不同的Stage就有不同的task。ShuffledRDD之前的部分会被划分到ShuffleMapTask,而之后的部分会划分到ResultTask。这里我们从ShuffleMapTask看起。
####2.1 ShuffleMapTask####
在这个栗子中,这个阶段有
HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->MapPartitionsRDD
前两个RDD的产生是由于textFile操作,我们已经很熟悉了。FlatMap的意思就是把一个变成, , …在这个栗子里就是把文本划分出一个一个单词。然后又通过map,使得每个单词的计数为1,即变为, , 。然后在reduceByKey阶段生成了MapPartitionsRDD,这才是我们要研究的重点。
这个转换发生在combineByKey函数中。
1
2
3
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true )
这里的self就是刚才的MappedRDD本身。根据之前的说明,可以看到这里会生成一个MapPartitionsRDD
,我们就从这个RDD的compute函数开始看起。
1
2
override def compute (split: Partition , context: TaskContext ) =
f(context, split.index, firstParent[T ].iterator(split, context))
这里的f是(context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter),再往上看就是:
(context, iter)=>{aggregator.combineValuesByKey(iter, context)}
Aggregator.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def combineValuesByKey (iter: Iterator [_ <: Product2 [K , V ]],
context: TaskContext ): Iterator [(K , C )] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap [K ,C ]
var kv: Product2 [K , V ] = null
val update = (hadValue: Boolean , oldValue: C ) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap [K , V , C ](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
Option (context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option (context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}
这个函数就是返回一个combine之后的的迭代器。
得到了map-side combine后的迭代器之后,我们回到ShuffleMapTask的runTask。
1
2
3
4
5
6
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2 [Any , Any ]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}
scala的官方文档说:
Product2 is a cartesian product of 2 components.
所以pair就是一个笛卡尔积的形式。然后通过pair中的第一个分量,也就是单词,得到bucketId,并把这个pair写到对应的bucket里。
我们已经说过,ShuffledRDD实际上是把ShuffleDependency给上一个RDD,因此这里的dep.partitioner.getPartition得到的是ShuffledRDDPartition
总之就是把这个Task的结果写到bucket里了,下一阶段来读就是了╮(╯▽╰)╭至于是怎么写的,我们将在Spark BlockManager 给出详细的解释。
####2.2 ResultTask####
这个阶段有
ShuffledRDD->MapPartitionsRDD
我们去看一下ShuffledRDD的compute函数:
1
2
3
4
5
override def compute (split: Partition , context: TaskContext ): Iterator [P ] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency [K , V ]].shuffleId
val ser = Serializer .getSerializer(serializer)
SparkEnv .get.shuffleFetcher.fetch[P ](shuffledId, split.index, context, ser)
}
注意到这里有一个shuffleFetcher,通过(shuffledId, split.index)去远端获取数据了。感觉快要看到我们关心的地方了。
这个ShffleFetcher其实是一个abstract的类,它的子类是BlockStoreShuffleFetcher
。现在来看一下这个子类的fetch
方法。
========================关于shuffledId和split.index ====================
####1 ShuffledId####
早在spark scheduling 这篇文章里讲getMissingParentStages
的时候,有一个遍历rdd.dependencies
的语句。这句话会调用各个rdd override的getDependencies
,我们来看一下ShuffledRDD的getDependencies
(这个函数只会被调用一次 )
1
2
3
override def getDependencies : Seq [Dependency [_]] = {
List (new ShuffleDependency (prev, part, serializer))
}
再看一下Dependency.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@DeveloperApi
class ShuffleDependency [K , V ](
@transient rdd: RDD [_ <: Product2 [K , V ]],
val partitioner: Partitioner ,
val serializer: Serializer = null )
extends Dependency (rdd.asInstanceOf[RDD [Product2 [K , V ]]] ) {
val shuffleId: Int = rdd.context.newShuffleId()
rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this ))
}
这里的rdd.context是一个SparkContext,rdd.context.newShuffleId其实就是在上一个shuffleId的基础上自增1而已。
至此我们已经知道了ShuffledId是在划分stage的时候就分配了的,值是每次自增1的 。
####2 split.index####
这里的split来自iterator传进的参数,这个iterator往上追溯是在ShuffleMapTask新建的时候给赋值的。
ShuffleMapTask.scala
1
var split = if (rdd == null ) null else rdd.partitions(partitionId)
rdd.partitions(partitionId)得到的是一个Partition,这个Partition的index又是从哪来的呢?
我们在start from HadoopRDD 中曾经说过,当rdd.partitions
第一次被调用的时候,就会调用这个RDD override的getPartitions
。这里我们假设就是第一次调用吧(因为再也找不到上层了) 于是就有了这个结果:
ShufffledRDD.scala
1
2
3
override def getPartitions : Array [Partition ] = {
Array .tabulate[Partition ](part.numPartitions)(i => new ShuffledRDDPartition (i))
}
所以说其实ShuffledRDD有其独特的Partition。
1
2
3
4
private [spark] class ShuffledRDDPartition (val idx: Int ) extends Partition {
override val index = idx
override def hashCode (): Int = idx
}
所以split.index就是那个i。就是从0到part.numPartitions的i。
==========================解释结束 ==========================