Spark缓存和checkpoint机制

持久化

在实际开发中,经常会选择复用RDD来减少性能消耗,这个时候就会通过cache()persist()方法来缓存复用RDD

存储级别

  • MEMORY_ONLY: 默认存储级别,存储在JVM内存中。如果内存空间不足,部分数据分区将不再缓存,以后需要的时候重新计算

  • MEMORY_AND_DISK: 先使用内存缓存,如果内存空间不足将数据缓存到磁盘上

  • DISK_ONLY: 只在磁盘上缓存

  • MEMORY_ONLY_SER: 以序列化的方式存储数据到内存中,能节省较多的空间,但在读取的时候会增加计算负担

  • MEMORY_AND_DISK_SER

  • MEMORY_ONLY_2DISK_ONLY_2MEMORY_AND_DISK_SER_2: 类似上面,只不过每个分区在集群中两个节点上建立副本

  • OFF_HEAP: 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory堆外内存

存储级别的选择

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡:

  • 如果内存充足,那么就选择默认的存储级别即可
  • 如果内存不能存储全部的RDD,那么选择MEMORY_ONLY_SER,并挑选合适的序列化库进行序列化存储,节省空间
  • 除了计算代价特别高的情况下,尽量不要选择磁盘方式存储,因为重新计算的耗时跟从磁盘方式读取耗时差不多
  • 如果想要快速复原故障,那么建议使用多副本存储策略

cache和persist区别

  • cache()底层就是直接调用的persist()方法,设置的就是默认的MEMORY_ONLY存储级别
  • persist()方法可以通过传递一个 StorageLevel 对象来设置缓存的存储级别

checkpoint机制

分布式环境中难免因为网络,存储等原因出现计算失败的情况,为了避免计算失败后从头开始计算的大量开销,可以利用checkpoint机制保存计算过程中的信息,这样作业失败后直接从checkpoint点重新计算即可,提高效率

  • 当RDD的action算子触发计算结束后会执行checkpoint
  • 在spark streaming中每generate一个batch的RDD也会触发checkpoint操作
  • task计算失败的时候会从checkpoint读取数据进行计算

checkpoint的具体实现

  • LocalRDDCheckpointData:临时存储在本地executor的磁盘和内存上(不能仅使用内存,因为内存的eviction机制可能造成data loss)。该实现的特点是比较快,适合lineage信息需要经常被删除的场景(如GraphX),可容忍executor挂掉。

  • ReliableRDDCheckpointData:存储在外部可靠存储(如hdfs),可以达到容忍driver 挂掉情况。虽然效率没有存储本地高,但是容错级别最好

缓存和checkpoint区别

  • 缓存会把RDD的所有依赖关系一并缓存下来,因为如果某个节点挂了,缓存在内存中的数据就会丢失,这个时候需要通过依赖关系重新计算
  • checkpoint是将RDD的分区数据持久化存储起来,是多副本可靠存储,因此不需要保存依赖关系。不过checkpoint是需要把 job 重新从头算一遍, 所以最好先cache一下,可以提升性能

详细原理参考:

  1. 深入浅出Spark的Checkpoint机制
  2. spark源码分析之Checkpoint的过程