Spark缓存和checkpoint机制
持久化
在实际开发中,经常会选择复用RDD来减少性能消耗,这个时候就会通过cache()
和persist()
方法来缓存复用RDD
存储级别
MEMORY_ONLY
: 默认存储级别,存储在JVM内存中。如果内存空间不足,部分数据分区将不再缓存,以后需要的时候重新计算MEMORY_AND_DISK
: 先使用内存缓存,如果内存空间不足将数据缓存到磁盘上DISK_ONLY
: 只在磁盘上缓存MEMORY_ONLY_SER
: 以序列化的方式存储数据到内存中,能节省较多的空间,但在读取的时候会增加计算负担MEMORY_AND_DISK_SER
MEMORY_ONLY_2
、DISK_ONLY_2
、MEMORY_AND_DISK_SER_2
: 类似上面,只不过每个分区在集群中两个节点上建立副本OFF_HEAP
: 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory堆外内存
存储级别的选择
Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡:
- 如果内存充足,那么就选择默认的存储级别即可
- 如果内存不能存储全部的RDD,那么选择
MEMORY_ONLY_SER
,并挑选合适的序列化库进行序列化存储,节省空间 - 除了计算代价特别高的情况下,尽量不要选择磁盘方式存储,因为重新计算的耗时跟从磁盘方式读取耗时差不多
- 如果想要快速复原故障,那么建议使用多副本存储策略
cache和persist区别
cache()
底层就是直接调用的persist()
方法,设置的就是默认的MEMORY_ONLY
存储级别persist()
方法可以通过传递一个 StorageLevel 对象来设置缓存的存储级别
checkpoint机制
分布式环境中难免因为网络,存储等原因出现计算失败的情况,为了避免计算失败后从头开始计算的大量开销,可以利用checkpoint机制保存计算过程中的信息,这样作业失败后直接从checkpoint点重新计算即可,提高效率
- 当RDD的action算子触发计算结束后会执行checkpoint
- 在spark streaming中每generate一个batch的RDD也会触发checkpoint操作
- task计算失败的时候会从checkpoint读取数据进行计算
checkpoint的具体实现
LocalRDDCheckpointData
:临时存储在本地executor的磁盘和内存上(不能仅使用内存,因为内存的eviction机制可能造成data loss)。该实现的特点是比较快,适合lineage信息需要经常被删除的场景(如GraphX),可容忍executor挂掉。ReliableRDDCheckpointData
:存储在外部可靠存储(如hdfs),可以达到容忍driver 挂掉情况。虽然效率没有存储本地高,但是容错级别最好
缓存和checkpoint区别
- 缓存会把RDD的所有依赖关系一并缓存下来,因为如果某个节点挂了,缓存在内存中的数据就会丢失,这个时候需要通过依赖关系重新计算
- checkpoint是将RDD的分区数据持久化存储起来,是多副本可靠存储,因此不需要保存依赖关系。不过checkpoint是需要把 job 重新从头算一遍, 所以最好先cache一下,可以提升性能
详细原理参考: