副本
副本(replication)策略
Kafka 在 0.8 版本前没有提供 Partition 的 Replication 机制,一旦 Broker 宕机,其上的所有 Partition 就都无法提供服务,而 Partition 又没有备份数据,数据的可用性就大大降低了。所以 0.8 后提供了 Replication 机制来保证 Broker 的 failover。
Kafka 的高可靠性的保障来源于其健壮的副本(replication)策略,即多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance)。Kafka 的 replica 副本单元是 topic 的 partition,一个 partition 的 replica 数量不能超过 broker 的数量,因为一个 broker 最多只会存储这个 partition 的一个副本。所有消息生产、消费请求都是由 partition 的 leader replica 来处理,其他 follower replica 负责从 leader 复制数据进行备份,保持和 Leader 的数据同步。如果没有 Leader 副本,那就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有 n 个副本则需要有 n×n 条通路来同步数据,这样数据的一致性和有序性就很难保证。
ISR
在 Kafka 中并不是所有的 Follower 都能被拿来替代 Leader,所以在 Kafka 的 Leader 节点中维护着一个 ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:
- 一是它必须维护与 ZooKeeper 的 session(这个通过 ZooKeeper 的 Heartbeat 机制来实现)。
- 二是 Follower 必须能够及时将 Leader 的消息复制过来,不能“落后太多”。
Leader 会跟踪与其保持同步的 Replica 列表,如果一个 Follower 宕机,或者落后太多,Leader 将把它从 ISR 中移除。这里所描述的“落后太多”指 Follower 复制的消息落后于 Leader 后的条数超过预定值或者 Follower 超过一定时间未向 Leader 发送 fetch 请求。
数据可靠性
Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少,Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader 拉取数据:
- 消息所在 partition 的 ISR replicas 会定时异步从 leader 上批量复制数据 log
- 当所有 ISR replica 都返回 ack,告诉 leader 该消息已经写 log 成功后,leader 认为该消息 committed,并告诉 Producer 生产成功。这里和以上”alive”条件的第二点是不矛盾的,因为 leader 有超时机制,leader 等 ISR 的 follower 复制数据,如果一定时间不返回 ack(可能数据复制进度落后太多),则 leader 将该 follower replica 从 ISR 中剔除。
- 一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,该消息就被认为已经 commit 了,Leader 将增加 HW 并且向 Producer 发送 ACK。HW(高水位)是 Consumer 能够看到的此 partition 的位置,LEO 是每个 partition 的 log 最后一条 Message 的位置。HW 能保证 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取,不会造成消息丢失。
Kafka Replication 的数据流如下图所示:
ISR 机制下的数据复制,既不是完全的同步复制,也不是单纯的异步复制,这是 Kafka 高吞吐很重要的机制。同步复制要求所有能工作的 follower 都复制完,这条消息才会被认为 committed,这种复制方式极大的影响了吞吐量。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 committed,这种情况下如果 follower 都复制完都落后于 leader,而如果 leader 突然宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐量,follower 可以批量的从 leader 复制数据,数据复制到内存即返回 ack,这样极大的提高复制性能,当然数据仍然是有丢失风险的。
对于 Producer 而言,它可以选择是否等待消息 commit。这种机制确保了只要 ISR 有一个或以上的 Follower,一条被 commit 的消息就不会丢失。当 Producer 向 Leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
- 1(默认):这意味着 producer 在 ISR 中的 leader 已成功收到的数据并得到确认后发送下一条 message。如果 leader 宕机了,则会丢失数据。
- 0:这意味着 producer 无需等待来自 broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
- -1:producer 需要等待 ISR 中的所有 follower 都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 leader 时(其他节点都和 zk 断开连接,或者都没追上),这样就变成了 acks=1 的情况。
副本放置策略
为了更好的做负载均衡,Kafka 尽量将所有的 Partition 均匀分配到整个集群上。Kafka 分配 Replica 的算法如下:
- 将所有存活的 N 个 Brokers 和待分配的 Partition 排序
- 将第 i 个 Partition 分配到第(i mod n)个 Broker 上,这个 Partition 的第一个 Replica 存在于这个分配的 Broker 上,并且会作为 partition 的优先副本
- 将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mod n)个 Broker 上
假设集群一共有 4 个 brokers,一个 topic 有 4 个 partition,每个 Partition 有 3 个副本。下图是每个 Broker 上的副本分配情况。
服务可用性
Leader 选举
Kafka 所有收发消息请求都由 Leader 节点处理,由以上数据可靠性设计可知,当 ISR 的 follower replica 故障后,leader 会及时地从 ISR 列表中把它剔除掉,并不影响服务可用性。而如果 Leader 发生了故障,则 Kafka 会重新选举 Leader:
- Kafka 在 Zookeeper 存储 partition 的 ISR 信息,并且能动态调整 ISR 列表的成员,只有 ISR 里的成员 replica 才会被选为 leader,并且 ISR 所有的 replica 都有可能成为 leader;
- Leader 节点宕机后,Zookeeper 能监控发现,并由 broker 的 controller 节点从 ISR 中选举出新的 leader,并通知 ISR 内的所有 broker 节点。
Leader 选举本质上是一个分布式锁,有两种方式实现基于 ZooKeeper 的分布式锁:
- 节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
- 临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
容灾和数据一致性
分布式系统的容灾能力,跟其本身针对数据一致性考虑所选择的算法有关,例如,Zookeeper 的 Zab 算法,raft 算法等。Kafka 的 ISR 机制和这些 Majority Vote 算法对比如下:
- ISR 机制能容忍更多的节点失败。假如 replica 节点有 2f+1 个,每个 partition 最多能容忍 2f 个失败,且不丢失消息数据;但相对 Majority Vote 选举算法,只能最多容忍 f 个失败。
- 在消息 committed 持久化上,ISR 需要等 2f 个节点返回 ack,但 Majority Vote 只需等 f+1 个节点返回 ack,且不依赖处理最慢的 follower 节点,因此 Majority Vote 有优势
- ISR 机制能节省更多 replica 节点数。例如,要保证 f 个节点可用,ISR 方式至少要 f 个节点,而 Majority Vote 至少需要 2f+1 个节点。
如果所有 replica 都宕机了,有两种方式恢复服务:
- 等 ISR 任一节点恢复,并选举为 leader;
- 选择第一个恢复的节点(不一定是 ISR 中的节点)为 leader
第一种方式消息不会丢失(只能说这种方式最有可能不丢而已),第二种方式可能会丢消息,但能尽快恢复服务可用。这是可用性和一致性场景的两种考虑,Kafka 默认选择第二种,用户也可以自主配置。大部分考虑 CP 的分布式系统(假设 2f+1 个节点),为了保证数据一致性,最多只能容忍 f 个节点的失败,而 Kafka 为了兼顾可用性,允许最多 2f 个节点失败,因此是无法保证数据强一致的。
如图所示,一开始 ISR 数量等于 3,正常同步数据,红色部分开始,leader 发现其他两个 follower 复制进度太慢或者其他原因(网络分区、节点故障等),将其从 ISR 剔除后,leader 单节点存储数据;然后,leader 宕机,触发重新选举第二节点为 leader,重新开始同步数据,但红色部分的数据在新 leader 上是没有的;最后原 leader 节点恢复服务后,重新从新 leader 上复制数据,而红色部分的数据已经消费不到了。
因此,为了减少数据丢失的概率,可以设置 Kafka 的 ISR 最小 replica 数,低于该值后直接返回不可用,当然是以牺牲一定可用性和吞吐量为前提了。