Spark Streaming + Kafka实战

内容整理自:

接收Kafka数据

接收数据的方式有两种:

  • 利用Receiver接收数据
  • 直接从Kafka读取数据

利用Receiver接收数据

从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据

1
2
3
4
5
import org.apache.spark.streaming.kafka.*;

JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

需要注意的地方:

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度

  • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream

  • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接从kafka读取数据

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch

这种方法相较于Receiver方式的优势在于:

  • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。

  • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。

  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性

向Kafka写入数据

Spark没有提供统一的接口用于写数据入Kafka, 因此我们需要使用Kafka接口自定义包装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
input.foreachRDD(rdd => {
// 不能在这里新建KafkaProducer,因为KafkaProducer是不可序列化的
rdd.foreachPartition(partition => {
partition.foreach{
case x: String=>{
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
producer.send(message)

}

}
})
})

上面这种方法实现最简单,但缺点也很明显,我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的
-w1366

因此对于每个partition的每条记录都需要创建KafkaProducer,相当于都要建立一次连接,低效且不灵活,优化方案:

  • 懒加载方式定义KafkaProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

lazy val producer = createProducer()

def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))

}

object KafkaSink {
import scala.collection.JavaConversions._

def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}

def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)

}
  • 利用广播变量的形式,将KafkaProducer广播到每一个executor
1
2
3
4
5
6
7
8
9
10
11
12
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "10.111.32.81:9092")
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 在每个executor中数据写入Kafka
1
2
3
4
5
6
7
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreach(record => {
kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
})
}
})

Spark Streaming+Kafka调优

Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置

合理的批处理时间(batchDuration)

初始化StreamingContext时设置,代表job处理的时间。不能过小,会导致Spark Streaming频繁提交作业,如果batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积, 最终导致Spark Streaming发生阻塞.

合理的Kafka拉取量(maxRatePerPartition)

配置参数:spark.streaming.kafka.maxRatePerPartition, 默认没有上线,即Kafka中有多少数据都会全部拉出。
这个参数需要结合上面的batchDuration配置,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量

缓存反复使用的DStream(RDD)

对经常复用的RDD,我们会选择cache()将其缓存下来,同理在Spark Streaming我们也可以选择将DStream缓存下来,防止过度的调度资源造成的网络开销

设置合理的GC

设置合理的资源配置

  • num-executors: 用于设置Spark作业总共要用多少个Executor进程来执行
  • executor-memory: 该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联
  • executor-cores: 该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力

设置合理的parallelism

partition和parallelism

  • partition: partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低

  • parallelism: parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的partition数量, 而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响

通过spark.default.parallelism可以设置默认的分片数量,也可以在创建RDD时指定. 在SparkStreaming+Kafka的使用中,如果采用Direct连接方式,Spark中的partition和Kafka中的Partition是一一对应的,一般默认设置为Kafka中Partition的数量

其他

参考: Spark进阶-Spark调优总结