RDD总结

抽象类RDD源码分析

基本定义

1
2
3
4
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging

变量定义

  • id: 当前RDD的唯一身份标识, val id: Int = sc.newRddId()

  • storageLevel: 当前RDD的存储级别, private var storageLevel: StorageLevel = StorageLevel.NONE

  • creationSite: 创建当前RDD的用户代码(e.g. textFile, parallelize)

  • scope: 当前RDD的操作作用域, 有一个withScope调用, 就像是一个 AOP,嵌入到所有RDD 的转换和操作的函数中,RDDOperationScope会把调用栈记录下来,用于绘制Spark UI的 DAG

  • checkpointData: 当前RDD的检查点数据

  • checkpointAllMarkedAncestors: 是否对所有标记了需要保存检查点的祖先保存检查点

  • doCheckpointCalled: 是否已经调用了doCheckPoint方法设置检查点, 此属性可以阻止对RDD多次设置检查点

  • stateLock: 用于锁定RDD的可变状态, 它被定义为一个常量整数,private val stateLock = new Integer(0) , 联想到操作系统中的信号量机制,在进行可变操作时会先执行stateLock.synchronized{}加锁, 那么为什么不直接用this给对象加锁呢?源码里是这样解释的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * Lock for all mutable state of this RDD (persistence, partitions, dependencies, etc.). We do
    * not use `this` because RDDs are user-visible, so users might have added their own locking on
    * RDDs; sharing that could lead to a deadlock.
    *
    * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
    * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
    * chance of deadlock.
    *
    * The use of Integer is simply so this is serializable -- executors may reference the shared
    * fields (though they should never mutate them, that only happens on the driver).
    */

    大致意思就是因为RDD都是用户可见的, 用户线程和spark的调度器线程可以同时访问和修改RDD依赖项和分区, 所以用户可能在某些时候会主动加锁,而RDD是一个并行数据结构,大家都去抢夺锁的话就有可能会产生死锁(理解的比较片面,后续需要回顾理解, 详情见SPARK-28917)

  • barrier: 一个还在实验的特性


一些函数方法定义

RDD采用了模版方法的模式设计,抽象类RDD定义了模版方法及一些接口

一些接口

  • compute: 对RDD的分区进行计算
  • getPartitions: 获取当前RDD的所有分区
  • getDependencies: 获取当前RDD的所有依赖
  • getPreferredLocations: 获取某一分区的偏好位置

一些模版方法

  • partitions方法
    用于获取RDD的分区数组
1
2
3
4
5
6
7
8
9
10
11
12
13
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
stateLock.synchronized {
if (partitions_ == null) {
partitions_ = getPartitions
partitions_.zipWithIndex.foreach {...}
}
}
}
partitions_
}
}

可以看出, partitions方法查找分区数组的优先级为:
CheckPoint查找 -> 读取partitions_属性 -> 调用getPartitions方法获取

  • dependencies方法
    用于获取当前RDD的所有依赖的序列
1
2
3
4
5
6
7
8
9
10
11
12
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
stateLock.synchronized {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
}
}
dependencies_
}
}

其他方法

  • getStorageLevel
    返回当前RDD的StorageLevel
  • getNumPartitions
    获取这个RDD的分区数
  • getNarrowAncestors
    利用 DFS算法遍历当前RDD的依赖树获取其祖先依赖中属于窄依赖的RDD序列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]

def visit(rdd: RDD[_]): Unit = {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}

visit(this)

// In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq
}

还有一些基本算子


为什么需要RDD

从以下四个点解释

数据处理模型

通常数据处理的模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,Storm采用流式处理模型,而Spark借助RDD实现了以上所有模型

依赖划分原则

一个RDD包含一个或多个分区, 每个分区实际上是一个数据集合的片段, RDD之间通过依赖关系串联起来构建成DAG.


依赖划分为NarrowDependcy和ShuffleDependcy两种, 参考:


RDD的依赖


Spark宽依赖与窄依赖

数据处理效率

ShuffleDependcy所依赖的上游RDD的计算过程允许在多个节点并发执行, 数据量大的时候可以通过适当调整分区数量,能有效提高Spark的数据处理效率

容错处理

传统关系型数据库往往采用日志记录的方式来容灾,Hadoop通过将数据备份到其他机器容灾。Spark则通过RDD依赖组成的DAG直接重新调度计算失败的Task,成功的Task可以从CheckPoint中读取。
(:在流式计算中,Spark需要通过记录日志和CheckPoint进行数据恢复

参考

What is Spark RDD and Why Do We Need it?


RDD的依赖

Dependency抽象类

Spark使用Dependency来表示RDD之间的依赖关系,其定义如下:

1
2
3
4
DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}

抽象类Dependendcy只定义了一个名叫rdd的方法, 此方法返回当前依赖的RDD
依赖又分为窄依赖(NarrowDependcy)宽依赖(ShuffleDependcy)

窄依赖(NarrowDependcy)

基本定义

如果子RDD依赖于父RDD中固定的Partion分区, 其之间的依赖关系属于窄依赖, 定义如下:

1
2
3
4
5
6
7
8
9
10
11
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd
}

还有两个子类OneToOneDependencyRangeDependency

OneToOneDependency

1
2
3
4
5
6
7
8
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

这个类重写了getParents, 直接把传递进去的id号封装成Seq返回。可以是多个子RDD, 但每一个子RDD的每一个分区和父RDD都是一对一的关系, 子RDD分区的个数和父RDD分区的个数一样, 如下两种情况

RangeDependency

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}

RangeDependency也是重写了getParents方法, 与OneToOneDependency不同的点在于可能有多个父RDD, 而一个子RDD的分区依赖于很多父RDD的分区, 但分区也都是一对一的关系, 即子RDD的每一个分区都且唯一对应着一个父RDD的分区, union操作中存在这种情况

宽依赖(ShuffleDependcy)

父RDD的一个分区中的数据同时被子RDD的多个分区所依赖, 就是说子RDD的分区和父RDD不再是一对一的关系, 可能是多对一的关系, 数据被切分利用

源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
* explicitly then the default serializer, as specified by `spark.serializer`
* config option, will be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {

if (mapSideCombine) {
require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
}
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)

val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

属性解释

  1. _rdd:泛型要求必须是Product2[K, V]及其子类的RDD
  2. partitioner:分区计算器Partitioner
  3. serializer:SparkEnv中创建的serializer,即org.apache.spark.serializer.JavaSerializer。
  4. keyOrdering:按照K进行排序的scala.math.Ordering的实现类。
  5. aggregator:对map任务的输出数据进行聚合的聚合器。
  6. mapSideCombine:是否在map端进行合并,默认为false。
  7. keyClassName:K的类名。
  8. valueClassName:V的类名。
  9. combinerClassName:组合器,将切割后的数据组合在一个RDD所使用的组合器
  10. shuffleId:当前ShuffleDependency的身份标识。
  11. shuffleHandle:当前ShuffleDependency的处理器

补充

  1. ShuffleDependency需要经过shuffle过程才能形成,而shuffle都是基于 PairRDD进行的,所以传入的RDD需要是key-value类型的
  2. newShuffleId的作用是得到唯一的shufflId(每次加1)