今天是6月1号星期一,农历xxxx(艾玛我不知道)。传说中的小孩子过的一个节日。俗话说得好,每逢佳节倍思亲,等下去给娘亲打个电话就酱。

今天也是每周开组会的日子,机智的我就在刚才已经想好这周组会讲什么了,于是可以淡定地继续写日志了。因为不能絮絮叨叨好好写一篇纯有感而发的日志,所以只能在每篇日志开头絮絮叨叨。

顺便做个小结,一般小结都在最后做,但是因为太!重!要!所以开头说。这个任务执行的过程基本就是如下调用链,我们也会按照这个调用链往下讲:

TaskRunner.run->Task.run->Task.runTask->RDD.iterator->RDD.computeOrReadCheckpoint->RDD.compute

好了,还是接着上次的Spark任务调度,现在要在每个Worker上执行task了。上回我们说到要在启动的Executor上执行任务了,就像这样:

1
2
3
4
5
6
7
8
9
override def run(){
...
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
...
val value = task.run(taskId.toInt)
//然后后面就是序列化结果传出来啊什么的,先放一下。
}

我们先看一下updateDependencies做了什么事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
**/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
if (!urlClassLoader.getURLs.contains(url)) {
logInfo("Adding " + url + " to class loader")
urlClassLoader.addURL(url)
}
}
}
}

注意到这里有一个时间戳的比较,如果当前本地的file或是jar包比依赖的file或jar的时间戳要旧,才会需要更新。

首先是对file的更新。执行了Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager),然后更新一下本地文件的时间戳。

然后是对jar的更新。前三行做了与更新file相同的事情,然后由于是jar包,需要添加到class loader中。

========================================关于Utils.fetchFile=========================================

首先看注释:

Download a file requested by the executor. Supports fetching the file in a variety of ways,
including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.

Throws SparkException if the target file already exists and has different contents than
the requested file.

就是说根据URL的格式来判断是要读一个HTTP还是HDFS还是files on a standard filesystem。然后把这个文件获取出来。所以分了三个case,分别是”http|https|ftp”,”local file”,”HDFS file”。

首先是http|https|ftp。就是建个连接,然后用FileOutputStream把文件读下来。然后是local file,就是执行Files.copy(sourceFile, targetFile)。最后是HDFS file,同样用FileOutputStream把文件下下来。不管是哪种方式都会判断这个文件是否已经存在,如果存在就不需要下载/复制了。

文件读取结束之后,还会解压.tar.gz/.tgz/.tar等格式的文件,然后会赋给文件a+x权限。考虑得真周到です。

========================================Utils.fetchFile结束=========================================

更新完了这些依赖就开始val value = task.run(taskId.toInt)了。这个函数长这样:

1
2
3
4
5
6
7
8
final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
runTask(context)
}

首先new了一个TaskContext。这个东西可以设置task完成之后调用什么函数啊什么的,不过还么找到在哪设的。然后搞一个线程过来执行程序吧!这里注意到Task.scala其实是一个abstract class。我们需要找到实现这个class的类才能找到runTask的实现。在Spark Scheduling阶段的介绍里我们已经说过,Task总共有两种类型——ShuffleMapTask和ResultTask。最后一个Task是ResultTask,其余的都是ShuffleMapTask。我们分别来看这两种Task都是怎么run的。

###1 仅包含ResultTask的情况(简单得多)###

1
2
3
4
5
6
7
8
override def runTask(context: TaskContext): U = {
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}

就是简单调用一下func(在Master新建ResultTask的时候传入的),然后调用一下context.executeOnCompleteCallbacks()做一些额外的事情,然后结束。这个callback函数在某些RDD里面会定义。比如HadoopRDD

让我们回到最初的那个栗子,由于没有shuffle这个例子就是一个典型的ResultTask:

1
2
val textFile = sc.textFile("readme.md")
textFile.filter(line=>line.contains("spark")).count()

我们回顾一下在Spark Scheduling中把这两行代码转化成了一个ResultTask。看count()操作的定义可以知道,runTask中的func在这里就是Utils.getIteratorSize _看下注释大概意思就是说对iterator中的元素计数。然后count()操作会把这个结果在sum一下,得到所有iterator的元素个数的和,以实现最终的计数功能。

对于RDD的计算是一个递归调用的过程。在最终的runTask里是func(context, rdd.iterator(split, context)),就递归到了rdd.iterator这里的rdd是FilteredRDD,我们从FilteredRDD的compute函数开始看。

1
2
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).filter(f)

可以知道这里的f是line=>line.contains("spark")。然后我们找到RDD.firstParent的定义:

1
2
3
4
/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

所以小结一下这里就是先执行MappedRDD.compute然后再对计算结果进行.filter(f),这个f就是line=>line.contains(spark)。ok,搞清楚了就看MappedRDD.compute

1
2
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)

那么这个f又是啥?在SparkContext.map()里可以找到答案是pair => pair._2.toString。OK于是往上遍历到了HadoopRDD

我们在这篇文章中说到:HadoopRDD的compute()就是提供了一个对HDFS中数据的迭代器。

所以这里就是对每个split的进行pair=>pair._2.toString送到下一步。也就是说,现在对所有都只取value,并转换成String。

所以到这里我们已经看完了所有计算过程:首先用HadoopRDD得到每个的迭代器,然后进行map操作,保留value,生成MapRDD,然后用filter操作过滤掉不含Spark的行,生成FilteredRDD,最后对每个iterator的结果进行求和,至此runJob结束,再在master上对所有iterator的结果进行求和(sum)。

至此完成了所有任务。

###2 包含ShuffleMapTask的情况###

同样的,我们来看一下ShuffleMapTask的runTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
override def runTask(context: TaskContext): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
metrics = Some(context.taskMetrics)
val blockManager = SparkEnv.get.blockManager
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false
try {
// Obtain all the block writers for shuffle blocks.
val ser = Serializer.getSerializer(dep.serializer)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
// Write the map output to its associated buckets.
//就在这里调用rdd.iterator执行啦~~~所以暂停看这句。
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
try {
shuffle.releaseWriters(success)
} catch {
case e: Exception => logError("Failed to release shuffle writers", e)
}
}
// Execute the callbacks on task completion.
context.executeOnCompleteCallbacks()
}
}

首先要获取BlockManager。这里要注意,在整个Spark的执行过程当中,只有SparkEnv.scala中有一个new BlockManager。因此,在所有需要获取数据块的地方,只能从SparkEnv.get.BlockManager拿到BlockManager。然后获取了一个ShuffleBlockManager。注意在当初new ShffleBlockManager的时候传入的参数就是BlockManager.this呢。然后要获取所有block的writer,把每个map output与它们的bucket联系起来。说到bucket,我们可以看一下spark shuffle的过程图。

我们可以看一下spark shuffle的过程:

spark shuffle

每一个map task(这就是为什么这个Task叫ShuffleMapTask)结束之后都要把文件写到对应的bucket里,便于reduce来获取。然后就默默调用RDD.iterator执行啦。

所以来看一下RDD.iterator:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

这里很容易看就先判断一下是不是已经cache了啊,我们这里就假设么被cache,直接跳到RDD.computeOrReadCheckpoint

1
2
3
4
5
6
7
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}

如果么得checkpoint,直接调用RDD.compute()。这个东西就仁者见仁智者见智了,不同的RDD都由不同的compute()。如果没有猜错,应该是从这个task的最后一个RDD开始啦。

现在我们来举一个复杂一点的栗子(一个带Shuffle的栗子):

1
2
val textFile = sc.textFile("README.md")
textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

看到这里,我们应该很熟悉textFile操作了,会生成一个HadoopRDD然后再通过map操作变成MappedRDD。所以从flatMap看起就可以了:

1
2
3
4
5
6
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))

