SpringBoot中使用SpringIntegration

spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。 今天我们来探索一下如果使用它发送http请求。

什么是Spring Integration

Spring Integration在基于Spring的应用程序中实现轻量级消息传递,并支持通过声明适配器与外部系统集成。 Spring Integration的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护,可测试的代码至关重要。Spring Framework鼓励开发人员使用接口进行编码,并使用依赖注入(DI)为普通旧Java对象(POJO)提供执行其任务所需的依赖项。 Spring Integration将这一概念更进一步,其中POJO使用消息传递范例连接在一起,并且各个组件可能不了解应用程序中的其他组件。这种应用程序是通过组装细粒度可重用组件来构建的,以形成更高级别的功能。通过精心设计,这些流程可以模块化,并在更高的层次上重复使用。

官方地址:spring-integration

组成

Spring Integration 主要有Message、Channel、Message EndPoint组成。

Message

Message是用来在不同部分之间传递的数据。Message有两部分组成:消息体(payload)与消息头(header)。消息体可以是任何数据类型;消息头表示的元数据就是解释消息体的内容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * A generic message representation with headers and body.
 *
 * @author Mark Fisher
 * @author Arjen Poutsma
 * @since 4.0
 * @see org.springframework.messaging.support.MessageBuilder
 */
public interface Message<T> {

    /**
     * Return the message payload.
     */
    T getPayload();

    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();

}

Channel

在消息系统中,消息发送者发送消息到通道(Channel),消息接受者从通道(Channel)接收消息。 MessageChannel 有两大子接口,分别是PollableChannel (可轮询)和SubscribableChannel(可订阅)。我们所有的消息通道类都是现实这两个接口。

MessageChannel

MessageChannel 是Spring Integration消息通道的顶级接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public interface MessageChannel {

    /**
     * Constant for sending a message without a prescribed timeout.
     */
    long INDEFINITE_TIMEOUT = -1;


    /**
     * Send a {@link Message} to this channel. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     * To provide a maximum wait time, use {@link #send(Message, long)}.
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(Message<?> message);

    /**
     * Send a message, blocking until either the message is accepted or the
     * specified timeout period elapses.
     * @param message the message to send
     * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
     * @return {@code true} if the message is sent, {@code false} if not
     * including a timeout of an interrupt of the send
     */
    boolean send(Message<?> message, long timeout);

}
PollableChannel

PollableChannel 具备轮询获得消息的能力。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public interface PollableChannel extends MessageChannel {

    /**
     * Receive a message from this channel, blocking indefinitely if necessary.
     * @return the next available {@link Message} or {@code null} if interrupted
     */
    Message<?> receive();

    /**
     * Receive a message from this channel, blocking until either a message is available
     * or the specified timeout period elapses.
     * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
     * @return the next available {@link Message} or {@code null} if the specified timeout
     * period elapses or the message reception is interrupted
     */
    Message<?> receive(long timeout);

}
SubscribableChannel

SubscribableChannel 发送消息给订阅了MessageHanlder的订阅者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public interface SubscribableChannel extends MessageChannel {

    /**
     * Register a message handler.
     * @return {@code true} if the handler was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(MessageHandler handler);

    /**
     * Un-register a message handler.
     * @return {@code true} if the handler was un-registered, or {@code false}
     * if was not registered.
     */
    boolean unsubscribe(MessageHandler handler);

}

消息通道

PublishSubscribeChannel

PublishSubscribeChannel允许广播消息给所有订阅者,配置方式如下:

1
2
3
4
5
6
7
8
9
/**
 * 允许广播消息给所有订阅者,当前消息通道的id为publishSubscribeChannel
 * @return
 */
@Bean
public PublishSubscribeChannel publishSubscribeChannel(){
	PublishSubscribeChannel channel = new PublishSubscribeChannel();
	return channel;
}
QueueChannel

QueueChannel允许消息接收者轮询获得消息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式如下:

1
2
3
4
5
@Bean
public QueueChannel queueChannel(){
	QueueChannel channel = new QueueChannel(10);//10的队列的容量
	return channel;
}
PriorityChannel

PriorityChannel可按照优先级将数据存储到队列,它依据于消息的消息头priority属性,配置方式如下:

1
2
3
4
5
@Bean
public PriorityChannel priorityChannel(){
	PriorityChannel channel = new PriorityChannel(10);
	return channel;
}
RendezvousChannel

RendezvousChannel确保每一个接收者都接收到消息后再发送消息,配置方式如下:

1
2
3
4
5
@Bean
public RendezvousChannel rendezvousChannel(){
	RendezvousChannel channel = new RendezvousChannel();
	return channel;
}
DirectChannel

DirectChannel是Spring Integration默认的消息通道,它允许将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,配置方式如下:

1
2
3
4
5
@Bean
public DirectChannel directChannel(){
	DirectChannel channel = new DirectChannel();
	return channel;
}
ExecutorChannel

ExecutorChannel可绑定一个多线程的task executor,配置方式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Bean
public ExecutorChannel executorChannel(){
	ExecutorChannel channel = new ExecutorChannel(executor());
	return channel;
}

@Bean
public Executor executor(){
	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
	taskExecutor.setCorePoolSize(5);
	taskExecutor.setMaxPoolSize(10);
	taskExecutor.setQueueCapacity(25);
	taskExecutor.initialize();
	return taskExecutor;
}

通道拦截器

ChannelInterceptor

Spring Integration给消息通道提供了通道拦截器(ChannelInterceptor),用来拦截发送和接收消息的操作。 ChannelInterceptor接口定义如下,我们只需要实现这个接口即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public interface ChannelInterceptor {

	Message<?> preSend(Message<?> message, MessageChannel channel);

	void postSend(Message<?> message, MessageChannel channel, boolean sent);

	void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

	boolean preReceive(MessageChannel channel);

	Message<?> postReceive(Message<?> message, MessageChannel channel);

	void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);

}

Message EndPoint

消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还可以控制通道的路由。

Channel Adapter

通道适配器(Channel Adapter)是一种连接外部系统或传输协议的端点(EndPoint),可以分为入站(inbound)和出站(outbound)。  通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。 Spring Integration内置了如下的适配器:

1
RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMITwitter、XMPP、WebServices(SOAP、REST)、WebSocket

Gateway

消息网关(Gateway)类似于Adapter,但是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。 Spring Integration 对响应的Adapter都提供了Gateway。

Service Activator

Service Activator 可调用Spring的Bean来处理消息,并将处理后的结果输出到指定的消息通道。

Router

路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router) 以及定义好的接收表(Recipient List Router) 作为条件,来决定消息传递到的通道。

Filter

过滤器(Filter) 类似于路由(Router),不同的是过滤器不决定消息路由到哪里,而是决定消息是否可以传递给消息通道。

Splitter

拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。

Aggregator

聚合器(Aggregator)与拆分器相反,它接收一个java.util.List作为参数,将多个消息合并为一个消息。

Enricher

当我们从外部获得消息后,需要增加额外的消息到已有的消息中,这时就需要使用消息增强器(Enricher)。消息增强器主要有消息体 增强器(Payload Enricher)和消息头增强器(Header Enricher)两种。

Transformer

转换器(Transformer)是对获得的消息进行一定的转换处理(如数据格式转换)。

Bridge

使用连接桥(Bridge)可以简单的将两个消息通道连接起来。

快速入门

阅读官方文档它是这样描述的:

In the following “quick start” application you can see that the same gateway interface is used to invoke two completely different service implementations. To build and run this program you will need the spring-integration-ws and spring-integration-xml modules as described above.

大概意思是使用这个示例,需要spring-integration-wsspring-integration-xml两个模块,同时它使用了相同的网关接口调用了两个不同的服务实现。

编写快速启动类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class Main {

        public static void main(String... args) throws Exception {
                ApplicationContext ctx =
                        new ClassPathXmlApplicationContext("context.xml");
                // Simple Service
                TempConverter converter =
                        ctx.getBean("simpleGateway", TempConverter.class);
                System.out.println(converter.fahrenheitToCelcius(68.0f));
                // Web Service
                converter  = ctx.getBean("wsGateway", TempConverter.class);
                System.out.println(converter.fahrenheitToCelcius(68.0f));
        }
 }

编写接口类

1
2
3
4
5
public interface TempConverter {

        float fahrenheitToCelcius(float fahren);

}

编写xml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!-- Simple Service -->
<!-- 网关:simpleGateway -->
<int:gateway id="simpleGateway"
        service-interface="foo.TempConverter"
        default-request-channel="simpleExpression" />

<int:service-activator id="expressionConverter"
        input-channel="simpleExpression"
        expression="(payload - 32) / 9 * 5"/>

<!-- Web Service -->

<int:gateway id="wsGateway" service-interface="foo.TempConverter"
        default-request-channel="viaWebService" />

<int:chain id="wsChain" input-channel="viaWebService">
        <int:transformer
           expression="'&lt;FahrenheitToCelsius xmlns=&quot;https://www.w3schools.com/xml/&quot;&gt;&lt;Fahrenheit&gt;XXX&lt;/Fahrenheit&gt;&lt;/FahrenheitToCelsius&gt;'.replace('XXX', payload.toString())" />
        <int-ws:header-enricher>
                <int-ws:soap-action value="https://www.w3schools.com/xml/FahrenheitToCelsius"/>
        </int-ws:header-enricher>
        <int-ws:outbound-gateway
                uri="https://www.w3schools.com/xml/tempconvert.asmx"/>
        <int-xml:xpath-transformer
                xpath-expression="/*[local-name()='FahrenheitToCelsiusResponse']/*[local-name()='FahrenheitToCelsiusResult']"/></int:chain>

咦,嫌弃,和我们想用的内容有差距啊,,既然用SpringBoot那就得跟上节奏,JavaBean配置,接着往下看

And here is the same application (web service part) using the Java DSL (and Spring Boot). You will need the spring-boot-starter-integration dependency or spring-integration-java-dsl directly if you don’t use Spring Boot. If you use Spring Integration starting version 5.0, you don’t need any additional dependencies - the Java DSL is included to the core project

进入正题了,大意是使用JavaBean配置或者spring boot的示例,如果不实用Spring Boot那么就需要咱们手动加入spring-boot-starter-integration或者spring-integration-java-dsl所依赖的模块,如果使用的是5.0及以上的版本,那么你不需要手动引入依赖,Java DSL已经包含在其中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
@SpringBootApplication
@IntegrationComponentScan
public class Application {

  public static void main(String[] args) {
    ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
    TempConverter converter = ctx.getBean(TempConverter.class);
    System.out.println(converter.fahrenheitToCelcius(68.0f));
    ctx.close();
  }

  @MessagingGateway
  public interface TempConverter {

    @Gateway(requestChannel = "convert.input")
    float fahrenheitToCelcius(float fahren);

  }

  @Bean
  public IntegrationFlow convert() {
      return f -> f
        .transform(payload ->
              "<FahrenheitToCelsius xmlns=\"https://www.w3schools.com/xml/\">"
            +     "<Fahrenheit>" + payload + "</Fahrenheit>"
            + "</FahrenheitToCelsius>")
        .enrichHeaders(h -> h
            .header(WebServiceHeaders.SOAP_ACTION,
                "https://www.w3schools.com/xml/FahrenheitToCelsius"))
        .handle(new SimpleWebServiceOutboundGateway(
            "https://www.w3schools.com/xml/tempconvert.asmx"))
        .transform(Transformers.xpath("/*[local-name()=\"FahrenheitToCelsiusResponse\"]"
            + "/*[local-name()=\"FahrenheitToCelsiusResult\"]"));
  }

}

开始使用

翻阅了一遍官网的samples,根本没找到http方式关于SpringBoot的示例,没事,咱们活学活用,参考其他模块,直接上代码

pom中加入依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-integration</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.integration</groupId>
		<artifactId>spring-integration-http</artifactId>
		<exclusions>
			<exclusion>
				<artifactId>jackson-module-kotlin</artifactId>
				<groupId>com.fasterxml.jackson.module</groupId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

