事务消息

高可用及幂等

在分布式系统中一般有三种处理语义:

  • at-least-once:至少一次,有可能会有多次。如果 producer 收到来自 ack 的确认,则表示该消息已经写入到 Kafka 了,此时刚好是一次,也就是我们后面的 exactly-once。但是如果 producer 超时或收到错误,并且 request.required.acks 配置的不是-1,则会重试发送消息,客户端会认为该消息未写入 Kafka。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,这一次重试将会导致我们的消息会被写入两次,所以消息就不止一次地传递给最终 consumer,如果 consumer 处理逻辑没有保证幂等的话就会得到不正确的结果。在这种语义中会出现乱序,也就是当第一次 ack 失败准备重试的时候,但是第二消息已经发送过去了,这个时候会出现单分区中乱序的现象,我们需要设置 Prouducer 的参数 max.in.flight.requests.per.connection,flight.requests 是 Producer 端用来保存发送请求且没有响应的队列,保证 Producer 端未响应的请求个数为 1。

  • at-most-once:如果在 ack 超时或返回错误时 producer 不重试,也就是我们讲 request.required.acks=-1,则该消息可能最终没有写入 kafka,所以 consumer 不会接收消息。

  • exactly-once:刚好一次,即使 producer 重试发送消息,消息也会保证最多一次地传递给 consumer。该语义是最理想的,也是最难实现的。在 0.10 之前并不能保证 exactly-once,需要使用 consumer 自带的幂等性保证。0.11.0 使用事务保证了。

如何实现 exactly-once

要实现 exactly-once 在 Kafka 0.11.0 中有两个官方策略。

单 Producer 单 Topic

每个 producer 在初始化的时候都会被分配一个唯一的 PID,对于每个唯一的 PID,Producer 向指定的 Topic 中某个特定的 Partition 发送的消息都会携带一个从 0 单调递增的 sequence number。在我们的 Broker 端也会维护一个维度为,每次提交一次消息的时候都会对齐进行校验:

  • 如果消息序号比 Broker 维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时 Broker 拒绝该消息,Producer 抛出 InvalidSequenceNumber。
  • 如果消息序号小于等于 Broker 维护的序号,说明该消息已被保存,即为重复消息,Broker 直接丢弃该消息,Producer 抛出 DuplicateSequenceNumber
  • 如果消息序号刚好大一,就证明是合法的。

上面所说的解决了两个问题:

  • 当 Prouducer 发送了一条消息之后失败,broker 并没有保存,但是第二条消息却发送成功,造成了数据的乱序。
  • 当 Producer 发送了一条消息之后,broker 保存成功,ack 回传失败,producer 再次投递重复的消息。

上面所说的都是在同一个 PID 下面,意味着必须保证在单个 Producer 中的同一个 seesion 内,如果 Producer 挂了,被分配了新的 PID,这样就无法保证了,所以 Kafka 中又有事务机制去保证。

事务

在 kafka 中事务的作用是:

  • 实现 exactly-once 语义
  • 保证操作的原子性,要么全部成功,要么全部失败。
  • 有状态的操作的恢复

事务可以保证就算跨多个,在本次事务中的对消费队列的操作都当成原子性,要么全部成功,要么全部失败。并且,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。在 kafka 的事务中,应用程序必须提供一个唯一的事务 ID,即 Transaction ID,并且宕机重启之后,也不会发生改变,Transactin ID 与 PID 可能一一对应。区别在于 Transaction ID 由用户提供,而 PID 是内部的实现对用户透明。为了 Producer 重启之后,旧的 Producer 具有相同的 Transaction ID 失效,每次 Producer 通过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。

为了实现这一点,Kafka 0.11.0.0 引入了一个服务器端的模块,名为 Transaction Coordinator,用于管理 Producer 发送的消息的事务性。该 Transaction Coordinator 维护 Transaction Log,该 log 存于一个内部的 Topic 内。由于 Topic 数据具有持久性,因此事务的状态也具有持久性。Producer 并不直接读写 Transaction Log,它与 Transaction Coordinator 通信,然后由 Transaction Coordinator 将该事务的状态插入相应的 Transaction Log。Transaction Log 的设计与 Offset Log 用于保存 Consumer 的 Offset 类似。

上一页
下一页