9.1 声明简单的集成流

9.1 声明简单的集成流

一般来说,Spring Integration 支持创建集成流,应用程序可以通过这些集成流接收或发送数据到应用程序本身的外部资源。应用程序可以与之集成的一种资源是文件系统。因此,在 Spring Integration 的众多组件中,有用于读写文件的通道适配器。

为了熟悉 Spring Integration,将创建一个向文件系统写入数据的集成流。首先,需要将 Spring Integration 添加到项目构建中。对于 Maven,必要的依赖关系如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-integration-file</artifactId>
</dependency>

第一个依赖项是 Spring Integration 的 Spring Boot starter。无论 Spring Integration 流可能与什么集成,这种依赖关系都是开发 Spring Integration 流所必需的。与所有 Spring Boot starter 依赖项一样,它也存在于 Initializr 的复选框表单中。

第二个依赖项是 Spring Integration 的文件端点模块。此模块是用于与外部系统集成的二十多个端点模块之一。我们将在第 9.2.9 节中更多地讨论端点模块。但是,就目前而言,要知道文件端点模块提供了将文件从文件系统提取到集成流或将数据从流写入文件系统的能力。

接下来,需要为应用程序创建一种将数据发送到集成流的方法,以便将数据写入文件。为此,将创建一个网关接口,如下面所示。程序清单 9.1 消息网关接口,用于将方法转换为消息

package sia5;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel="textInChannel")
public interface FileWriterGateway {
    void writeToFile(
        @Header(FileHeaders.FILENAME) String filename,
        String data);
}

尽管它是一个简单的 Java 接口,但是关于 FileWriterGateway 还有很多要说的。首先会注意到它是由 @MessagingGateway 注解的。这个注解告诉 Spring Integration 在运行时生成这个接口的实现 —— 类似于 Spring Data 如何自动生成存储库接口的实现。当需要编写文件时,代码的其他部分将使用这个接口。

@MessagingGateway 的 defaultRequestChannel 属性表示,对接口方法的调用产生的任何消息都应该发送到给定的消息通道。在本例中,声明从 writeToFile() 调用产生的任何消息都应该发送到名为 textInChannel 的通道。

对于 writeToFile() 方法,它接受一个文件名作为字符串,另一个字符串包含应该写入文件的文本。关于此方法签名,值得注意的是 filename 参数使用 @Header 进行了注解。在本例中,@Header 注解指示传递给 filename 的值应该放在消息头中(指定为 FileHeaders),解析为 file_name 的文件名,而不是在消息有效负载中。另一方面,数据参数值则包含在消息有效负载中。

现在已经有了一个消息网关,还需要配置集成流。尽管添加到构建中的 Spring Integration starter 依赖项支持 Spring Integration 的基本自动配置,但仍然需要编写额外的配置来定义满足应用程序需求的流。声明集成流的三个配置选项包括:

  • XML 配置
  • Java 配置
  • 使用 DSL 进行 Java 配置

我们将对 Spring Integration 的这三种风格的配置进行讲解,从最原始的 XML 配置开始。

9.1.1 使用 XML 定义集成流

尽管在本书中我避免使用 XML 配置,但 Spring Integration 在 XML 中定义的集成流方面有着悠久的历史。因此,我认为值得至少展示一个 XML 定义的集成流示例。下面的程序清单显示了如何在 XML 中配置流。程序清单 9.2 使用 Spring XML 配置定义集成流

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file
           http://www.springframework.org/schema/integration/file/
                           springintegration-file.xsd">

    <int:channel id="textInChannel" />
    <int:transformer id="upperCase" input-channel="textInChannel"
          output-channel="fileWriterChannel" expression="payload.toUpperCase()" />
    <int:channel id="fileWriterChannel" />
    <int-file:outbound-channel-adapter id="writer" channel="fileWriterChannel"
          directory="/tmp/sia5/files" mode="APPEND" append-new-line="true" />
</beans>

分析一下程序清单 9.2 中的 XML:

  • 配置了一个名为 textInChannel 的通道,这与为 FileWriterGateway 设置的请求通道是相同的。当在 FileWriterGateway 上调用 writeToFile() 方法时,结果消息被发布到这个通道。
  • 配置了一个转换器来接收来自 textInChannel 的消息。它使用 Spring Expression Language(SpEL)表达式在消息有效负载上调用 toUpperCase()。然后将大写操作的结果发布到 fileWriterChannel 中。
  • 配置了一个名为 fileWriterChannel 的通道,此通道用作连接转换器和外部通道适配器的管道。
  • 最后,使用 int-file 命名空间配置了一个外部通道适配器。这个 XML 命名空间由 Spring Integration 的文件模块提供,用于编写文件。按照配置,它将接收来自 fileWriterChannel 的消息,并将消息有效负载写到一个文件中,该文件的名称在消息的 file_name 头中指定,该文件位于 directory 属性中指定的目录中。如果文件已经存在,则将用换行来追加文件,而不是覆盖它。

如果希望在 Spring Boot 应用程序中使用 XML 配置,则需要将 XML 作为资源导入 Spring 应用程序。最简单的方法是在应用程序的 Java 配置类上使用 Spring 的 @ImportResource 注解:

@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

尽管基于 XML 的配置很好地服务于 Spring Integration,但大多数开发人员对使用 XML 越来越谨慎。(正如我所说的,我在本书中避免使用 XML 配置。)让我们把这些尖括号放在一边,将注意力转向 Spring Integration 的 Java 配置风格。

9.1.2 在 Java 中配置集成流

