Spark Streaming + Kafka实战
内容整理自:
接收Kafka数据
接收数据的方式有两种:
- 利用Receiver接收数据
- 直接从Kafka读取数据
利用Receiver接收数据
从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据
1 | import org.apache.spark.streaming.kafka.*; |
需要注意的地方:
在Receiver的方式中,
Spark中的partition和kafka中的partition并不是相关的
,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度对于不同的Group和topic我们可以使用
多个Receiver
创建不同的Dstream来并行接收数据,之后可以利用union
来统一成一个Dstream如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成
StorageLevel.MEMORY_AND_DISK_SER
,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
直接从kafka读取数据
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch
这种方法相较于Receiver方式的优势在于:
简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性
向Kafka写入数据
Spark没有提供统一的接口用于写数据入Kafka, 因此我们需要使用Kafka接口自定义包装
1 | input.foreachRDD(rdd => { |
上面这种方法实现最简单,但缺点也很明显,我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的
因此对于每个partition的每条记录都需要创建KafkaProducer,相当于都要建立一次连接,低效且不灵活,优化方案:
- 懒加载方式定义KafkaProducer
1 | class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { |
- 利用广播变量的形式,将KafkaProducer广播到每一个executor
1 | // 广播KafkaSink |
- 在每个executor中数据写入Kafka
1 | input.foreachRDD(rdd => { |
Spark Streaming+Kafka调优
Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置
合理的批处理时间(batchDuration)
初始化StreamingContext
时设置,代表job处理的时间。不能过小,会导致Spark Streaming频繁提交作业,如果batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积, 最终导致Spark Streaming发生阻塞.
合理的Kafka拉取量(maxRatePerPartition)
配置参数:spark.streaming.kafka.maxRatePerPartition
, 默认没有上线,即Kafka中有多少数据都会全部拉出。
这个参数需要结合上面的batchDuration
配置,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量
缓存反复使用的DStream(RDD)
对经常复用的RDD,我们会选择cache()将其缓存下来,同理在Spark Streaming我们也可以选择将DStream缓存下来,防止过度的调度资源造成的网络开销
设置合理的GC
设置合理的资源配置
num-executors
: 用于设置Spark作业总共要用多少个Executor进程来执行executor-memory
: 该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联executor-cores
: 该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力
设置合理的parallelism
partition和parallelism
partition
: partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低parallelism
: parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的partition数量, 而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响
通过spark.default.parallelism
可以设置默认的分片数量,也可以在创建RDD时指定. 在SparkStreaming+Kafka的使用中,如果采用Direct连接方式,Spark中的partition和Kafka中的Partition是一一对应的,一般默认设置为Kafka中Partition的数量