架构概念
Kafka 架构概念
Kafka 作为一个高度可扩展可容错的消息系统,一个典型的 kafka 集群中包含若干 producer,若干 broker,若干 consumer,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 consumer group 发生变化时进行 rebalance。producer 使用 push 模式将消息发布到 broker,consumer 使用 pull 模式从 broker 订阅并消费消息:
Kafka 专用术语:
- Broker:消息中间件处理结点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群。
- Topic:一类消息,Kafka 集群能够同时负责多个 topic 的分发。
- Partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
- Segment:partition 物理上由多个 segment 组成。
- offset:每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息。
- Producer:负责发布消息到 Kafka broker。
- Consumer:消息消费者,向 Kafka broker 读取消息的客户端。
- Consumer Group:每个 Consumer 属于一个特定的 Consumer Group。
Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,总结一下其实就是四个要点:
- 顺序读写:因为硬盘是机械结构,每次读写都会寻址,写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机 I/O,最喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。
- 零拷贝:在 Linux Kernal 2.2 之后出现了一种叫做“零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存空间的直接映射,数据不再复制到“用户态缓冲区”系统上下文切换减少 2 次,可以提升一倍性能。
- 消息压缩:消息都是经过压缩传递、存储的,降低网络与磁盘的负担。
- 分批发送:批量处理是一种非常有效的提升系统吞吐量的方法,在 Kafka 内部,消息都是以“批”为单位处理的。
消息与主题
消息是 Kafka 通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为 Message;在由 Java 重新实现的客户端中,每一条消息称为 Record。为了提高效率,消息会分批次写入 Kafka,批次就代指的是一组消息。Kafka 将一组消息抽象归纳为一个主题(Topic),也就是说,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。
Kafka 消息传递的事务特点如下:
- at most once:最多一次,这个和 JMS 中"非持久化"消息类似,发送一次,无论成败,将不会重发。消费者 fetch 消息,然后保存 offset,然后处理消息;当 client 保存 offset 之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理。那么此后"未处理"的消息将不能被 fetch 到,这就是"at most once"。
- at least once:消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。消费者 fetch 消息,然后处理消息,然后保存 offset。如果消息处理成功之后,但是在保存 offset 阶段 zookeeper 异常导致保存操作未能执行成功,这就导致接下来再次 fetch 时可能获得上次已经处理过的消息,这就是"at least once",原因 offset 没有及时的提交给 zookeeper,zookeeper 恢复正常还是之前 offset 状态。
- exactly once:消息只会发送一次。kafka 中并没有严格的去实现(基于 2 阶段提交),我们认为这种策略在 kafka 中是没有必要的。
生产者(Producer)
生产者(Producer)负责将消息发送给代理,也就是向 Kafka 代理发送消息的客户端。生产者会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
Kafka 中的生产者设计主要考虑了以下方面:
- 负载均衡:由于消息 topic 由多个 partition 组成,且 partition 会均衡分布到不同 broker 上,因此,为了有效利用 broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者 hash 等方式,将消息平均发送到多个 partition 上,以实现负载均衡。
- 批量发送:是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
消费者和消费组
consumer 是 kafka 当中的消费者,主要用于消费 kafka 当中的数据;在 Kafka 中每一个消费者都属于一个特定消费组(ConsumerGroup),我们可以为每个消费者指定一个消费组,以 groupId 代表消费组名称,通过 group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组 test-consumer-group。
同时,每个消费者也有一个全局唯一的 id,通过配置项 client.id 指定,如果客户端没有指定消费者的 id,Kafka 会自动为该消费者生成一个全局唯一的 id,格式为 ${groupId}-${hostName}-${timestamp}-${UUID 前 8 位字符}
。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但不同消费组的消费者可同时消费该消息。消费组是 Kafka 用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。
总结而言,Kafka 中的消费者与消费组具备以下属性:
- 任何 Consumer 必须属于一个 Consumer Group
- 同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模式。如图,Consumer Group 1 的三个 Consumer 实例分别消费不同的 partition 的消息,即,TopicA-part0、TopicA-part1、TopicA-part2。
- 不同 Consumer Group 的 Consumer 实例可以同时消费同一个 partition,等效于发布订阅模式。如图,Consumer Group 1 的 Consumer1 和 Consumer Group 2 的 Consumer4,同时消费 TopicA-part0 的消息。
- Partition 内消息是有序的,Consumer 通过 pull 方式消费消息。
- Kafka 不删除已消费的消息
分区
Kafka 将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性;单一主题中的分区有序,但是无法保证主题中所有的分区有序。
每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从 0 开始,编号最大值为分区的总数减 1。每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的分区数可以在 Kafka 启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。
分区使得 Kafka 在并发处理上变得更加容易,分区数量决定了每个 Consumer Group 中并发消费者的最大数量。理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。某一个主题下的分区数,对于消费该主题的同一个消费组下的消费者数量,应该小于等于该主题下的分区数。某一个主题有 4 个分区,那么消费组中的消费者应该小于等于 4,而且最好与分区数成整数倍 1 2 4 这样。同一个分区下的数据,在同一时刻,不能同一个消费组的不同消费者消费。
同时,分区也是 Kafka 保证消息被顺序消费以及对消息进行负载均衡的基础。Kafka 只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是 Kafka 高吞吐率的一个重要保证。同时与传统消息系统不同的是,Kafka 并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储,因此 Kafka 提供两种删除老数据的策略,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置。
偏移量
任何发布到分区的消息会被直接追加到日志文件(分区目录下以“.log”为文件名后缀的数据文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。由于 Kafka 几乎不允许对消息进行随机读写,因此 Kafka 并没有提供额外索引机制到存储偏移量,也就是说并不会给偏移量再提供索引。
消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到 ZooKeeper 当中,而新版消费者是将消费偏移量保存到 Kafka 内部一个主题当中。当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到 Kafka 中。
日志段(Segment)
一个日志又被划分为多个日志段(LogSegment),日志段是 Kafka 日志对象分片的最小单位。与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件。日志文件是以“.log”为文件名后缀的数据文件,用于保存消息实际数据。两个索引文件分别以“.index”和“.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。查找某个 offset 的消息,先二分法找出消息所在的 segment 文件(因为每个 segment 的命名都是以该文件中消息 offset 最小的值命名);然后,加载对应的.index 索引文件到内存,同样二分法找出小于等于给定 offset 的最大的那个 offset 记录(相对 offset,position);最后,根据 position 到.log 文件中,顺序查找出 offset 等于给定 offset 值的消息。
由于消息在 partition 的 segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 segment 文件),这种顺序磁盘 IO 存储设计是 Kafka 高性能很重要的原因。
代理(Broker)
Kafka 集群包含一个或多个服务器,我们将每一个 Kafka 实例称为代理(Broker),通常也称代理为 Kafka 服务器(KafkaServer)。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
在生产环境中 Kafka 集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。每一个代理都有唯一的标识 id,这个 id 是一个非负整数。在一个 Kafka 集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的 id,id 值可以选择任意非负整数即可,只要保证它在整个 Kafka 集群中唯一,这个 id 就是代理的名字,也就是在启动代理时配置的 broker.id 对应的值,因此在本文中有时我们也称为 brokerId。由于给每个代理分配了不同的 brokerId,这样对代理进行迁移就变得更方便,从而对消费者来说是透明的,不会影响消费者对消息的消费。
每个集群中都会有一个 broker 同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非 Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。
ISR
Kafka 在 ZooKeeper 中动态维护了一个 ISR(In-sync Replica),即保存同步的副本列表,该列表中保存的是与 Leader 副本保持消息同步的所有副本对应的代理节点 id。如果一个 Follower 副本宕机(本书用宕机来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等)或是落后太多,则该 Follower 副本节点将从 ISR 列表中移除。对于 Kafka 节点,判断是”alive”有以下两个条件:
- 节点必须和 Zookeeper 保持心跳连接
- 如果节点是 follower,必须从 leader 节点上复制数据来备份,而且备份的数据相比 leader 而言,不能落后太多。
副本
由于 Kafka 副本的存在,就需要保证一个分区的多个副本之间数据的一致性,Kafka 会选择该分区的一个副本作为 Leader 副本,而该分区其他副本即为 Follower 副本,只有 Leader 副本才负责处理客户端读/写请求,Follower 副本从 Leader 副本同步数据。引入 Leader 副本后客户端只需与 Leader 副本进行交互,这样数据一致性及顺序性就有了保证。Follower 副本从 Leader 副本同步消息,对于 n 个副本只需 n−1 条通路即可,这样就使得系统更加简单而高效。副本 Follower 与 Leader 的角色并不是固定不变的,如果 Leader 失效,通过相应的选举算法将从其他 Follower 副本中选出新的 Leader 副本。
Kafka 解决了 fail/recover,一条消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何 Follower 复制就宕机了,而造成数据丢失(Consumer 无法消费这些数据)。
ZooKeeper
Kafka 利用 ZooKeeper 保存相应元数据信息,Kafka 元数据信息包括如代理节点信息、Kafka 集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。Kafka 在启动或运行过程当中会在 ZooKeeper 上创建相应节点来保存元数据信息,Kafka 通过监听机制在这些节点注册相应监听器来监听节点元数据的变化,从而由 ZooKeeper 负责管理维护 Kafka 集群,同时通过 ZooKeeper 我们能够很方便地对 Kafka 集群进行水平扩展及数据迁移。
不过 2021 年3 月 30 日,Kafka 背后的企业 Confluent 发布博客表示,在即将发布的 2.8 版本里,用户可在完全不需要 ZooKeeper 的情况下运行 Kafka,该版本将依赖于 ZooKeeper 的控制器改造成了基于 Kafka Raft 的 Quorm 控制器。在之前的版本中,如果没有 ZooKeeper,Kafka 将无法运行。但管理部署两个不同的系统不仅让运维复杂度翻倍,还让 Kafka 变得沉重,进而限制了 Kafka 在轻量环境下的应用,同时 ZooKeeper 的分区特性也限制了 Kafka 的承载能力。