消息消费模型

Pulsar 消息消费模型

Apache Pulsar 通过订阅,抽象出了统一的: producer-topic-subscription-consumer 消费模型。Pulsar 的消息模型既支持队列模型,也支持流模型。在 Pulsar 的消息消费模型中,Topic 是用于发送消息的通道。每一个 Topic 对应着 Apache BookKeeper 中的一个分布式日志。发布者发布的每条消息只在 Topic 中存储一次;存储的过程中,BookKeeper 会将消息复制存储在多个存储节点上;Topic 中的每条消息,可以根据消费者的订阅需求,多次被使用,每个订阅对应一个消费者组(Consumer Group)。

主题(Topic)是消费消息的真实来源。尽管消息仅在主题(Topic)上存储一次,但是用户可以有不同的订阅方式来消费这些消息:

  • 消费者被组合在一起以消费消息,每个消费组是一个订阅。
  • 每个 Topic 可以有不同的消费组。
  • 每组消费者都是对主题的一个订阅。
  • 每组消费者可以拥有自己不同的消费方式: 独占(Exclusive),故障切换(Failover)或共享(Share)。

Pulsar 通过这种模型,将队列模型和流模型这两种模型结合在了一起,提供了统一的 API 接口。这种模型,既不会影响消息系统的性能,也不会带来额外的开销,同时还为用户提供了更多灵活性,方便用户程序以最匹配模式来使用消息系统。

三种订阅模式

独占订阅(Stream 流模型)

顾名思义,独占订阅中,在任何时间,一个消费者组(订阅)中有且只有一个消费者来消费 Topic 中的消息。下图是独占订阅的示例。在这个示例中有一个有订阅 A 的活跃消费者 A-0,消息 m0 到 m4 按顺序传送并由 A-0 消费。如果另一个消费者 A-1 想要附加到订阅 A,则是不被允许的。

独占订阅

故障切换(Stream 流模型)

使用故障切换订阅,多个消费者(Consumer)可以附加到同一订阅。但是,一个订阅中的所有消费者,只会有一个消费者被选为该订阅的主消费者。其他消费者将被指定为故障转移消费者。当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。发生这种情况时,所有未确认(ack)的消息都将传递给新的主消费者。这类似于 Apache Kafka 中的 Consumer partition rebalance。

下图是故障切换订阅的示例。消费者 B-0 和 B-1 通过订阅 B 订阅消费消息。B-0 是主消费者并接收所有消息。B-1 是故障转移消费者,如果消费者 B-0 出现故障,它将接管消费。

故障切换模型

共享订阅(Queue 队列模型)

使用共享订阅,在同一个订阅背后,用户按照应用的需求挂载任意多的消费者。订阅中的所有消息以循环分发形式发送给订阅背后的多个消费者,并且一个消息仅传递给一个消费者。当消费者断开连接时,所有传递给它但是未被确认(ack)的消息将被重新分配和组织,以便发送给该订阅上剩余的剩余消费者。

下图是共享订阅的示例。消费者 C-1,C-2 和 C-3 都在同一主题上消费消息。每个消费者接收大约所有消息的 1/3。如果想提高消费的速度,用户不需要不增加分区数量,只需要在同一个订阅中添加更多的消费者。

共享订阅模型

三种订阅模式的选择

独占和故障切换订阅,仅允许一个消费者来使用和消费,每个对主题的订阅。这两种模式都按主题分区顺序使用消息。它们最适用于需要严格消息顺序的流(Stream)用例。共享订阅允许每个主题分区有多个消费者。同一订阅中的每个消费者仅接收主题分区的一部分消息。共享订阅最适用于不需要保证消息顺序的队列(Queue)的使用模式,并且可以按照需要任意扩展消费者的数量。

Pulsar 中的订阅实际上与 Apache Kafka 中的 Consumer Group 的概念类似。创建订阅的操作很轻量化,而且具有高度可扩展性,用户可以根据应用的需要创建任意数量的订阅。对同一主题的不同订阅,也可以采用不同的订阅类型。比如用户可以在同一主题上可以提供一个包含 3 个消费者的故障切换订阅,同时也提供一个包含 20 个消费者的共享订阅,并且可以在不改变分区数量的情况下,向共享订阅添加更多的消费者。

