Well, today let's walk through this feature concretely from the source-code angle.

Apparently this thing has been around since Spark 1.2; kind of embarrassing that I'm only getting to it now.

ExecutorAllocationManager.scala

SparkContext calls this class's start():

/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}

schedule() is here:

/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis
  updateAndSyncNumExecutorsTarget(now)
  // removeTimes is a HashMap[String, Long] keyed by executorId, with the expire timestamp as value.
  // Executors that have stayed idle past their expire time are removed here.
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
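As a side note on retain: it mutates the map in place, keeping only the entries for which the predicate returns true. A tiny standalone demo of that semantics (plain Scala, nothing Spark-specific; retain was renamed filterInPlace in Scala 2.13):

import scala.collection.mutable

// Keep only the executors whose expire time has not passed; act on the expired ones.
val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 5000L)
val now = 3000L
removeTimes.retain { case (executorId, expireTime) =>
  val expired = now >= expireTime
  if (expired) println(s"$executorId has been idle past its deadline")  // stands in for removeExecutor()
  !expired
}
// only exec-2 remains in removeTimes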

updateAndSyncNumExecutorsTarget(now) is where addExecutors() gets called, and that's where the number of executors requested doubles on each round.

Now back to start() at the top: when does that Runnable actually get run()? You can see it is handed to executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS). The executor here is defined as follows:

// Executor that handles the scheduling task.
private val executor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

util/ThreadUtils.scala

/**
 * Wrapper over newSingleThreadExecutor.
 */
def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
  val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
  Executors.newSingleThreadExecutor(threadFactory)
}

This is essentially a factory: it builds a daemon-thread ThreadFactory and hands it to the JDK's Executors. Note that the method actually called above is the scheduled sibling, newDaemonSingleThreadScheduledExecutor, which returns a ScheduledExecutorService rather than a plain ExecutorService; that is what provides scheduleAtFixedRate. So executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) can be read as "run scheduleTask once every intervalMillis".

// Polling loop interval (ms)
private val intervalMillis: Long = 100

This interval is defined as 100 ms.
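To see the scheduling mechanics in isolation, here is a small self-contained demo of the same JDK pattern (the names and output here are mine, not Spark's): a single daemon thread runs a task every 100 ms.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object FixedRateDemo extends App {
  // Build a factory that produces a single daemon thread, like ThreadUtils does.
  private val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "demo-dynamic-allocation")
      t.setDaemon(true)
      t
    }
  }
  private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory)

  private val task = new Runnable {
    override def run(): Unit = println(s"schedule() tick at ${System.currentTimeMillis()}")
  }

  // Fire the task immediately, then again every 100 ms, mirroring intervalMillis above.
  scheduler.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS)
  Thread.sleep(1000)  // keep the (non-daemon) main thread alive long enough to see a few ticks
}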

So now we know the executor-requesting logic runs every 100 ms. What does it actually do?

If there are still tasks queued up waiting to be scheduled, it calls addExecutors(maxNeeded), where maxNeeded is the total number of outstanding tasks divided by the number of tasks a single executor can run in parallel, rounded up.
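For intuition, that calculation is just an integer ceiling. A minimal sketch (my own helper, not verbatim Spark source; tasksPerExecutor is assumed to be executor cores divided by CPUs per task):

// Illustrative sketch of the maxNeeded calculation described above.
// tasksPerExecutor is assumed to be spark.executor.cores / spark.task.cpus.
def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int = {
  val totalTasks = pendingTasks + runningTasks
  (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor  // integer ceiling of totalTasks / tasksPerExecutor
}

// e.g. 100 outstanding tasks with 4 tasks per executor would need 25 executors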

First, let's look at what some of the defaults mean and what their values are.

// Lower and upper bounds on the number of executors.
// Minimum number of executors, default 0.
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
// Maximum number of executors, default Int.MaxValue.
private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
// Initial number of executors to request: the max of DYN_ALLOCATION_MIN_EXECUTORS,
// DYN_ALLOCATION_INITIAL_EXECUTORS (which falls back to DYN_ALLOCATION_MIN_EXECUTORS)
// and EXECUTOR_INSTANCES (default 0).
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
private var numExecutorsTarget = initialNumExecutors
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
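For reference, these bounds come from the usual dynamic-allocation settings. A minimal example of wiring them up (the values are made up; the external shuffle service is generally required for dynamic allocation to remove executors safely):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")         // so executors can be removed without losing shuffle data
  .set("spark.dynamicAllocation.minExecutors", "2")     // lower bound (default 0)
  .set("spark.dynamicAllocation.initialExecutors", "4") // starting target (defaults to minExecutors)
  .set("spark.dynamicAllocation.maxExecutors", "50")    // upper bound (default Int.MaxValue)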

internal/config/package.scala

private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)

private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)

private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.initialExecutors")
    .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)

util/Utils.scala

/**
 * Return the initial number of executors for dynamic allocation.
 */
def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
  if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
    logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
      s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
      "please update your configs.")
  }

  if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
    logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
      s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
      "please update your configs.")
  }

  val initialExecutors = Seq(
    conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
    conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
    conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max

  logInfo(s"Using initial executors = $initialExecutors, max of " +
    s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
    s"${EXECUTOR_INSTANCES.key}")
  initialExecutors
}
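To make the max() concrete, a quick worked example (the numbers are made up):

// spark.dynamicAllocation.minExecutors = 2
// spark.dynamicAllocation.initialExecutors not set, so it falls back to 2
// spark.executor.instances = 10
val initialExecutors = Seq(2, 2, 10).max  // 10: the largest of the three wins

With the initial target settled, the interesting part is addExecutors() back in ExecutorAllocationManager, which does the actual ramp-up: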


/**
 * Request a number of executors from the cluster manager.
 * If the cap on the number of executors is reached, give up and reset the
 * number of executors to add next round instead of continuing to double it.
 *
 * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
 *                              tasks could fill
 * @return the number of additional executors actually requested.
 */
// maxNumExecutorsNeeded = ceil(outstanding tasks / tasks each executor can run in parallel)
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
  // Do not request more executors if it would put our target over the upper bound
  if (numExecutorsTarget >= maxNumExecutors) {
    logDebug(s"Not adding executors because our current target total " +
      s"is already $numExecutorsTarget (limit $maxNumExecutors)")
    numExecutorsToAdd = 1
    return 0
  }
  val oldNumExecutorsTarget = numExecutorsTarget
  // There's no point in wasting time ramping up to the number of executors we already have, so
  // make sure our target is at least as much as our current allocation:
  numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
  // Boost our target with the number to add for this round:
  numExecutorsTarget += numExecutorsToAdd
  // Ensure that our target doesn't exceed what we need at the present moment:
  // i.e. never request more executors than the currently outstanding tasks could use
  numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
  // Ensure that our target fits within configured bounds:
  // i.e. no more than the configured maximum and no fewer than the configured minimum
  numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
  // The difference between the new target and the previous one (this round added
  // numExecutorsToAdd on top), i.e. how many extra executors to request this round
  val delta = numExecutorsTarget - oldNumExecutorsTarget
  // If our target has not changed, do not send a message
  // to the cluster manager and reset our exponential growth
  if (delta == 0) {
    numExecutorsToAdd = 1
    return 0
  }
  // client here is the SparkContext itself
  val addRequestAcknowledged = testing ||
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  if (addRequestAcknowledged) {
    val executorsString = "executor" + { if (delta > 1) "s" else "" }
    logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
      s" (new desired total will be $numExecutorsTarget)")
    numExecutorsToAdd = if (delta == numExecutorsToAdd) {
      numExecutorsToAdd * 2
    } else {
      1
    }
    delta
  } else {
    logWarning(
      s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
    numExecutorsTarget = oldNumExecutorsTarget
    0
  }
}
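To see the exponential ramp-up concretely, here is a small self-contained simulation of the doubling logic (my own sketch; it mirrors the target/delta bookkeeping above but leaves out the cluster-manager call, executorIds and the lower bound):

object RampUpDemo extends App {
  val maxNumExecutors = 100
  val maxNumExecutorsNeeded = 37     // pretend this many executors could be used right now
  var numExecutorsTarget = 0
  var numExecutorsToAdd = 1

  for (round <- 1 to 8) {
    val old = numExecutorsTarget
    numExecutorsTarget = math.min(numExecutorsTarget + numExecutorsToAdd, maxNumExecutorsNeeded)
    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutors)
    val delta = numExecutorsTarget - old
    // double only if we got everything we asked for this round, otherwise reset to 1
    numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
    println(s"round $round: target=$numExecutorsTarget, delta=$delta, nextAdd=$numExecutorsToAdd")
  }
  // the target grows 1, 3, 7, 15, 31, then is capped at 37 and stays there
}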

SparkContext.scala

/**
 * Update the cluster manager on our scheduling needs. Three bits of information are included
 * to help it make decisions.
 * @param numExecutors The total number of executors we'd like to have. The cluster manager
 *                     shouldn't kill any running executor to reach this number, but,
 *                     if all existing executors were to die, this is the number of executors
 *                     we'd want to be allocated.
 * @param localityAwareTasks The number of tasks in all active stages that have a locality
 *                           preference. This includes running, pending, and completed tasks.
 * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
 *                             that would like to run on that host.
 *                             This includes running, pending, and completed tasks.
 * @return whether the request is acknowledged by the cluster manager.
 */
private[spark] override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
  ): Boolean = {
  schedulerBackend match {
    // So this just delegates to CoarseGrainedSchedulerBackend.requestTotalExecutors
    case b: CoarseGrainedSchedulerBackend =>
      b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
    case _ =>
      logWarning("Requesting executors is only supported in coarse-grained mode")
      false
  }
}

scheduler/cluster/CoarseGrainedSchedulerBackend.scala

final override def requestTotalExecutors(
    numExecutors: Int, // the desired total number of executors right now, including those already requested
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]
  ): Boolean = synchronized {
  if (numExecutors < 0) {
    throw new IllegalArgumentException(
      "Attempted to request a negative number of executor(s) " +
        s"$numExecutors from the cluster manager. Please specify a positive number!")
  }
  this.localityAwareTasks = localityAwareTasks
  this.hostToLocalTaskCount = hostToLocalTaskCount
  numPendingExecutors =
    math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
  // delegate to the cluster-manager-specific subclass implementation
  doRequestTotalExecutors(numExecutors)
}
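The numPendingExecutors bookkeeping is just arithmetic over the new total; a quick worked example (made-up numbers):

// We want 8 executors in total, 3 are already registered, and 1 is pending removal,
// so 8 - 3 + 1 = 6 executors are still outstanding at the cluster manager.
val numPendingExecutors = math.max(8 - 3 + 1, 0)  // 6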

scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
  // We don't truly know if we can fulfill the full amount of executors
  // since at coarse grain it depends on the amount of slaves available.
  logInfo("Capping the total amount of executors to " + requestedTotal)
  executorLimitOption = Some(requestedTotal)
  true
}

So the whole chain ultimately just updates executorLimitOption; later, when Mesos proactively offers resources, this value is consulted to judge whether the offered resources are needed and how much of them to take.
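Roughly speaking (this is my own conceptual sketch with hypothetical names, not the Mesos backend source), the cap gates offer handling: the backend only keeps launching coarse-grained executors while the number it already has is below the limit.

// Hypothetical helper illustrating how a cap like executorLimitOption can gate offer handling.
def canLaunchAnotherExecutor(liveExecutors: Int, executorLimitOption: Option[Int]): Boolean =
  liveExecutors < executorLimitOption.getOrElse(Int.MaxValue)

// e.g. with executorLimitOption = Some(10) and 10 executors already running,
// further Mesos offers would simply not be used to launch more executors.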

The rest of the story lives in Mesos's resourceOffer handling, which is outside the scope of this post. Bye!