

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

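The dependencies describe which partitions of which parent RDDs each partition of this RDD needs in order to be computed; this information is essential for both task scheduling and fault tolerance. Spark distinguishes two kinds of dependency: narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD, and wide dependencies, where a partition of the parent RDD may be used by multiple child partitions. The distinction matters for two reasons: (1) narrow dependencies allow all the computation to be pipelined on a single cluster node, whereas wide dependencies require all the parent data to be computed first and then shuffled across nodes; (2) recovery after a node failure is more efficient with narrow dependencies, because only the lost parent partitions need to be recomputed and they can be recomputed in parallel on different nodes, whereas losing a single partition behind a wide dependency may force the whole computation to be rerun.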
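The narrow/wide distinction can be made concrete with a small model. The sketch below is plain Scala, not Spark's API: the `Lineage` type and the `DependencyKindSketch` object are hypothetical names introduced only for illustration. It records, for each child partition, which parent partitions it reads, and classifies the dependency by the rule stated above.

```scala
// Toy model (not Spark's API): a dependency is described by, for each child
// partition, the set of parent partition indices it reads.
object DependencyKindSketch {
  type Lineage = Map[Int, Set[Int]] // child partition -> parent partitions it reads

  // Narrow: every parent partition is read by at most one child partition.
  def isNarrow(lineage: Lineage): Boolean = {
    val readersPerParent = lineage.toSeq
      .flatMap { case (child, parents) => parents.toSeq.map(p => p -> child) }
      .groupBy(_._1)
    readersPerParent.values.forall(_.size <= 1)
  }

  // Parent partitions that must be available again if one child partition is lost.
  def parentsToRecompute(lineage: Lineage, lostChild: Int): Set[Int] =
    lineage.getOrElse(lostChild, Set.empty[Int])

  // map-like: child i reads parent i only.
  val mapLike: Lineage = Map(0 -> Set(0), 1 -> Set(1), 2 -> Set(2))
  // coalesce-like (no shuffle): a child reads several parents, but no parent is shared.
  val coalesceLike: Lineage = Map(0 -> Set(0, 1), 1 -> Set(2, 3))
  // groupByKey-like: every child reads every parent.
  val shuffleLike: Lineage = Map(0 -> Set(0, 1, 2), 1 -> Set(0, 1, 2))

  def main(args: Array[String]): Unit = {
    println(isNarrow(mapLike))      // true
    println(isNarrow(coalesceLike)) // true
    println(isNarrow(shuffleLike))  // false
    println(parentsToRecompute(mapLike, 1).size)     // 1
    println(parentsToRecompute(shuffleLike, 1).size) // 3
  }
}
```

In this model, losing one child partition of the map-like lineage touches a single parent partition, while losing one in the shuffle-like lineage touches all of them, which is the recovery cost difference described above.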



/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}


abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {


  // Our dependencies and partitions will be gotten by calling subclass's methods
  // below, and will be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null


  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This
   * method will only be called once, so it is safe to implement a time-consuming
   * computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This
   * method will only be called once, so it is safe to implement a time-consuming
   * computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps


  /**
   * Get the list of dependencies of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  /**
   * Get the array of partitions of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }
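
The two accessors above follow one pattern: compute the value on first access, cache it, and let a checkpoint take precedence over the cached lineage. The following is a minimal plain-Scala sketch of that pattern only. `ToyNode` and `CountingNode` are hypothetical stand-ins, and dependencies are reduced to strings; none of this is Spark's API.

```scala
object LazyLineageSketch {
  // Hypothetical stand-in for an RDD's cached lineage lookup.
  abstract class ToyNode {
    private var cached: Seq[String] = null   // filled on first access
    var checkpoint: Option[String] = None    // set once the node is "checkpointed"

    // Implemented by subclasses; may be expensive, so it should run only once.
    protected def getDependencies: Seq[String]

    // A checkpoint, if present, replaces the original lineage with a single parent.
    final def dependencies: Seq[String] =
      checkpoint.map(c => List(c)).getOrElse {
        if (cached == null) {
          cached = getDependencies
        }
        cached
      }
  }

  // Counts how often the expensive lookup actually runs.
  final class CountingNode(parents: Seq[String]) extends ToyNode {
    var calls = 0
    protected def getDependencies: Seq[String] = { calls += 1; parents }
  }

  def main(args: Array[String]): Unit = {
    val node = new CountingNode(Seq("parentA", "parentB"))
    node.dependencies
    node.dependencies
    println(node.calls) // 1: the second access is served from the cache
    node.checkpoint = Some("checkpointFile")
    println(node.dependencies) // List(checkpointFile)
  }
}
```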


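Spark uses the Dependency class to represent the relationship (the dependency) between a parent RDD and the child RDD produced by a transformation. As shown above, Dependency is an abstract class with a single method, def rdd: RDD[T]. It has the following subclasses: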

  • NarrowDependency
    • OneToOneDependency
    • PruneDependency
    • RangeDependency
  • ShuffleDependency
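
What separates the narrow subclasses from each other is how a child partition index maps to parent partition indices. As far as I recall, Spark's NarrowDependency exposes this through a `getParents(partitionId)` method; the sketch below imitates that idea in plain Scala without any RDD reference. The `Toy*` names are hypothetical, and the range arithmetic is my reading of RangeDependency rather than a copy of the source.

```scala
object NarrowDependencySketch {
  // Stand-in for a narrow dependency, stripped of the RDD reference.
  trait ToyNarrowDependency {
    // Parent partitions that the given child partition reads.
    def getParents(partitionId: Int): Seq[Int]
  }

  // One-to-one: child partition i reads parent partition i (map, filter, ...).
  final class ToyOneToOne extends ToyNarrowDependency {
    def getParents(partitionId: Int): Seq[Int] = List(partitionId)
  }

  // Range: child partitions [outStart, outStart + length) map onto parent
  // partitions starting at inStart (used when several parents are concatenated).
  final class ToyRange(inStart: Int, outStart: Int, length: Int) extends ToyNarrowDependency {
    def getParents(partitionId: Int): Seq[Int] =
      if (partitionId >= outStart && partitionId < outStart + length)
        List(partitionId - outStart + inStart)
      else Nil
  }

  def main(args: Array[String]): Unit = {
    // Union of A (2 partitions) and B (3 partitions): child partitions 0-1 come
    // from A, child partitions 2-4 come from B.
    val fromA = new ToyRange(inStart = 0, outStart = 0, length = 2)
    val fromB = new ToyRange(inStart = 0, outStart = 2, length = 3)
    println(fromA.getParents(3)) // List(): child partition 3 does not read A
    println(fromB.getParents(3)) // List(1): it reads partition 1 of B
  }
}
```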


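A ShuffleDependency is a dependency that gives rise to a shuffle map stage: an intermediate stage in the execution DAG that performs a shuffle and whose output is the input of later stages.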

A ShuffleDependency belongs to an RDD of key-value pairs (its rdd member has type RDD[Product2[K, V]]). Each ShuffleDependency object also has a shuffleId.



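A ShuffleDependency appears in the following RDDs:

  • CoGroupedRDD and SubtractedRDD, when the RDDs involved have different partitioners
  • ShuffledRDD and ShuffledRowRDD, which are produced by shuffle operations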


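The following operations may introduce a shuffle (a nested entry is implemented on top of the entry above it):

  • coalesce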
    • repartition
  • cogroup
    • intersection
  • subtractByKey
    • subtract
  • sortByKey
    • sortBy
  • repartitionAndSortWithinPartitions
  • combineByKeyWithClassTag
    • combineByKey
    • aggregateByKey
    • foldByKey
    • reduceByKey
    • countApproxDistinctByKey
    • groupByKey
  • partitionBy
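
Several of the operations listed above, such as reduceByKey and groupByKey, must bring all records with the same key into the same partition, which is what makes the dependency wide. The sketch below imitates that step with in-memory collections in plain Scala. It assumes hash partitioning by a non-negative modulo of `hashCode`; `ShuffleSketch`, `partitionFor`, `shuffle`, and the toy `reduceByKey` are hypothetical names, not Spark's implementation.

```scala
object ShuffleSketch {
  // Non-negative modulo, so negative hash codes still map to a valid partition.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Redistribute all records so that equal keys land in the same output partition.
  // Every output partition may read from every input partition: a wide dependency.
  def shuffle[K, V](parents: Seq[Seq[(K, V)]], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val byTarget = parents.flatten.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => byTarget.getOrElse(i, Seq.empty[(K, V)]).toVector)
  }

  // reduceByKey-like: shuffle first, then combine the values of each key locally.
  def reduceByKey[K, V](parents: Seq[Seq[(K, V)]], numPartitions: Int)(f: (V, V) => V): Vector[Map[K, V]] =
    shuffle(parents, numPartitions).map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }

  def main(args: Array[String]): Unit = {
    val parents = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3, "c" -> 4))
    // Both parent partitions contribute to the partition that ends up holding "a".
    reduceByKey(parents, 2)(_ + _).zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: $part")
    }
  }
}
```

A real shuffle writes and fetches this data across nodes; the sketch only shows the routing rule and why each output partition depends on all input partitions.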


The RDD subclasses implemented inside Spark include ParallelCollectionRDD, CoGroupedRDD, HadoopRDD, MapPartitionsRDD, CoalescedRDD, ShuffledRDD, PairRDD, UnionRDD, and others.





/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}

The transformations that produce a MapPartitionsRDD are: map, flatMap, filter, glom, mapPartitions, mapPartitionsWithIndex, PairRDDFunctions.mapValues, and PairRDDFunctions.flatMapValues.

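We look at the implementations of map and filter to see how a MapPartitionsRDD is created. Both are defined in the RDD class, and their code is shown below. Each one creates a MapPartitionsRDD and passes it a function that wraps f, so that f is applied to every element of a partition's iterator (through iter.map or iter.filter). Nothing is computed at that point: evaluation is lazy, and only when the result is actually needed is compute called, which runs the wrapped function over each partition.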

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}


/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
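
The same construction can be tried without Spark. The sketch below is a deliberately tiny plain-Scala imitation of the pattern in map and filter: each transformation only wraps the parent in a new object holding an iterator-to-iterator function, and work happens when an action pulls the iterators. `ToyRDD`, `ToySource`, and `ToyMapPartitions` are hypothetical names; scheduling, closure cleaning, and the TaskContext argument are left out.

```scala
object LazyPipelineSketch {
  // Hypothetical miniature of an RDD: a partition count plus a compute function.
  abstract class ToyRDD[T] {
    def numPartitions: Int
    def compute(split: Int): Iterator[T]

    // Transformations build a new node; they do not touch any data.
    def map[U](f: T => U): ToyRDD[U] =
      new ToyMapPartitions[U, T](this, (_, iter) => iter.map(f))

    def filter(p: T => Boolean): ToyRDD[T] =
      new ToyMapPartitions[T, T](this, (_, iter) => iter.filter(p))

    // Action: only here is compute invoked, one partition after another.
    def collect(): List[T] =
      (0 until numPartitions).flatMap(i => compute(i)).toList
  }

  // Leaf node backed by in-memory partitions.
  final class ToySource[T](parts: Vector[Vector[T]]) extends ToyRDD[T] {
    def numPartitions: Int = parts.length
    def compute(split: Int): Iterator[T] = parts(split).iterator
  }

  // Applies f to the parent's iterator for the same partition (a narrow dependency).
  final class ToyMapPartitions[U, T](prev: ToyRDD[T], f: (Int, Iterator[T]) => Iterator[U])
      extends ToyRDD[U] {
    def numPartitions: Int = prev.numPartitions
    def compute(split: Int): Iterator[U] = f(split, prev.compute(split))
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    val source = new ToySource(Vector(Vector(1, 2, 3), Vector(4, 5)))
    val result = source.map { x => calls += 1; x * 2 }.filter(_ > 4)
    println(calls)            // 0: nothing has been computed yet
    println(result.collect()) // List(6, 8, 10)
    println(calls)            // 5: the map function ran once per element
  }
}
```

Because map and filter are chained through iterators, each element flows through both functions before the next element is read, which is the per-partition pipelining that narrow dependencies allow.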

02 November 2016
