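Berkeley once published a paper devoted to explaining what an RDD is. The Spark source code also contains long comments on the subject: an RDD is an abstraction over data plus a set of operations attached to it. The following excerpt is taken from the comments in RDD.scala: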

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
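
To make the five properties concrete, here is a minimal plain-Scala sketch. It is not Spark's RDD class: `ToyRDD`, `ToySource`, `ToyMapped`, and `Part` are hypothetical names invented for illustration, and the partitioner is simplified to an optional key-to-index function.

```scala
// A plain-Scala model of the five properties; NOT Spark's real RDD class.
object RddSketch {
  // Hypothetical stand-in for a partition: just an index.
  final case class Part(index: Int)

  trait ToyRDD[T] {
    def partitions: Seq[Part]                               // 1. a list of partitions
    def compute(split: Part): Iterator[T]                   // 2. a function for computing each split
    def dependencies: Seq[ToyRDD[_]]                        // 3. dependencies on other RDDs
    def partitioner: Option[Any => Int] = None              // 4. optional partitioner (key -> index)
    def preferredLocations(split: Part): Seq[String] = Nil  // 5. optional preferred locations

    // Evaluate every partition in order and gather the results locally.
    def collect(): Seq[T] = partitions.flatMap(p => compute(p).toSeq)

    // A transformation only records the parent and the function to apply.
    def map[U](f: T => U): ToyRDD[U] = new ToyMapped(this, f)
  }

  // A source with no parents: each inner Seq is one partition.
  final class ToySource[T](data: Seq[Seq[T]]) extends ToyRDD[T] {
    val partitions: Seq[Part] = data.indices.map(Part(_))
    def compute(split: Part): Iterator[T] = data(split.index).iterator
    val dependencies: Seq[ToyRDD[_]] = Nil
  }

  // A derived dataset: same partitions as the parent, computed by mapping over it.
  final class ToyMapped[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
    def partitions: Seq[Part] = parent.partitions
    def compute(split: Part): Iterator[U] = parent.compute(split).map(f)
    val dependencies: Seq[ToyRDD[_]] = Seq(parent)
  }
}
```

In this sketch, `new RddSketch.ToySource(Seq(Seq(1, 2), Seq(3))).map(_ * 2)` does no work until `collect()` is called, which loosely mirrors how a derived RDD only describes how to compute its splits from its parent.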

This article mainly lists the various RDDs and how each of them is created.

###AsyncRDDActions

###BlockRDD

###CartesianRDD

###CheckpointRDD

###CoalescedRDD

###CoGroupedRDD

A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a tuple with the list of values for that key.

###DoubleRDDFunctions

###EmptyRDD

###FilteredRDD

###FlatMappedRDD

###FlatMappedValuesRDD

###GlommedRDD

###HadoopRDD
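Created when an application calls SparkContext's textFile, which in turn calls hadoopFile.
textFile: Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
hadoopFile: Get an RDD for a Hadoop file with an arbitrary InputFormat. The returned RDD should not be cached directly; copy the records with a map first, because Hadoop's RecordReader reuses the same Writable object for each record and caching directly would store many references to that one object.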

    val lines = ctx.textFile(args(0), 1)
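
The caching caveat for hadoopFile can be illustrated without Spark or Hadoop. The sketch below is only a model: `MutableText` and `reusingReader` are hypothetical stand-ins for a Writable and a record reader that hands back the same mutable object on every call.

```scala
object ReuseSketch {
  // Hypothetical stand-in for a Hadoop Writable: one mutable holder.
  final class MutableText(var value: String)

  // Mimics a record reader: every element is the SAME object, overwritten in place.
  def reusingReader(lines: Seq[String]): Iterator[MutableText] = {
    val shared = new MutableText("")
    lines.iterator.map { line => shared.value = line; shared }
  }

  // "Caching" directly stores several references to the one shared object,
  // so every cached entry shows whatever was written last.
  def cacheDirectly(lines: Seq[String]): Vector[String] = {
    val cached = reusingReader(lines).toVector
    cached.map(_.value)
  }

  // Copying each record with a map first (here: to an immutable String)
  // keeps every record distinct.
  def copyThenCache(lines: Seq[String]): Vector[String] =
    reusingReader(lines).map(_.value).toVector
}
```

With the input `Seq("a", "b", "c")`, `cacheDirectly` ends up with three copies of the last record, while `copyThenCache` preserves all three.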

###JdbcRDD

###MapPartitionsRDD

###MappedRDD
Created when the generic RDD.map is called.

map: Return a new RDD by applying a function to all elements of this RDD.

###MappedValuesRDD
Created by mapValues in PairRDDFunctions.

###NewHadoopRDD

###OrderedRDDFunctions

###PairRDDFunctions

Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
Import ‘org.apache.spark.SparkContext._’ at the top of your program to use these functions.

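Any RDD of (key, value) pairs, whatever its concrete class, can be treated as this type through the implicit conversion and can use every method defined here. These methods include most of the RDD transformations that involve a shuffle.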
combineByKey:
foldByKey:
reduceByKey:
groupByKey

join: Return an RDD containing all pairs of elements with matching keys in ‘this’ and ‘other’. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in ‘this’ and (k, v2) is in ‘other’. Uses the given Partitioner to partition the output RDD.
This function calls this.cogroup.flatMapValues.

cogroup: For each key k in ‘this’ or ‘other’, return a resulting RDD that contains a tuple with the list of values for that key in ‘this’ as well as ‘other’
This function creates a CoGroupedRDD
and then calls cg.mapValues on it (cg is the CoGroupedRDD).
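
The relationship between join and cogroup described above can be sketched on local collections. This is an illustration of the semantics only, assuming small in-memory sequences; it is not Spark's implementation, and the object and method names are hypothetical.

```scala
object CogroupSketch {
  // cogroup: for every key present on either side, collect the values
  // from the left and from the right into a pair of lists.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.filter(_._1 == k).map(_._2)
      val ws = right.filter(_._1 == k).map(_._2)
      (k, (vs, ws))
    }.toMap
  }

  // join expressed as cogroup followed by a flat map over the grouped values:
  // a key with an empty side produces no output pairs.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroup(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }
}
```

Keys that appear on only one side survive in the cogroup result with an empty list on the other side, but they drop out of the join because the inner loop has nothing to pair them with.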

subtractByKey

mapValues: Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.
This function creates a MappedValuesRDD.

    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache() // an RDD of pairs can freely use PairRDDFunctions
    var ranks = links.mapValues(v => 1.0) // creates a MappedValuesRDD
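
Why mapValues can retain the original partitioning is easy to check on a local model: the keys are never touched, so every pair still belongs to the partition its key hashes to. The sketch below assumes a simple hash-modulo partitioner; all names in it are hypothetical and it does not use Spark.

```scala
object MapValuesSketch {
  // Hypothetical hash partitioner: non-negative key hash modulo the partition count.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Lay the pairs out into n partitions according to their keys.
  def partitionByKey[K, V](pairs: Seq[(K, V)], n: Int): Vector[Seq[(K, V)]] =
    Vector.tabulate(n)(i => pairs.filter(p => partitionOf(p._1, n) == i))

  // mapValues works partition by partition and never changes a key.
  def mapValues[K, V, U](parts: Vector[Seq[(K, V)]])(f: V => U): Vector[Seq[(K, U)]] =
    parts.map(_.map { case (k, v) => (k, f(v)) })

  // True when every pair sits in the partition its key hashes to.
  def isHashPartitioned[K, V](parts: Vector[Seq[(K, V)]]): Boolean =
    parts.zipWithIndex.forall { case (part, i) =>
      part.forall(p => partitionOf(p._1, parts.size) == i)
    }
}
```

A general map offers no such guarantee, because the function may return a different key that hashes to another partition.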

###ParallelCollectionRDD
Created in SparkContext by calling parallelize or makeRDD.
parallelize: Distribute a local Scala collection to form an RDD
makeRDD: Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.
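
Distributing a local collection comes down to cutting it into slices, one per partition. The following sketch shows one plausible way to do that with contiguous, near-equal slices; the boundary formula is an assumption chosen for illustration and is not claimed to match Spark's code, and `SliceSketch` is a hypothetical name.

```scala
object SliceSketch {
  // Split a local collection into numSlices contiguous slices whose sizes
  // differ by at most one element. Slices may be empty when the collection
  // has fewer elements than numSlices.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    (0 until numSlices).map { i =>
      // Long arithmetic avoids overflow for large collections.
      val start = ((i.toLong * data.length) / numSlices).toInt
      val end = (((i + 1).toLong * data.length) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

Concatenating the slices in order always reproduces the original collection, which is the property a partitioned dataset needs.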

SparkPi: estimates pi with the Monte Carlo method

    import scala.math.random

    val count = spark.parallelize(1 to n, slices).map { i => // generic RDD map
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _) // generic RDD reduce

Both map and reduce here are functions defined on the generic RDD. The former creates a MappedRDD.
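
The same Monte Carlo computation can be tried locally without Spark. This sketch swaps the RDD for a plain Scala range and uses a seeded `scala.util.Random` so that runs are repeatable; `PiSketch` and `estimatePi` are hypothetical names. Since the unit circle covers pi/4 of the enclosing square, the estimate is 4 times the fraction of points that land inside.

```scala
object PiSketch {
  import scala.util.Random

  // Throw n random points into the square [-1, 1] x [-1, 1] and count
  // how many fall inside the unit circle, using the same map/reduce shape.
  def estimatePi(n: Int, seed: Long): Double = {
    val rng = new Random(seed)
    val inside = (1 to n).map { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    4.0 * inside / n
  }
}
```

In the Spark version the map runs on each partition in parallel and reduce combines the per-partition counts; the local version does the same arithmetic sequentially.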

###PartitionerAwareUnionRDD

###PartitionPruningRDD

###PartitionwiseSampledRDD

###PipedRDD

###RDDCheckpointData

###SampledRDD

###SequenceFileRDDFunctions

###ShuffledRDD

###SubtractedRDD

###UnionRDD

###ZippedPartitionsRDD

###ZippedRDD

###ZippedWithIndexRDD