下图描绘了一个包含 3 个订阅 A,B 和 C 的主题,并说明了消息如何从生产者流向消费者。

不同订阅模式下的演示

除了统一消息 API 之外,由于 Pulsar 主题分区实际上是存储在 Apache BookKeeper 中,它还提供了一个读取 API(Reader),类似于消费者 API(但 Reader 没有游标管理),以便用户完全控制如何使用 Topic 中的消息。

Pulsar 的消息确认(ACK)

由于分布式系统的特性,当使用分布式消息系统时,可能会发生故障。比如在消费者从消息系统中的主题消费消息的过程中,消费消息的消费者和服务于主题分区的消息代理(Broker)都可能发生错误。消息确认(ACK)的目的就是保证当发生这样的故障后,消费者能够从上一次停止的地方恢复消费,保证既不会丢失消息,也不会重复处理已经确认(ACK)的消息。

  • 在 Apache Kafka 中,恢复点通常称为 Offset,更新恢复点的过程称为消息确认或提交 Offset。
  • 在 Apache Pulsar 中,每个订阅中都使用一个专门的数据结构–游标(Cursor)来跟踪订阅中的每条消息的确认(ACK)状态。每当消费者在主题分区上确认消息时,游标都会更新。更新游标可确保消费者不会再次收到消息。

Apache Pulsar 提供两种消息确认方法,单条确认(Individual Ack)和累积确认(Cumulative Ack)。通过累积确认,消费者只需要确认它收到的最后一条消息。主题分区中的所有消息(包括)提供消息 ID 将被标记为已确认,并且不会再次传递给消费者。累积确认与 Apache Kafka 中的 Offset 更新类似。Apache Pulsar 可以支持消息的单条确认,也就是选择性确认。消费者可以单独确认一条消息。被确认后的消息将不会被重新传递。下图说明了单条确认和累积确认的差异(灰色框中的消息被确认并且不会被重新传递)。在图的上半部分,它显示了累计确认的一个例子,M12 之前的消息被标记为 acked。在图的下半部分,它显示了单独进行 acking 的示例。仅确认消息 M7 和 M12 - 在消费者失败的情况下,除了 M7 和 M12 之外,其他所有消息将被重新传送。

单条确认与累计确认

独占订阅或故障切换订阅的消费者能够对消息进行单条确认和累积确认;共享订阅的消费者只允许对消息进行单条确认。单条确认消息的能力为处理消费者故障提供了更好的体验。对于某些应用来说,处理一条消息可能需要很长时间或者非常昂贵,防止重新传送已经确认的消息非常重要。这个管理 Ack 的专门的数据结构–游标(Cursor),由 Broker 来管理,利用 BookKeeper 的 Ledger 提供存储,在后面的文章中我们会介绍更多的关于游标(Cursor)的细节。Apache Pulsar 提供了灵活的消息消费订阅类型和消息确认方法,通过简单的统一的 API,就可以支持各种消息和流的使用场景。

Pulsar 的消息保留(Retention)

在消息被确认后,Pulsar 的 Broker 会更新对应的游标。当 Topic 里面中的一条消息,被所有的订阅都确认 ack 后,才能删除这条消息。Pulsar 还允许通过设置保留时间,将消息保留更长时间,即使所有订阅已经确认消费了它们。下图说明了如何在有 2 个订阅的主题中保留消息。订阅 A 在 M6 和订阅 B 已经消耗了 M10 之前的所有消息之前已经消耗了所有消息。这意味着 M6 之前的所有消息(灰色框中)都可以安全删除。订阅 A 仍未使用 M6 和 M9 之间的消息,无法删除它们。如果主题配置了消息保留期,则消息 M0 到 M5 将在配置的时间段内保持不变,即使 A 和 B 已经确认消费了它们。

消息保留

在消息保留策略中,Pulsar 还支持消息生存时间(TTL)。如果消息未在配置的 TTL 时间段内被任何消费者使用,则消息将自动标记为已确认。消息保留期消息 TTL 之间的区别在于:消息保留期作用于标记为已确认并设置为已删除的消息,而 TTL 作用于未 ack 的消息。上面的图例中说明了 Pulsar 中的 TTL。例如,如果订阅 B 没有活动消费者,则在配置的 TTL 时间段过后,消息 M10 将自动标记为已确认,即使没有消费者实际读取该消息。