8.3 使用Kafka 发送消息
          8.3 使用Kafka 发送消息
更进一步说,每个
由于
8.3.1 设置Spring 的Kafka 
要开始使用
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
这个依赖项将
然而,在开始发送和接收消息之前,应该了解一些在使用
spring:
  kafka:
    bootstrap-servers:
      - kafka.tacocloud.com:9092
但是注意
spring:
  kafka:
    bootstrap-servers:
      - kafka.tacocloud.com:9092
      - kafka.tacocloud.com:9093
      - kafka.tacocloud.com:9094
在项目中设置了
8.3.2 使用KafkaTemplate 发送消息
在许多方面,
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
注意到的第一件事是没有
再者
- 发送消息的topic (send() 方法必要的参数)
- 写入topic 的分区(可选)
- 发送记录的键(可选)
- 时间戳(可选;默认为System.currentTimeMillis() )
- payload(必须)
对于
使用
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
    private KafkaTemplate<String, Order> kafkaTemplate;
    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("tacocloud.orders.topic", order);
    }
}
在
如果设置了默认主题,可以稍微简化
spring:
  kafka:
    template:
      default-topic: tacocloud.orders.topic
然后,在
@Override
public void sendOrder(Order order) {
    kafkaTemplate.sendDefault(order);
}
现在已经编写了消息发送代码了,让我们将注意力转向编写从
8.3.3 编写Kafka 监听器
除了
对于
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
    private KitchenUI ui;
    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }
    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}
例如,
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
    log.info("Received from partition {} with timestamp {}",
             record.partition(), record.timestamp());
    ui.displayOrder(order);
}
类似地,可以使用
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
             headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
             headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
}
值得注意的是,消息有效负载也可以通过
