我们在之前的文章中介绍过一些RDD的创建,然而并没有解释过各个RDD背后的细节,各个RDD是怎么被拆分到不同的Partition的?拆成几份?每份存在哪?这都是我们还没有解决的问题。本文将从HadoopRDD着手,追溯HadoopRDD的生成过程与实现细节,来看一看Spark中的数据分区是怎么回事。

HadoopRDD在应用程序中调用SparkContext的textFile,从而调用hadoopFile被创建。

textFile: Read a text file from HDFS, a local system(available on all nodes), ir any Hadoop-supported file system URI, and return it as an RDD of Strings.
hadoopFile: Get an RDD for a Hadoop file with an arbitrary InputFormat 不可以直接cache,需要先map,否则会创建很多副本。

先以ctx.textFile为例。

val lines = ctx.textFile(arg(0), 1)

第一个参数是文件的位置,第二个参数是MinPartitions。

============================关于MinPartitions的讨论====================================

我们稍微看一下第二个参数。在SparkContext里有对第二个参数的默认值规定:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

所以默认是划分成2份的。然后Parallelism也有默认规定。

def defaultParallelism: Int = taskScheduler.defaultParallelism

在TaskSchedulerImpl里有这样的定义:

override def defaultParallelism() = backend.defaultParallelism()

这里的backend是一个SchedulerBackend。在TaskScheduler初始化的时候传入,SchedulerBackend有两种类型,分别是local和cluster的。LocalBackend对应的是local的。cluster的又有SparkDeploySchedulerBackendCoarseGrainedSchedulerBackend,还有给mesos用的MesosSchedulerBackend什么的,然后还有个不知道干什么用的SimrSchedulerBackend。一般我们开启Spark的Stand-alone模式,使用的都是SparkDeploySchedulerBackend。这里我们主要关注Standalone模式,所以看SparkDeploySchedulerBackend就好。

于是回到我们对parallelism的疑问,看一下SparkDeploySchedulerBackend.defaultParallelism()。然后发现这个backend继承自CoarseGrainedSchedulerBackend:

override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

意思就是说如果有设置就读设置啦,没有就选核数和2中较大的那个啦。通俗来说就是有几个核心就分几份。

=======================================讨论结束=========================================

让我们再回到ctx.textFile(arg(0), 1)

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

先生成一个HadoopRDD,然后再通过map操作生成一个MappedRDD,后者就不讲了,小贴一段代码:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

主要还是看刚才那个HadoopRDD。

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
  *
  * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
  * record, directly caching the returned RDD will create many references to the same object.
  * If you plan to directly cache Hadoop writable objects, you should first copy them using
  * a `map` function.
  * */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions)
}

首先看注释。这里提到了说在缓存HadoopRDD之前要进行map操作。所以textFile乖乖照做了。然后首先要广播Hadoop configuration。为什么呢?注释里说因为它的大小是10KB左右,相当大了,所以要broadcast。(什么鬼)总之就是先广播了一下。然后这个广播的返回值是要传给HadoopRDD的。(后面看一下有什么鬼用)

==================================插播一段广播==================================

/**
 * Broadcast a read-only variable to the cluster, returning a
 * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
 * The variable will be sent to each cluster only once.
 */
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}

我们猜一下广播的意思应该是告诉每一个机器应该拿哪一部分数据?不是要从HDFS里取数据么。所以看下广播出去的数据是hadoopConfiguration看一下这是什么鬼。

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
  val env = SparkEnv.get
  val hadoopConf = SparkHadoopUtil.get.newConfiguration()
  // Explicitly check for S3 environment variables
  if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
      System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
    hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
    hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
  }
  // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.substring("spark.hadoop.".length), value)
    }
  }
  val bufferSize = conf.get("spark.buffer.size", "65536")
  hadoopConf.set("io.file.buffer.size", bufferSize)
  hadoopConf
}

好像跟我们猜得不太一样,所以我们的疑问再放一放。

==================================广播结束啦==================================

然后设置了一下文件位置就直接new HadoopRDD了。既然新建了一个RDD,那么做了什么事情呢?我们知道一个RDD有五个属性。

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list f preferred locations to compute each split on (e.g. block locations for an HDFS file)

首先看第一个属性partitions。Spark会为每个RDD准备一个@transient private var partitions_ : Array[Partition] = null当第一次调用rdd.partitions(partitionId)的时候,就会调用子类(例如HadoopRDD)override的getPartitions。

这里我们来看一下HadoopRDD的getPartitions:

override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf() //这一句先放一放
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf) //这句见上一个注释
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)  //看这里看这里
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i)) //看这里看这里
  }
  array
}

首先要getInputFormatInputFormat这个东西应该是来自Hadoop。课本上说它描述了MapReduce作业数据的输入形式和格式。我们接下来就看一下在Spark里,这东西是怎么用的。首先就是一些设置,我们先放着不看。然后从inputFormat.getSplits开始看。这个就是把输入拆成一个一个的split,每一个split作为一个task的输入。即task的数量是又InputSplit决定的。然后就新建了inputSplits.size个HadoopPartition(这个类定义在HadoopRDD.scala里)并作为Array返回。所以这货就是Partition最后所指的东西了。

/**
 * A Spark split class that wraps around a Hadoop InputSplit.
 */
private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
  extends Partition {

  val inputSplit = new SerializableWritable[InputSplit](s)

  override def hashCode(): Int = 41 * (41 + rddId) + idx

  override val index: Int = idx

  /**
   * Get any environment variables that should be added to the users environment when running pipes
   * @return a Map with the environment variables and corresponding values, it could be empty
   */
  def getPipeEnvVars(): Map[String, String] = {
    val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
      val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
      // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
      // since its not removed yet
      Map("map_input_file" -> is.getPath().toString(),
        "mapreduce_map_input_file" -> is.getPath().toString())
    } else {
      Map()
    }
    envVars
  }
}

然后看一下HadoopRDD的compute函数:

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {

    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    var reader: RecordReader[K, V] = null
    val jobConf = getJobConf()
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    // Register an on-task-completion callback to close the input stream.
    context.addOnCompleteCallback{ () => closeIfNeeded() }
    val key: K = reader.createKey()
    val value: V = reader.createValue()
    override def getNext() = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      (key, value)
    }

    override def close() {
      try {
        reader.close()
      } catch {
        case e: Exception => logWarning("Exception in RecordReader.close()", e)
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}

这个函数由两部分组成,一个是返回值new InterruptibleIterator[(K, V)](context, iter),另一个是生成这个返回值iter参数的部分。我们先看一下这个iter是怎么回事。

=============================================关于iter====================================

iter是一个NextIterator类型,这个类型在spark.util.NextIterator中定义。这里进行了一些初始化和getNext()/close()函数的override。

首先把theSplit用asInstanceOf转换成HadoopPartition(要求theSplit的类型是HadoopPartition的子类)。去追一下这个split可以知道这是一个Partition,来自Task被创建时候传入的partitionId,然后使用rdd.partitions(partitionId)取出。还记得我们当初创建Task的时候从p = 0->stage.numPartitions创建的吗?然后我们把p传入了Task的参数。就是那个!这时候我们就要把partition根据partitionId也就是p取出了。

然后就可以通过这个iter.getNext()取出这个split中的(key, value)直到取完。

小结一下也就是说,有了iter,集群中的每个机器(准确说是每个内核)只要用iter.getNext()就可以取出HDFS中的了。

===========================================iter讨论结束===================================

所以HadoopRDD的compute()就是提供了一个对HDFS中数据的迭代器。