Dynamic Resource Allocation
Today, let's walk through how it works from the source code's point of view.
This feature has apparently been around since Spark 1.2; a little embarrassing that I'm only now getting around to reading the code.
ExecutorAllocationManager.scala
Inside SparkContext, this class's start() gets called.
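Condensed from the Spark 2.x source (error handling trimmed), start() looks roughly like this; note the Runnable, which we'll come back to below:

```scala
// ExecutorAllocationManager.start(), condensed: register the listener,
// then schedule the periodic task.
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
```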
The periodic work happens in schedule(). It calls updateAndSyncNumExecutorsTarget(now), which is where addExecutors() gets invoked, and that is the place where the number of executors requested doubles on each round; see the sketch below.
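Here is a condensed sketch of that doubling, paraphrased from addExecutors() in the Spark 2.x source (bounds checks and logging trimmed; numExecutorsToAdd starts at 1):

```scala
// Sketch of the doubling logic in addExecutors(), paraphrased.
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
  val oldNumExecutorsTarget = numExecutorsTarget
  // Never target fewer executors than are currently registered.
  numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
  // Add the current batch, capped by what is actually needed and by
  // spark.dynamicAllocation.maxExecutors.
  numExecutorsTarget += numExecutorsToAdd
  numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
  numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutors)

  val delta = numExecutorsTarget - oldNumExecutorsTarget
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

  // If the whole batch was used, double it for the next round (1, 2, 4, 8, ...);
  // otherwise start over at 1.
  numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
  delta
}
```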
Now back to start() at the top: there is a Runnable there, so when does its run() actually fire? As shown above, the Runnable is handed to executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS). The executor here is built by a factory in ThreadUtils:
util/ThreadUtils.scala
```scala
/**
 * Wrapper over ScheduledThreadPoolExecutor.
 */
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
  val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
  val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
  // By default, a cancelled task stays in the work queue until its delay elapses;
  // prompt removal has to be enabled manually.
  executor.setRemoveOnCancelPolicy(true)
  executor
}
```

This is a factory method that returns a ScheduledExecutorService. (It has to be the newDaemonSingleThreadScheduledExecutor variant rather than the plain newDaemonSingleThreadExecutor, because an ordinary ExecutorService has no scheduleAtFixedRate.) So executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) means just what it reads like: run scheduleTask immediately, then again every intervalMillis milliseconds.
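To see the pattern in isolation, here is a minimal standalone sketch (my own demo, not Spark code) of a daemon single-thread scheduled executor firing every 100 ms:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Standalone demo: a daemon single-thread scheduled executor that runs
// a task every 100 ms, mirroring how scheduleTask is driven above.
object ScheduleDemo extends App {
  val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "demo-dynamic-allocation")
      t.setDaemon(true) // daemon: won't keep the JVM alive on its own
      t
    }
  }
  val executor = Executors.newSingleThreadScheduledExecutor(threadFactory)

  val scheduleTask = new Runnable {
    override def run(): Unit = println(s"tick at ${System.currentTimeMillis()}")
  }
  // First run immediately (initialDelay = 0), then every 100 ms.
  executor.scheduleAtFixedRate(scheduleTask, 0, 100, TimeUnit.MILLISECONDS)

  Thread.sleep(500) // let ~5 ticks fire before the JVM (and the daemon thread) exits
}
```

Because the thread is a daemon it dies with the JVM; in Spark it simply lives for the lifetime of the SparkContext.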
This interval (intervalMillis) is defined as 100 ms. So now we know the function that requests executors runs every 100 ms; what does it actually do? If there are still tasks queued up waiting to be scheduled, it calls addExecutors(maxNeeded), where maxNeeded is the number of pending plus running tasks divided by the number of tasks each executor can run in parallel (spark.executor.cores / spark.task.cpus), rounded up.
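Here is that computation, following the shape of the Spark 2.x source (a sketch, not a verbatim quote):

```scala
// Sketch of maxNumExecutorsNeeded();
// tasksPerExecutor = spark.executor.cores / spark.task.cpus.
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  // Integer ceiling of numRunningOrPendingTasks / tasksPerExecutor.
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
```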
Before going further, let's look at the relevant defaults: what they mean and what their values are.
internal/package.scala
```scala
private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)

private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)

private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
  ConfigBuilder("spark.dynamicAllocation.initialExecutors")
    .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
```

util/Utils.scala
```scala
/**
 * Return the initial number of executors for dynamic allocation.
 */
def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
  if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
    logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
      s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
      "please update your configs.")
  }

  if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
    logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
      s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
      "please update your configs.")
  }

  val initialExecutors = Seq(
    conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
    conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
    conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max

  logInfo(s"Using initial executors = $initialExecutors, max of " +
    s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
    s"${EXECUTOR_INSTANCES.key}")
  initialExecutors
}
```
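In short, the initial executor target is simply the largest of spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.initialExecutors, and spark.executor.instances; with (hypothetical) values 2, 5 and 3 respectively, the application would start by requesting max(2, 5, 3) = 5 executors. The manager then pushes its target to the cluster manager via requestTotalExecutors (the client.requestTotalExecutors call in the addExecutors() sketch above):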
SparkContext.scala
```scala
/**
 * Update the cluster manager on our scheduling needs. Three bits of information are included
 * to help it make decisions.
 * @param numExecutors The total number of executors we'd like to have. The cluster manager
 *                     shouldn't kill any running executor to reach this number, but,
 *                     if all existing executors were to die, this is the number of executors
 *                     we'd want to be allocated.
 * @param localityAwareTasks The number of tasks in all active stages that have locality
 *                           preferences. This includes running, pending, and completed tasks.
 * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
 *                             that would like to run on that host.
 *                             This includes running, pending, and completed tasks.
 * @return whether the request is acknowledged by the cluster manager.
 */
private[spark] override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
  ): Boolean = {
  schedulerBackend match {
    // So this simply delegates to CoarseGrainedSchedulerBackend.requestTotalExecutors
    case b: CoarseGrainedSchedulerBackend =>
      b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
    case _ =>
      logWarning("Requesting executors is only supported in coarse-grained mode")
      false
  }
}
```
scheduler/cluster/CoarseGrainedSchedulerBackend.scala
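Here requestTotalExecutors records the locality hints, updates the count of executors still pending, and then delegates to doRequestTotalExecutors, which each cluster manager backend overrides. Roughly, condensed from the Spark 2.x source (details vary slightly across 2.x versions):

```scala
// Condensed from CoarseGrainedSchedulerBackend: record scheduling hints,
// then let the concrete backend make the actual request.
final override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]): Boolean = {
  val response = synchronized {
    this.localityAwareTasks = localityAwareTasks
    this.hostToLocalTaskCount = hostToLocalTaskCount

    // Executors we still expect, net of those already registered or pending removal.
    numPendingExecutors =
      math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

    // Template method, overridden per cluster manager.
    doRequestTotalExecutors(numExecutors)
  }
  defaultAskTimeout.awaitResult(response)
}
```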
scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
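In the Mesos coarse-grained backend, doRequestTotalExecutors does little more than remember the requested cap (again condensed from the Spark 2.x source):

```scala
// Condensed from MesosCoarseGrainedSchedulerBackend: just record the cap.
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
  // We can't know whether Mesos can actually satisfy the request; that depends
  // on what the agents end up offering. Just remember the limit.
  logInfo("Capping the total amount of executors to " + requestedTotal)
  executorLimitOption = Some(requestedTotal)
  true
}
```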
So this whole chain ultimately just updates executorLimitOption. Later, when Mesos proactively offers resources, that cap is consulted to decide whether the offered resources are worth taking. Everything past that point lives in Mesos's resourceOffers handling, which is outside the scope of this post. See you next time!