RDD总结
抽象类RDD源码分析
基本定义
1 | abstract class RDD[T: ClassTag]( |
变量定义
id
: 当前RDD的唯一身份标识,val id: Int = sc.newRddId()
storageLevel
: 当前RDD的存储级别,private var storageLevel: StorageLevel = StorageLevel.NONE
creationSite
: 创建当前RDD的用户代码(e.g.textFile
,parallelize
)scope
: 当前RDD的操作作用域, 有一个withScope调用, 就像是一个 AOP,嵌入到所有RDD 的转换和操作的函数中,RDDOperationScope会把调用栈记录下来,用于绘制Spark UI的 DAGcheckpointData
: 当前RDD的检查点数据checkpointAllMarkedAncestors
: 是否对所有标记了需要保存检查点的祖先保存检查点doCheckpointCalled
: 是否已经调用了doCheckPoint方法设置检查点, 此属性可以阻止对RDD多次设置检查点stateLock
: 用于锁定RDD的可变状态, 它被定义为一个常量整数,private val stateLock = new Integer(0)
, 联想到操作系统中的信号量机制,在进行可变操作时会先执行stateLock.synchronized{}
加锁, 那么为什么不直接用this
给对象加锁呢?源码里是这样解释的:1
2
3
4
5
6
7
8
9
10
11
12/**
* Lock for all mutable state of this RDD (persistence, partitions, dependencies, etc.). We do
* not use `this` because RDDs are user-visible, so users might have added their own locking on
* RDDs; sharing that could lead to a deadlock.
*
* One thread might hold the lock on many of these, for a chain of RDD dependencies; but
* because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
* chance of deadlock.
*
* The use of Integer is simply so this is serializable -- executors may reference the shared
* fields (though they should never mutate them, that only happens on the driver).
*/大致意思就是因为RDD都是用户可见的, 用户线程和spark的调度器线程可以同时访问和修改RDD依赖项和分区, 所以用户可能在某些时候会主动加锁,而RDD是一个并行数据结构,大家都去抢夺锁的话就有可能会产生死锁(理解的比较片面,后续需要回顾理解, 详情见SPARK-28917)
barrier
: 一个还在实验的特性
一些函数方法定义
RDD采用了模版方法的模式设计,抽象类RDD定义了模版方法及一些接口
一些接口
compute
: 对RDD的分区进行计算getPartitions
: 获取当前RDD的所有分区getDependencies
: 获取当前RDD的所有依赖getPreferredLocations
: 获取某一分区的偏好位置
一些模版方法
partitions
方法
用于获取RDD的分区数组
1 | final def partitions: Array[Partition] = { |
可以看出, partitions
方法查找分区数组的优先级为:
从CheckPoint
查找 -> 读取partitions_
属性 -> 调用getPartitions
方法获取
dependencies
方法
用于获取当前RDD的所有依赖的序列
1 | final def dependencies: Seq[Dependency[_]] = { |
其他方法
getStorageLevel
返回当前RDD的StorageLevelgetNumPartitions
获取这个RDD的分区数getNarrowAncestors
利用DFS
算法遍历当前RDD的依赖树获取其祖先依赖中属于窄依赖的RDD序列
1 | private[spark] def getNarrowAncestors: Seq[RDD[_]] = { |
还有一些基本算子
为什么需要RDD
从以下四个点解释
数据处理模型
通常数据处理的模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,Storm采用流式处理模型,而Spark借助RDD实现了以上所有模型
依赖划分原则
一个RDD包含一个或多个分区, 每个分区实际上是一个数据集合的片段, RDD之间通过依赖关系串联起来构建成DAG.
依赖划分为NarrowDependcy和ShuffleDependcy
两种, 参考:
数据处理效率
ShuffleDependcy
所依赖的上游RDD的计算过程允许在多个节点并发执行, 数据量大的时候可以通过适当调整分区数量,能有效提高Spark的数据处理效率
容错处理
传统关系型数据库往往采用日志记录的方式来容灾,Hadoop通过将数据备份到其他机器容灾。Spark则通过RDD依赖组成的DAG直接重新调度计算失败的Task,成功的Task可以从CheckPoint
中读取。
(:在流式计算中,Spark需要通过记录日志和CheckPoint进行数据恢复
参考
What is Spark RDD and Why Do We Need it?
RDD的依赖
Dependency
抽象类
Spark使用Dependency
来表示RDD之间的依赖关系,其定义如下:
1 | DeveloperApi |
抽象类Dependendcy只定义了一个名叫rdd的方法, 此方法返回当前依赖的RDD
依赖又分为窄依赖(NarrowDependcy)
和宽依赖(ShuffleDependcy)
窄依赖(NarrowDependcy)
基本定义
如果子RDD
依赖于父RDD
中固定的Partion
分区, 其之间的依赖关系属于窄依赖, 定义如下:
1 |
|
还有两个子类OneToOneDependency
和 RangeDependency
OneToOneDependency
1 | /** |
这个类重写了getParents
, 直接把传递进去的id号封装成Seq返回。可以是多个子RDD, 但每一个子RDD的每一个分区和父RDD都是一对一的关系, 子RDD分区的个数和父RDD分区的个数一样, 如下两种情况
RangeDependency
1 | /** |
RangeDependency
也是重写了getParents
方法, 与OneToOneDependency
不同的点在于可能有多个父RDD, 而一个子RDD的分区依赖于很多父RDD的分区, 但分区也都是一对一的关系, 即子RDD的每一个分区都且唯一对应着一个父RDD的分区, union操作中存在这种情况
宽依赖(ShuffleDependcy)
父RDD的一个分区中的数据同时被子RDD的多个分区所依赖, 就是说子RDD的分区和父RDD不再是一对一的关系, 可能是多对一的关系, 数据被切分利用
源码实现
1 | /** |
属性解释
_rdd
:泛型要求必须是Product2[K, V]及其子类的RDDpartitioner
:分区计算器Partitionerserializer
:SparkEnv中创建的serializer,即org.apache.spark.serializer.JavaSerializer。keyOrdering
:按照K进行排序的scala.math.Ordering的实现类。aggregator
:对map任务的输出数据进行聚合的聚合器。mapSideCombine
:是否在map端进行合并,默认为false。keyClassName
:K的类名。valueClassName
:V的类名。combinerClassName
:组合器,将切割后的数据组合在一个RDD所使用的组合器shuffleId
:当前ShuffleDependency的身份标识。shuffleHandle
:当前ShuffleDependency的处理器
补充
- ShuffleDependency需要经过shuffle过程才能形成,而shuffle都是基于 PairRDD进行的,所以传入的RDD需要是key-value类型的
newShuffleId
的作用是得到唯一的shufflId(每次加1)