然后又生成了一个MappedRDD。由于MappedRDD是类型的RDD,所以可以隐式转换成PairRDDFunctions,执行reduceByKey操作。(只有类型的RDD才可以隐式转换然后用这些shuffle方法,我们在这篇文章中已经说明过了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
/**
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C, //mergeValue和mergeCombiner都是(a,b)=>a+b
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
//如果key是array,就不能做map-side combining,为啥捏?
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
//这里要求partitioner不能是HashPartitioner类型(如果key是array)
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) { //这是什么情况?
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
//如果是map端的combine是要new ShuffleRDD的,这个值默认是true的。
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
//如果不要求进行mapSideCombine
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}

可以看到mapSideCombine这种情况首先combine了一下,然后new ShuffledRDD,然后再combine一下。其它两种情况都是这种情况的子情况。ok,首先combine了一下使用了

1
2
3
self.mapPartitionsWithContext((context, iter)=>{
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)

RDD.scala中mapPartitionsWithContext方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* :: DeveloperApi ::
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
@DeveloperApi
def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}

然后new了一个ShuffledRDD,对这个RDD同样执行了上述方法。

这里要注意new ShuffledRDD的第一个参数传入的是上一个RDD,这对后面ShuffledRDD.getDependencies造成了很大的影响。以至于后面的ShuffleDependency.getRDD其实得到的是ShuffledRDD的上一个RDD。

也就是说reduceByKey分了三个阶段:

  • 转化成MapPartitionsRDD
  • 转化成ShuffledRDD
  • 转化成MapPartitionsRDD

这事情非常符合我们的认知,就是在Map阶段会进行一次combine(其实就是reduce),然后shuffle,然后reduce。

OK,现在问题就转到了我们关心的partition是如何划分的?Shuffle是如何获取数据的?这些问题。

spark scheduling中我们已经说过Spark会根据宽窄依赖来划分Stage,不同的Stage就有不同的task。ShuffledRDD之前的部分会被划分到ShuffleMapTask,而之后的部分会划分到ResultTask。这里我们从ShuffleMapTask看起。

####2.1 ShuffleMapTask####

在这个栗子中,这个阶段有

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->MapPartitionsRDD

前两个RDD的产生是由于textFile操作,我们已经很熟悉了。FlatMap的意思就是把一个变成, , …在这个栗子里就是把文本划分出一个一个单词。然后又通过map,使得每个单词的计数为1,即变为, , 。然后在reduceByKey阶段生成了MapPartitionsRDD,这才是我们要研究的重点。

这个转换发生在combineByKey函数中。

1
2
3
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)

这里的self就是刚才的MappedRDD本身。根据之前的说明,可以看到这里会生成一个MapPartitionsRDD,我们就从这个RDD的compute函数开始看起。

1
2
override def compute(split: Partition, context: TaskContext) =
f(context, split.index, firstParent[T].iterator(split, context))

这里的f是(context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter),再往上看就是:

(context, iter)=>{aggregator.combineValuesByKey(iter, context)}

Aggregator.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}

这个函数就是返回一个combine之后的的迭代器。

得到了map-side combine后的迭代器之后,我们回到ShuffleMapTask的runTask。

1
2
3
4
5
6
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}

scala的官方文档说:

Product2 is a cartesian product of 2 components.

所以pair就是一个笛卡尔积的形式。然后通过pair中的第一个分量,也就是单词,得到bucketId,并把这个pair写到对应的bucket里。

我们已经说过,ShuffledRDD实际上是把ShuffleDependency给上一个RDD,因此这里的dep.partitioner.getPartition得到的是ShuffledRDDPartition

总之就是把这个Task的结果写到bucket里了,下一阶段来读就是了╮(╯▽╰)╭至于是怎么写的,我们将在Spark BlockManager给出详细的解释。

####2.2 ResultTask####

这个阶段有

ShuffledRDD->MapPartitionsRDD

我们去看一下ShuffledRDD的compute函数:

1
2
3
4
5
override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
val ser = Serializer.getSerializer(serializer)
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
}

注意到这里有一个shuffleFetcher,通过(shuffledId, split.index)去远端获取数据了。感觉快要看到我们关心的地方了。

这个ShffleFetcher其实是一个abstract的类,它的子类是BlockStoreShuffleFetcher。现在来看一下这个子类的fetch方法。

========================关于shuffledId和split.index====================

####1 ShuffledId####

早在spark scheduling这篇文章里讲getMissingParentStages的时候,有一个遍历rdd.dependencies的语句。这句话会调用各个rdd override的getDependencies,我们来看一下ShuffledRDD的getDependencies这个函数只会被调用一次

1
2
3
override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializer))
}

再看一下Dependency.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = null)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

这里的rdd.context是一个SparkContext,rdd.context.newShuffleId其实就是在上一个shuffleId的基础上自增1而已。

至此我们已经知道了ShuffledId是在划分stage的时候就分配了的,值是每次自增1的

####2 split.index####

这里的split来自iterator传进的参数,这个iterator往上追溯是在ShuffleMapTask新建的时候给赋值的。

ShuffleMapTask.scala

1
var split = if (rdd == null) null else rdd.partitions(partitionId)

rdd.partitions(partitionId)得到的是一个Partition,这个Partition的index又是从哪来的呢?

我们在start from HadoopRDD中曾经说过,当rdd.partitions第一次被调用的时候,就会调用这个RDD override的getPartitions这里我们假设就是第一次调用吧(因为再也找不到上层了)于是就有了这个结果:

ShufffledRDD.scala

1
2
3
override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}

所以说其实ShuffledRDD有其独特的Partition。

1
2
3
4
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index = idx //这就是split.index
override def hashCode(): Int = idx
}

所以split.index就是那个i。就是从0到part.numPartitions的i。

==========================解释结束==========================