编写测试类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

	@Autowired
	private Config.TempConverter tempConverter;

	@Test
	public void test() {
		//字符串方式
		final String info = tempConverter.hello("homeant");
		log.debug("info:{}", info);
		//实体对象方式
		log.debug("userInfo:{}", tempConverter.userInfo("homeant"));
	}


	@Configuration
	@EnableIntegration
	@IntegrationComponentScan
	@EnableAutoConfiguration
	public static class Config {
		@MessagingGateway
		public interface TempConverter {

			@Gateway(requestChannel = "convert.input")
			String hello(String name);

			@Gateway(requestChannel = "userInfo.input")
			UserInfo userInfo(String name);

		}

		@Bean
		public IntegrationFlow convert() {
			return f -> f.handle(Http.outboundGateway("https://api.github.com/users/{name}")
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(String.class).uriVariable("name", r -> r.getPayload()))
            .log(LoggingHandler.Level.DEBUG).bridge();
		}

		@Bean
		public IntegrationFlow userInfo() {
			return f -> f.handle(Http.outboundGateway("https://api.github.com/users/{name}")
            .uriVariable("name", r -> r.getPayload())
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(UserInfo.class))
            .log(LoggingHandler.Level.DEBUG).bridge();
		}
	}
}

启动测试类

1
2
[2019-09-30 22:11:54.977] [main] [DEBUG] [fun.vyse.cloud.test.ApplicationTest     :34  ] - info:{"login":"homeant","id":28803295,"node_id":"MDQ6VXNlcjI4ODAzMjk1","avatar_url":"https://avatars0.githubusercontent.com/u/28803295?v=4","gravatar_id":"","url":"https://api.github.com/users/homeant","html_url":"https://github.com/homeant","followers_url":"https://api.github.com/users/homeant/followers","following_url":"https://api.github.com/users/homeant/following{/other_user}","gists_url":"https://api.github.com/users/homeant/gists{/gist_id}","starred_url":"https://api.github.com/users/homeant/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/homeant/subscriptions","organizations_url":"https://api.github.com/users/homeant/orgs","repos_url":"https://api.github.com/users/homeant/repos","events_url":"https://api.github.com/users/homeant/events{/privacy}","received_events_url":"https://api.github.com/users/homeant/received_events","type":"User","site_admin":false,"name":"俊晨","company":null,"blog":"https://tianhui.xin","location":"China-BeiJing","email":null,"hireable":null,"bio":"高山仰止,景行行止。虽不能至,心向往之","public_repos":15,"public_gists":0,"followers":3,"following":3,"created_at":"2017-05-19T08:53:17Z","updated_at":"2019-09-27T06:10:27Z"}
[2019-09-30 22:11:55.971] [main] [DEBUG] [fun.vyse.cloud.test.ApplicationTest     :35  ] - userInfo:UserInfo(login=homeant, id=28803295, node_id=MDQ6VXNlcjI4ODAzMjk1, avatar_url=https://avatars0.githubusercontent.com/u/28803295?v=4, gravatar_id=, url=https://api.github.com/users/homeant, html_url=https://github.com/homeant, followers_url=https://api.github.com/users/homeant/followers, following_url=https://api.github.com/users/homeant/following{/other_user}, gists_url=https://api.github.com/users/homeant/gists{/gist_id}, starred_url=https://api.github.com/users/homeant/starred{/owner}{/repo}, subscriptions_url=https://api.github.com/users/homeant/subscriptions, organizations_url=https://api.github.com/users/homeant/orgs, repos_url=https://api.github.com/users/homeant/repos, events_url=https://api.github.com/users/homeant/events{/privacy}, received_events_url=https://api.github.com/users/homeant/received_events, type=User, site_admin=false, name=俊晨, company=null, blog=https://tianhui.xin, location=China-BeiJing, email=null, hireable=null, bio=高山仰止,景行行止。虽不能至,心向往之, public_repos=15, public_gists=0, followers=3, following=3, created_at=2017-05-19T08:53:17Z, updated_at=2019-09-27T06:10:27Z)

参考文档