8.3 使用 Kafka 发送消息
8.3 使用 Kafka 发送消息
Apache Kafka 是我们在本章中研究的最新消息传递选项。乍一看,Kafka 是一个消息代理,就像 ActiveMQ、Artemis 或 Rabbit 一样。但是 Kafka 有一些独特的技巧。
Kafka 被设计为在集群中运行,提供了巨大的可伸缩性。通过将其 topic 划分到集群中的所有实例中,它具有很强的弹性。RabbitMQ 主要处理 exchange 中的队列,而 Kafka 仅利用 topic 来提供消息的发布/订阅。
Kafka topic 被复制到集群中的所有 broker 中。集群中的每个节点充当一个或多个 topic 的 leader,负责该 topic 的数据并将其复制到集群中的其他节点。
更进一步说,每个 topic 可以分成多个分区。在这种情况下,集群中的每个节点都是一个 topic 的一个或多个分区的 leader,但不是整个 topic 的 leader。该 topic 的职责由所有节点分担。图 8.2 说明了这是如何工作的。图 8.2 Kafka 集群由多个 broker 组成,每一个都作为 topic 分区的 leader
![图 8.2](E:\Document\spring-in-action-v5-translate\第二部分 集成 Spring\第 8 章 发送异步消息\图 8.2.jpg)
由于 Kafka 独特的构建风格,我鼓励你在迪伦·斯科特(Dylan Scott,2017)的*《Kafka 实战》*中阅读更多关于它的内容。出于我们的目的,我们将重点讨论如何使用 Spring 向 Kafka 发送和接收消息。
8.3.1 设置 Spring 的 Kafka
要开始使用 Kafka 进行消息传递,需要将适当的依赖项添加到构建中。但是,与 JMS 和 RabbitMQ 不同,Kafka 没有 Spring Boot starter。不过还是只需要一个依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这个依赖项将 Kafka 所需的一切都带到项目中。更重要的是,它的存在将触发 Kafka 的 Spring Boot 自动配置,它将在 Spring 应用程序上下文中生成一个 KafkaTemplate。你所需要做的就是注入 KafkaTemplate 并开始发送和接收消息。
然而,在开始发送和接收消息之前,应该了解一些在使用 Kafka 时会派上用场的属性。具体来说就是,KafkaTemplate 默认在 localhost 上运行 Kafka broker,并监听 9092 端口。在开发应用程序时,在本地启动 Kafka broker 是可以的,但是在进入生产环境时,需要配置不同的主机和端口。
spring.kafka.bootstrap-servers 属性设置一个或多个 Kafka 服务器的位置,用于建立到 Kafka 集群的初始连接。例如,如果集群中的 Kafka 服务器之一运行在 Kafka .tacocloud.com 上,并监听 9092 端口,那么可以在 YAML 中像这样配置它的位置:
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
但是注意 spring.kafka.bootstrap-servers 属性是复数形式,它接受一个列表。因此,可以在集群中为它提供多个 Kafka 服务器:
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
在项目中设置了 Kafka 之后,就可以发送和接收消息了。首先来看看 KafkaTemplate 将 Order 对象发送给 Kafka。
8.3.2 使用 KafkaTemplate 发送消息
在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。
再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:
- 发送消息的 topic(send() 方法必要的参数)
- 写入 topic 的分区(可选)
- 发送记录的键(可选)
- 时间戳(可选;默认为 System.currentTimeMillis())
- payload(必须)
topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。
对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。
使用 KafkaTemplate 及其 send() 方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。程序清单 8.8 使用 KafkaTemplate 发送订单
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
在 OrderMessagingService 的这个实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法向名为 tacocloud.orders.topic 的主题发送 Order。代码中除了使用 “Kafka” 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。
如果设置了默认主题,可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:
spring:
kafka:
template:
default-topic: tacocloud.orders.topic
然后,在 sendOrder() 方法中,可以调用 sendDefault() 而不是 send(),并且不指定主题名称:
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}
现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。
8.3.3 编写 Kafka 监听器
除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。
对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。程序清单 8.9 使用 @KafkaListener 接收订单
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
handle() 方法由 @KafkaListener 注解,表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的,只为 handle() 方法提供了一个 Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个 ConsumerRecord 或 Message 对象。
例如,handle() 的以下实现接受一个 ConsumerRecord,这样就可以记录消息的分区和时间戳:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
类似地,可以使用 Message 而不是 ConsumerRecord,并达到同样的效果:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}
值得注意的是,消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order,而不是直接将其作为 handle() 的参数。