大多数现代 Spring 应用程序都避开了 XML 配置,而采用了 Java 配置。实际上,在 Spring Boot 应用程序中,Java 配置是自动配置的自然补充。因此,如果要将集成流添加到 Spring Boot 应用程序中,那么在 Java 中定义该流是很有意义的。

作为如何使用 Java 配置编写集成流的示例,请查看下面的程序清单。这显示了与以前相同的文件编写集成流,但这次是用 Java 编写的。程序清单 9.3 使用 Java 配置定义集成流

package sia5;

import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;

@Configuration
public class FileWriterIntegrationConfig {

    @Bean
    @Transformer(inputChannel="textInChannel", outputChannel="fileWriterChannel")
    public GenericTransformer<String, String> upperCaseTransformer() {
        return text -> text.toUpperCase();
    }

    @Bean
    @ServiceActivator(inputChannel="fileWriterChannel")
    public FileWritingMessageHandler fileWriter() {
        FileWritingMessageHandler handler =
            new FileWritingMessageHandler(new File("/tmp/sia5/files"));
        handler.setExpectReply(false);
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setAppendNewLine(true);
        return handler;
    }
}

使用 Java 配置,可以声明两个 bean:一个转换器和一个文件写入消息处理程序。这里转换器是 GenericTransformer。因为 GenericTransformer 是一个函数接口,所以能够以 lambda 的形式提供在消息文本上调用 toUpperCase() 的实现。转换器的 bean 使用 @Transformer 进行注解,并将其指定为集成流中的转换器,该转换器接收名为 textInChannel 的通道上的消息,并将消息写入名为 fileWriterChannel 的通道。

至于文件写入 bean,它使用 @ServiceActivator 进行了注解,以指示它将接受来自 fileWriterChannel 的消息,并将这些消息传递给由 FileWritingMessageHandler 实例定义的服务。FileWritingMessageHandler 是一个消息处理程序,它使用消息的 file_name 头中指定的文件名将消息有效负载写入指定目录中的文件。与 XML 示例一样,将 FileWritingMessageHandler 配置为用换行符附加到文件中。

FileWritingMessageHandler bean 配置的一个独特之处是调用 setExpectReply(false) 来指示服务激活器不应该期望应答通道(通过该通道可以将值返回到流中的上游组件)。如果不调用 setExpectReply(),则文件写入 bean 默认为 true,尽管管道仍按预期工作,但将看到记录了一些错误,说明没有配置应答通道。

还会看到不需要显式地声明通道。如果不存在具有这些名称的 bean,就会自动创建 textInChannel 和 fileWriterChannel 通道。但是,如果希望对通道的配置方式有更多的控制,可以像这样显式地将它们构造为 bean:

@Bean
public MessageChannel textInChannel() {
    return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
    return new DirectChannel();
}

可以说,Java 配置选项更易于阅读,也更简洁,而且与我在本书中所追求的纯 Java 配置完全一致。但是,通过 Spring Integration 的 Java DSL(领域特定语言)配置风格,它可以变得更加精简。

9.1.3 使用 Spring Integration 的 DSL 配置

让我们进一步尝试定义文件编写集成流。这一次,仍然使用 Java 定义它,但是将使用 Spring Integration 的 Java DSL。不是为流中的每个组件声明一个单独的 bean,而是声明一个定义整个流的 bean。程序清单 9.4 为设计集成流提高流式 API

package sia5;

import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;

@Configuration
public class FileWriterIntegrationConfig {

    @Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlows
            .from(MessageChannels.direct("textInChannel"))
            .<String, String>transform(t -> t.toUpperCase())
            .handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
                    .fileExistsMode(FileExistsMode.APPEND)
                    .appendNewLine(true))
            .get();
    }
}

这个新配置尽可能简洁,用一个 bean 方法捕获整个流。IntegrationFlows 类初始化了这个构建者 API,可以从该 API 声明流。

在程序清单 9.4 中,首先从名为 textInChannel 的通道接收消息,然后该通道转到一个转换器,使消息有效负载大写。在转换器之后,消息由出站通道适配器处理,该适配器是根据 Spring Integration 的文件模块中提供的文件类型创建的。最后,调用 get() 构建要返回的 IntegrationFlow。简而言之,这个 bean 方法定义了与 XML 和 Java 配置示例相同的集成流。

注意,与 Java 配置示例一样,不需要显式地声明通道 bean。虽然引用了 textInChannel,但它是由 Spring Integration 自动创建的,因为没有使用该名称的现有通道 bean。但是如果需要,可以显式地声明通道 bean。

至于连接转换器和外部通道适配器的通道,甚至不需要通过名称引用它。如果需要显式配置通道,可以在流定义中通过调用 channel() 的名称引用:

@Bean
public IntegrationFlow fileWriterFlow() {
    return IntegrationFlows
        .from(MessageChannels.direct("textInChannel"))
        .<String, String>transform(t -> t.toUpperCase())
        .channel(MessageChannels.direct("fileWriterChannel"))
        .handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
                .fileExistsMode(FileExistsMode.APPEND)
                .appendNewLine(true))
        .get();
}

在使用 Spring Integration 的 Java DSL(与任何流式 API 一样)时要记住的一件事是,必须巧妙地使用空白来保持可读性。在这里给出的示例中,我小心地缩进了行以表示相关代码块。对于更长、更复杂的流,甚至可以考虑将流的一部分提取到单独的方法或子流中,以获得更好的可读性。

现在已经看到了使用三种不同配置风格定义的简单流,让我们回过头来看看 Spring Integration 的全貌。

下一页