Starting from HadoopRDD
In earlier posts we covered how some RDDs are created, but we never explained the details behind each RDD: how does an RDD get split across Partitions? Into how many pieces? Where does each piece live? None of that has been answered yet. This post starts from HadoopRDD, tracing how it is created and implemented, to see how data partitioning works in Spark.
A HadoopRDD is created when the application calls SparkContext's textFile, which in turn calls hadoopFile.
textFile: Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
hadoopFile: Get an RDD for a Hadoop file with an arbitrary InputFormat. The result should not be cached directly; map it first, otherwise you end up with many references to the same reused Writable object.
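As a concrete illustration of that warning, here is a minimal sketch of the safe pattern (the path is hypothetical, and ctx is assumed to be a live SparkContext): copy plain values out of the reused Writables with map before calling cache.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// The RecordReader reuses the same Writable instances for every record,
// so materialize plain Scala values before caching.
val raw = ctx.hadoopFile("hdfs:///data/input", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])
val safeToCache = raw.map { case (offset, line) => (offset.get, line.toString) }.cache()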
Let's start with ctx.textFile.
val lines = ctx.textFile(arg(0), 1)
The first argument is the file location; the second is minPartitions.
============================ A digression on minPartitions ============================
Let's take a quick look at that second argument. SparkContext defines its default value:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
So the default is at most 2 partitions (the smaller of defaultParallelism and 2). defaultParallelism itself also has a default:
def defaultParallelism: Int = taskScheduler.defaultParallelism
In TaskSchedulerImpl it is defined as:
override def defaultParallelism() = backend.defaultParallelism()
这里的backend是一个SchedulerBackend
。在TaskScheduler
初始化的时候传入,SchedulerBackend
有两种类型,分别是local和cluster的。LocalBackend
对应的是local的。cluster的又有SparkDeploySchedulerBackend
和CoarseGrainedSchedulerBackend
,还有给mesos用的MesosSchedulerBackend
什么的,然后还有个不知道干什么用的SimrSchedulerBackend
。一般我们开启Spark的Stand-alone模式,使用的都是SparkDeploySchedulerBackend
。这里我们主要关注Standalone模式,所以看SparkDeploySchedulerBackend
就好。
So, back to our question about parallelism: look at SparkDeploySchedulerBackend.defaultParallelism(). It turns out this backend inherits the method from CoarseGrainedSchedulerBackend:
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
In other words: if spark.default.parallelism is set, use that; otherwise take the larger of the total core count and 2. Roughly speaking, one partition per core.
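If you want to control this yourself, a minimal sketch (the app name and the value 8 are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Set the fallback parallelism explicitly before creating the context;
// the conf.getInt("spark.default.parallelism", ...) above will pick it up.
val conf = new SparkConf()
  .setAppName("hadoop-rdd-demo")
  .set("spark.default.parallelism", "8")
val ctx = new SparkContext(conf)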
======================================= End of the digression =========================================
Back to ctx.textFile(arg(0), 1):
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
It first builds a HadoopRDD and then applies map to produce a MappedRDD. We won't go into the latter; here is its one-liner for reference:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
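For the curious, MappedRDD itself is tiny. Roughly (a sketch of its shape from the 1.x source, not guaranteed to match line for line), it reuses the parent's partitions and applies f to the parent's iterator in compute:

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  // Same partitioning as the parent RDD.
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  // Compute a split by mapping f over the parent's iterator for that split.
  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)
}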
Our main focus, though, is the HadoopRDD from before.
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions)
}
Start with the doc comment. It says you should map before caching a HadoopRDD, which is exactly what textFile does. Next, the Hadoop configuration is broadcast. Why? The comment says it's because the configuration is around 10 KB, which counts as fairly large, hence the broadcast. (Fair enough.) In any case it gets broadcast, and the resulting Broadcast handle is passed into the HadoopRDD. (We'll see later what it's actually used for.)
================================== A quick detour on broadcast ==================================
/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
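As a quick usage sketch of this API (the lookup table is made up, lines is the RDD from our textFile example, and we pretend each line is a country code purely for illustration): broadcast a small read-only value once, then read it inside tasks through .value instead of capturing it in every closure.

// Ship the table to each executor once; tasks read it through .value.
val countryNames = ctx.broadcast(Map("CN" -> "China", "US" -> "United States"))
val named = lines.map(code => countryNames.value.getOrElse(code.trim, "unknown"))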
Our guess was that the broadcast is there to tell each machine which chunk of data it should grab, since the data lives in HDFS. The value actually being broadcast is hadoopConfiguration, so let's see what that is.
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
  val env = SparkEnv.get
  val hadoopConf = SparkHadoopUtil.get.newConfiguration()
  // Explicitly check for S3 environment variables
  if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
      System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
    hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
    hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
  }
  // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.substring("spark.hadoop.".length), value)
    }
  }
  val bufferSize = conf.get("spark.buffer.size", "65536")
  hadoopConf.set("io.file.buffer.size", bufferSize)
  hadoopConf
}
Not quite what we guessed: it's really just the Hadoop Configuration object (S3 credentials, spark.hadoop.* overrides, buffer size), so let's park our question for now.
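One practical takeaway from this snippet: any key you set with the spark.hadoop. prefix ends up in this broadcast configuration. A minimal sketch (the particular key is only an illustration, and its exact name varies across Hadoop versions):

import org.apache.spark.SparkConf

// "spark.hadoop.mapred.min.split.size" is copied into hadoopConfiguration
// as "mapred.min.split.size", which influences how getSplits sizes the splits.
val conf = new SparkConf()
  .set("spark.hadoop.mapred.min.split.size", (128 * 1024 * 1024).toString)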
================================== End of the broadcast detour ==================================
With the input path set, hadoopFile simply news up a HadoopRDD. So what does creating an RDD actually involve? Recall that an RDD has five properties (a conceptual sketch follows the list):
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
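Roughly, these five properties correspond to the members a concrete RDD supplies. The trait below is a conceptual sketch only, not Spark's real class hierarchy; all names in it are made up for illustration.

// Conceptual sketch only -- not Spark's actual API.
trait SplitSketch { def index: Int }

trait RddSketch[T] {
  def getPartitions: Array[SplitSketch]                           // 1. list of partitions
  def compute(split: SplitSketch): Iterator[T]                    // 2. how to compute one split
  def dependencies: Seq[RddSketch[_]]                             // 3. parent RDDs
  def partitioner: Option[String] = None                          // 4. optional partitioner name, e.g. "hash"
  def preferredLocations(split: SplitSketch): Seq[String] = Nil   // 5. optional host hints per split
}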
Start with the first property, partitions. Spark gives every RDD a field @transient private var partitions_ : Array[Partition] = null. The first time rdd.partitions(partitionId) is called, the getPartitions overridden by the subclass (HadoopRDD in our case) is invoked and the result is cached in that field.
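A simplified sketch of that lazy caching (checkpoint handling omitted; the class name is made up):

import org.apache.spark.Partition

// Sketch: getPartitions runs at most once; later calls reuse the cached array.
abstract class MemoizingRddSketch {
  protected def getPartitions: Array[Partition]

  @transient private var partitions_ : Array[Partition] = null

  final def partitions: Array[Partition] = {
    if (partitions_ == null) {
      partitions_ = getPartitions // only on first access
    }
    partitions_
  }
}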
With that in mind, here is HadoopRDD's getPartitions:
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf() // we'll set this aside for now
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf) // same here, see the comment above
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions) // the interesting call
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i)) // and here
  }
  array
}
First comes getInputFormat. InputFormat is a Hadoop concept; the textbook description is that it describes the form and format of a MapReduce job's input data. Let's see how Spark uses it. After some setup we'll skip, the interesting call is inputFormat.getSplits, which chops the input into splits, each of which becomes the input of one task. In other words, the number of tasks is determined by the number of InputSplits. The code then builds inputSplits.size HadoopPartition objects (a class defined in HadoopRDD.scala) and returns them as an Array. So that array is what the Partition property ultimately refers to.
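A quick way to watch minPartitions at work from the driver (path and numbers are hypothetical; the actual split count also depends on the file's size and block layout):

// minPartitions is a hint passed straight to getSplits; asking for more
// partitions makes the target split size smaller, so you usually get more splits.
val coarse = ctx.textFile("hdfs:///data/big.log", 2)
val fine   = ctx.textFile("hdfs:///data/big.log", 16)
println(coarse.partitions.length) // typically around 2, or the number of HDFS blocks
println(fine.partitions.length)   // typically around 16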
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
  extends Partition {

  val inputSplit = new SerializableWritable[InputSplit](s)

  override def hashCode(): Int = 41 * (41 + rddId) + idx

  override val index: Int = idx

  /**
   * Get any environment variables that should be added to the users environment when running pipes
   * @return a Map with the environment variables and corresponding values, it could be empty
   */
  def getPipeEnvVars(): Map[String, String] = {
    val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
      val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
      // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
      // since its not removed yet
      Map("map_input_file" -> is.getPath().toString(),
        "mapreduce_map_input_file" -> is.getPath().toString())
    } else {
      Map()
    }
    envVars
  }
}
Next, a look at HadoopRDD's compute function:
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    var reader: RecordReader[K, V] = null
    val jobConf = getJobConf()
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
    // Register an on-task-completion callback to close the input stream.
    context.addOnCompleteCallback { () => closeIfNeeded() }
    val key: K = reader.createKey()
    val value: V = reader.createValue()

    override def getNext() = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      (key, value)
    }

    override def close() {
      try {
        reader.close()
      } catch {
        case e: Exception => logWarning("Exception in RecordReader.close()", e)
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}
This function has two parts: the return value new InterruptibleIterator[(K, V)](context, iter), and the code that builds the iter passed into it. Let's look at iter first.
============================================= About iter ====================================
iter is a NextIterator, a type defined in spark.util.NextIterator. The anonymous subclass above does some initialization and overrides the getNext() and close() hooks.
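For context, a simplified sketch of the NextIterator pattern (names simplified, not the verbatim Spark class): subclasses supply getNext(), setting finished when the source runs dry, plus close(); hasNext/next from Scala's Iterator are driven off those two hooks.

// Sketch of the template: getNext() produces one record or sets finished;
// close() releases the underlying resource exactly once.
abstract class NextIteratorSketch[U] extends Iterator[U] {
  protected var finished = false
  private var gotNext = false
  private var nextValue: U = _
  private var closed = false

  protected def getNext(): U
  protected def close(): Unit

  def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) closeIfNeeded()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    gotNext = false
    nextValue
  }
}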
First, theSplit is cast to HadoopPartition via asInstanceOf (so its runtime type must be HadoopPartition). Tracing this split back, it is the Partition looked up from the partitionId passed in when the Task was created, via rdd.partitions(partitionId). Remember how we created Tasks in a loop over p = 0 until stage.numPartitions and passed p into each Task? That's the same p: here the partition is fetched by that partitionId.
After that, each call to getNext() pulls one (key, value) pair out of the split, until the split is exhausted.
To sum up: armed with iter, every machine in the cluster (more precisely, every core) just keeps pulling records through this iterator to read its own share of the data out of HDFS.
=========================================== End of the iter discussion ===================================
So HadoopRDD's compute() boils down to handing back an iterator over the data in HDFS.
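Putting the pieces together, here is a minimal end-to-end sketch of how this machinery is exercised from the driver (the path is hypothetical): textFile builds the HadoopRDD plus a MappedRDD on top, getPartitions decides the splits, and the count() action ships one task per partition, each of which calls compute() and drains the RecordReader-backed iterator we just examined.

val lines = ctx.textFile("hdfs:///data/input.txt", 4) // HadoopRDD -> MappedRDD
val n = lines.count()                                 // one task per partition, each runs compute()
println(s"read $n lines from ${lines.partitions.length} partitions")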