一、Kafka基本概念

  • Producer: 消息和数据的生产者,向kafka的一个Topic发布消息的进程/代码/服务
  • Consumer:消息和数据的消费者,订阅数据(topic)并且处理其发布的消息的进程/代码/服务
  • Consumer Group:逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息
  • Broker:物理概念,kafka集群中的每个kafka节点
  • Topic:逻辑概念,kafka消息的类别,对数据进行区分、隔离
  • Partition:物理概念,kafka下数据存储的基本单元。一个topic数据,会被分散存储到多个Partition,每一个Partition是有序的
  • Replication:同一个Partition可能会多个Replication,多个Replication之间数据是一样的
  • Replication Leader:一个Partition的多个Replica上,需要一个Leader负责该Partition上与Producer和Consumer交互
  • ReplicaManager:负责管理当前broker所有分区和副本的信息,处理KafkaController发起的一些请求,副本状态的切换、添加/读取消息等

二、 Kafka特点

分布式

  • 多分区
  • 多副本
  • 多订阅者
  • 基于ZooKeeper调度

高性能

  • 高吞吐量
  • 低延迟
  • 高并发
  • 时间复杂度为O(1)

持久性和扩展性

  • 数据可持久化
  • 容错性
  • 支持水平在线扩展
  • 消息自动平衡

Kafka快的原因

  • 多partition提升了并发
  • zero-copy零拷贝
  • 顺序写入
  • 消息batch批量操作
  • page cache页缓存

三、Kafka基本命令使用

  • 创建主题(4个分区,2个副本)
1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  • 查询集群描述
1
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181
  • topic列表查询
1
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
  • 新消费者列表查询
1
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list
  • 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
1
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test
  • 生产者发送消息
1
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1
  • 消费者消费消息
1
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning

四、Kafka常用配置

生产者的配置

参考链接: kafka生产者Producer参数设置及参数调优建议

  1. acks
    指定需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数对消息丢失的可能性有重要影响

    • acks=0
      表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功
    • acks=1
      只要集群的leader节点收到消息,生产者就会收到一个来自服务器的成功
      响应。如果消息无法到达leader节点(比如leader节点崩溃,新的leader还没有被选举出来),
      生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个
      没有收到消息的节点成为新leader,消息还是会丢失。这个时候的吞吐量取决于使用的是Kafka生产者——向Kafka写入数据 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生
      产者在收到服务器响应之前可以发送多少个消息)。
    • acks=all/-1
      分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功, 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
  2. buffer.memory
    设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息

  3. compression.type
    设置消息压缩算法, 可以设置为snappy、gzip或lz4

  4. retries
    生产者可以重发消息的次数

  5. batch.size
    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里一起发送,该参数指定了一个批次可以使用的内存大小, 按照字节数计算(而不是消息个数). 不过批次没满的时候如果达到了 linger.ms 设置的上限也会把批次发送出去

  6. linger.ms
    指定生产者在发送批次之前的等待时间, 配合batch.size设置

  7. client.id
    消息来源的识别标志

  8. max.in.flight.requests.per.connection
    指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试

  9. 超时时间配置

    • request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间
    • metadata.fetch.timeout.ms: 生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间
    • timeout.ms: broker 等待同步副本返回消息确认的时间
  10. max.block.ms
    该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间, 达到上限后会抛出超时异常

  11. max.request.size
    用于控制生产者发送的请求大小

  12. recieive.buffer.bytessend.buffer.bytes
    这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小

消费者配置

  1. fetch.min.bytes
    该属性指定了消费者从服务器获取记录的最小字节数, 类比生产者buffer.memory的配置。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者

  2. fetch.max.wait.ms
    我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms, 类比生产者linger.ms配置

  3. max.partition.fetch.bytes
    指定服务器从每个分区里返回给消费者的最大字节数

  4. session.timeout.msheartbeat.interval.ms
    heartbeat.interval.ms指定了 poll() 方法向协调器发送心跳的频率, session.timeout.ms 则指定了消费者可以多久不发送心跳。heartbeat.interval.ms必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。

  5. auto.offset.reset
    指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长
    时间失效,包含偏移量的记录已经过时并被删除)该作何处理(latest/earliest/none/anything else)

  6. enable.auto.commit
    指定了消费者是否自动提交偏移量,默认值是 true, 手动维护offset
    时需要设置为 fasle

  7. partition.assignment.strategy
    分区分配策略

  8. client.id
    broker用来标识从客户端发送过来的消息

  9. max.poll.records
    该属性用于控制单次调用 poll() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量

  10. receive.buffer.bytessend.buffer.bytes

五、Kafka消息事务

消息投递语义

  • At-most-once(最多一次): 可能出现消息丢失的情况
  • At-least-once(最少一次): 可能出现消息重复的情况
  • Exactly-once(正好一次): 理想的语义实现,但实现难度较高

灾难场景分析

Broker失败
Kafka有自己的备份机制,保证消息写入leader replica成功后会冗余n份,同步到其他replica,所以理论上可以容忍n-1个broker节点宕机

Producer 到 Broker的RPC失败
Kafka的消息可靠性是基于producer接收到broker的ack确认信息为准的,但是Broker在消息已经写入但还未返回ack确认信息之前就可能会发生故障,也可能在消息被写入topic之前就宕机了,因为producer端无法知道失败的原因,只能尝试重发消息,因此某些下游场景可能就会存在消息乱序或者consumer重复消费的情况

客户端失败
客户端也可能存在永久宕机或者暂时性心跳丢失的情况,追求正确的性的话,broker和consumer应该丢弃从zombie producer发送的消息。同时新的客户端实例启动后它需要能从失败实例的任何状态中恢复,并从安全点(safe checkpoint)开始处理,这需要消费的偏移量位置和实际产生的消息输出保持同步

Kafka的Exactly-once语义保证

幂等: partition内部的exactly-once语义保证

幂等是指执行多次同样的操作得到的结果是一致的, Producer的send()操作现在就是幂等的。在任何导致producer重试的情况下,相同的消息如果被producer发送多次,也只会写入一次。需要修改broker的配置: enable.idempotence = true开启此功能

原理,类似于TCP中可靠传输的累积确认机制实现:
Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则将其丢弃:

  • 如果大1以上,说明中间有数据尚未写入,此时Broker拒绝此消息,Producer抛出InvalidSequenceNumber。解决了前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成的数据乱序问题
  • 如果小于等于Broker维护的序号,说明是重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

事务性保证: 跨partition分区的原子操作

Kafka提供事务API对消息进行跨partition分区的原子性写操作:

1
2
3
4
5
6
7
8
9
10
11
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}

引入Transaction IDepoch, Transaction ID与PID可能一一对应,区别在于Transaction ID由用户提供,而PID是Kafka内部实现的,对用户透明, 可以保证:

  • 跨Session的数据幂等发送: 当具有相同Transaction ID的新的Producer实例被创建且工作时,epoch会单调递增,由于旧的Producer的epoch比新Producer的epoch小,旧的且拥有相同Transaction ID的Producer将不再工作

  • 跨Session的事务恢复: 如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作

从Consumer的角度来看,事务保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:

  • 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
  • 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
  • Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
  • Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息

开发实现思路

自定义offset管理, 需要保证对消息的处理和offset偏移量在同一个事务中,例如在消息处理的数据写入到MySQL的同时更新此时的消息offset消息偏移量

实现:

  1. 设置enable.auto.commit = false, 关闭消息偏移量自动提交
  2. 处理消息的同时将offset消息偏移量保存,比如保存到MySQL
  3. 当partition分区发生变化的时候可能会发生分区再平衡,需要自定义类实现ConsumerRebalanceListener接口捕捉事件变化,对偏移量进行处理
  4. 在重新完成分配分区后,消费者开始读取消息之前 通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移量位置

参考链接