一、Kafka基本概念
Producer: 消息和数据的生产者,向kafka的一个Topic发布消息的进程/代码/服务Consumer:消息和数据的消费者,订阅数据(topic)并且处理其发布的消息的进程/代码/服务Consumer Group:逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息Broker:物理概念,kafka集群中的每个kafka节点Topic:逻辑概念,kafka消息的类别,对数据进行区分、隔离Partition:物理概念,kafka下数据存储的基本单元。一个topic数据,会被分散存储到多个Partition,每一个Partition是有序的Replication:同一个Partition可能会多个Replication,多个Replication之间数据是一样的Replication Leader:一个Partition的多个Replica上,需要一个Leader负责该Partition上与Producer和Consumer交互ReplicaManager:负责管理当前broker所有分区和副本的信息,处理KafkaController发起的一些请求,副本状态的切换、添加/读取消息等
二、 Kafka特点
分布式
- 多分区
- 多副本
- 多订阅者
- 基于ZooKeeper调度
高性能
- 高吞吐量
- 低延迟
- 高并发
- 时间复杂度为O(1)
持久性和扩展性
- 数据可持久化
- 容错性
- 支持水平在线扩展
- 消息自动平衡
Kafka快的原因
- 多partition提升了并发
- zero-copy零拷贝
- 顺序写入
- 消息batch批量操作
- page cache页缓存
三、Kafka基本命令使用
- 创建主题(4个分区,2个副本)
1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test |
- 查询集群描述
1 | bin/kafka-topics.sh --describe --zookeeper hadoop2:2181 |
- topic列表查询
1 | bin/kafka-topics.sh --zookeeper hadoop2:2181 --list |
- 新消费者列表查询
1 | bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list |
- 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
1 | bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test |
- 生产者发送消息
1 | bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1 |
- 消费者消费消息
1 | bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning |
四、Kafka常用配置
生产者的配置
参考链接: kafka生产者Producer参数设置及参数调优建议
acks
指定需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数对消息丢失的可能性有重要影响- acks=0
表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功 - acks=1
只要集群的leader节点收到消息,生产者就会收到一个来自服务器的成功
响应。如果消息无法到达leader节点(比如leader节点崩溃,新的leader还没有被选举出来),
生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个
没有收到消息的节点成为新leader,消息还是会丢失。这个时候的吞吐量取决于使用的是Kafka生产者——向Kafka写入数据 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生
产者在收到服务器响应之前可以发送多少个消息)。 - acks=all/-1
分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功, 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
- acks=0
buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息compression.type
设置消息压缩算法, 可以设置为snappy、gzip或lz4retries
生产者可以重发消息的次数batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里一起发送,该参数指定了一个批次可以使用的内存大小, 按照字节数计算(而不是消息个数). 不过批次没满的时候如果达到了linger.ms设置的上限也会把批次发送出去linger.ms
指定生产者在发送批次之前的等待时间, 配合batch.size设置client.id
消息来源的识别标志max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试超时时间配置request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间metadata.fetch.timeout.ms: 生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间timeout.ms: broker 等待同步副本返回消息确认的时间
max.block.ms
该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间, 达到上限后会抛出超时异常max.request.size
用于控制生产者发送的请求大小recieive.buffer.bytes和send.buffer.bytes
这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小
消费者配置
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数, 类比生产者buffer.memory的配置。broker 在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者fetch.max.wait.ms
我们通过fetch.min.bytes告诉 Kafka,等到有足够的数据时才把它返回给消费者。而fetch.max.wait.ms则用于指定 broker 的等待时间,默认是 500ms, 类比生产者linger.ms配置max.partition.fetch.bytes
指定服务器从每个分区里返回给消费者的最大字节数session.timeout.ms和heartbeat.interval.msheartbeat.interval.ms指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。auto.offset.reset
指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长
时间失效,包含偏移量的记录已经过时并被删除)该作何处理(latest/earliest/none/anything else)enable.auto.commit
指定了消费者是否自动提交偏移量,默认值是 true, 手动维护offset
时需要设置为 faslepartition.assignment.strategy
分区分配策略client.id
broker用来标识从客户端发送过来的消息max.poll.records
该属性用于控制单次调用 poll() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量receive.buffer.bytes和send.buffer.bytes
五、Kafka消息事务
消息投递语义
- At-most-once(最多一次): 可能出现消息丢失的情况
- At-least-once(最少一次): 可能出现消息重复的情况
- Exactly-once(正好一次): 理想的语义实现,但实现难度较高
灾难场景分析
Broker失败
Kafka有自己的备份机制,保证消息写入leader replica成功后会冗余n份,同步到其他replica,所以理论上可以容忍n-1个broker节点宕机
Producer 到 Broker的RPC失败
Kafka的消息可靠性是基于producer接收到broker的ack确认信息为准的,但是Broker在消息已经写入但还未返回ack确认信息之前就可能会发生故障,也可能在消息被写入topic之前就宕机了,因为producer端无法知道失败的原因,只能尝试重发消息,因此某些下游场景可能就会存在消息乱序或者consumer重复消费的情况
客户端失败
客户端也可能存在永久宕机或者暂时性心跳丢失的情况,追求正确的性的话,broker和consumer应该丢弃从zombie producer发送的消息。同时新的客户端实例启动后它需要能从失败实例的任何状态中恢复,并从安全点(safe checkpoint)开始处理,这需要消费的偏移量位置和实际产生的消息输出保持同步
Kafka的Exactly-once语义保证
幂等: partition内部的exactly-once语义保证
幂等是指执行多次同样的操作得到的结果是一致的, Producer的send()操作现在就是幂等的。在任何导致producer重试的情况下,相同的消息如果被producer发送多次,也只会写入一次。需要修改broker的配置: enable.idempotence = true开启此功能
原理,类似于TCP中可靠传输的累积确认机制实现:Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则将其丢弃:
- 如果大1以上,说明中间有数据尚未写入,此时Broker拒绝此消息,Producer抛出
InvalidSequenceNumber。解决了前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成的数据乱序问题 - 如果小于等于Broker维护的序号,说明是重复消息,Broker直接丢弃该消息,Producer抛出
DuplicateSequenceNumber
事务性保证: 跨partition分区的原子操作
Kafka提供事务API对消息进行跨partition分区的原子性写操作:
1 | producer.initTransactions(); |
引入Transaction ID和epoch, Transaction ID与PID可能一一对应,区别在于Transaction ID由用户提供,而PID是Kafka内部实现的,对用户透明, 可以保证:
跨Session的数据幂等发送: 当具有相同Transaction ID的新的Producer实例被创建且工作时,epoch会单调递增,由于旧的Producer的epoch比新Producer的epoch小,旧的且拥有相同Transaction ID的Producer将不再工作跨Session的事务恢复: 如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作
从Consumer的角度来看,事务保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:
- 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
- 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
- Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
- Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息
开发实现思路
自定义offset管理, 需要保证对消息的处理和offset偏移量在同一个事务中,例如在消息处理的数据写入到MySQL的同时更新此时的消息offset消息偏移量
实现:
- 设置
enable.auto.commit = false, 关闭消息偏移量自动提交 - 处理消息的同时将offset消息偏移量保存,比如保存到MySQL
- 当partition分区发生变化的时候可能会发生分区再平衡,需要自定义类实现
ConsumerRebalanceListener接口捕捉事件变化,对偏移量进行处理 - 在重新完成分配分区后,消费者开始读取消息之前 通过调用
seek(TopicPartition, long)方法,移动到指定的分区的偏移量位置
参考链接
- Kafka client 消息接收的三种模式
- Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
- Kafka 事务特性分析
- Kafka详解
- [Kafka设计解析(八)- Exactly Once语义与事务机制原理](