Kafka消息偏移量机制
消费者提交偏移量
消费者往一个叫做_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。
如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新 的消费者加入群组,就会触发再均衡
,完成再均衡
之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理, 这个时候就可能会因为提交的偏移量与客户端处理的偏移量之间不一致产生的重复消费和消息丢失情况
重复消费
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理
消息丢失
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两者之间的消息就会丢失
提交偏移量的方式
自动提交(at most once)
enable.auto.commit被设为true, 消费者会自动提交偏移量,提交时间间隔由auto.commit.interval.ms控制, 默认为5s
最简单的提交方式,但是不能清楚的知道消息处理的情况,容易产生消息重复消费和消息丢失的情况
主动提交当前偏移量(at least once)
利用KafkaConsumer API可以在必要的时候主动提交当前偏移量,而不是基于时间间隔
commitSync()
同步提交, 将会提交由 poll() 返回的最新偏移量, 可以保证可靠性,但因为提交时程序会处于阻塞状态,限制吞吐量commitAsync()
异步提交, 不保证消息可靠,支持自定义回调处理,用于记录提交错误或生成度量指标
提交特定的偏移量
提交偏移量的频率与处理消息批次的频率是一样的,就是说通常是处理完一批次消息提交一次偏移量,但有时候比如 poll() 方法返回一大批数据,为了避免因为再均衡(详见下节说明)引起的重复处理整批消息,我们想要在批次中间提交偏移量,这时可以通过调用commitSync()/commitAsync()方法时传入一个存有希望提交的分区和偏移量的map实现,但是因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂
从特定偏移量处开始处理记录
1 | // 获得主题 topic 所有可用分区 partition 的信息 |
再均衡
Kafka消费模式
从kafka消费消息,kafka客户端提供两种模式: 分区消费,分组消费
消费者数目跟分区数目的关系
- 一个消费者可以消费一个到全部分区数据
- 分组消费,同一个group内所有消费者消费一份完整的数据,此时一个分区数据只能被一个消费者消费,而一个消费者可以消费多个分区数据
- 同一个消费组内,消费者数目大于分区数目后,消费者会有空余=分区数-消费者数,这个时候可以选择新增group
分组消费的再平衡策略
我们知道一个group下的消费者共同消费主题的分区数据,一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。还有就是可能主题发送了变化,添加了新分区,也会发生分区重分配。
以上几种情况,分区的所有权从一个消费者转移到了另一个消费者, 就是分区再均衡
,它为消费者群组带来了高可用性和伸缩性,缺陷在于在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用
通过设置参数 partition.assignment.strategy
, 选择分配策略,有两种分配策略:
RangeAssignor
把主题的若干个连续的分区分配给消费者, 默认是这种
假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2
RoundRobinAssignor
把主题的所有分区逐个分配给消费者
如果使用 RoundRobin 策略来给消费者 C1和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说, 如果所有消费者都订阅相同的主题(这种情况很常见), RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)
参考
分区心跳机制
消费者依赖于心跳机制向 GroupCoordinator 报活,发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系
具体参考:Kafka 源码解析:Group 协调管理机制
再均衡监听器
在提交偏移量, 消费者在退出和进行分区再均衡之前,会做一些清理工作, 我们可以通过定义监听器的方法做一些特殊的逻辑处理, 比如处理缓冲区记录, 进行数据库连接操作保存信息(比如利用Mysql
存储offset进行手动维护)等
实现很简单,在订阅消费主题时传入一个再均衡监听器实例就行(可以自己实现接口)
1 | consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { |