该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
示例应用
Spring AMQP 示例项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,以演示实际应用程序中常见的交互类型。 在本章中,我们提供了每个示例的快速演练,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知 IDE(例如 SpringSource Tool Suite)中。
“Hello World”示例
“Hello World”示例演示了同步和异步消息接收。
您可以导入spring-rabbit-helloworld
sample 到 IDE 中,然后按照下面的讨论进行作。
同步示例
在src/main/java
目录中,导航到org.springframework.amqp.helloworld
包。
打开HelloWorldConfiguration
类,并注意它包含@Configuration
类级别的注释,并注意到一些@Bean
方法级别的注释。
这是 Spring 基于 Java 的配置示例。
你可以在这里阅读更多相关信息。
以下列表显示了如何创建连接工厂:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含RabbitAdmin
,默认情况下,它会查找任何类型为 exchange、queue 或 binding 的 bean,然后在代理上声明它们。
事实上,helloWorldQueue
在HelloWorldConfiguration
是一个示例,因为它是Queue
.
以下列表显示了helloWorldQueue
豆子定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾rabbitTemplate
bean 配置,可以看到它的名称为helloWorldQueue
设置为其queue
属性(用于接收消息)和其routingKey
属性(用于发送消息)。
现在我们已经探索了配置,我们可以看看实际使用这些组件的代码。
首先,打开Producer
类。
它包含一个main()
方法,其中 SpringApplicationContext
被创建。
以下列表显示了main
方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate
bean 被检索并用于发送Message
.
由于客户端代码应尽可能依赖接口,因此类型为AmqpTemplate
而不是RabbitTemplate
.
即使 bean 在HelloWorldConfiguration
是RabbitTemplate
,依赖接口意味着这段代码更可移植(你可以独立于代码来更改配置)。
由于convertAndSend()
方法被调用时,模板委托给其MessageConverter
实例。
在这种情况下,它使用默认的SimpleMessageConverter
,但可以向rabbitTemplate
bean,如HelloWorldConfiguration
.
现在打开Consumer
类。
它实际上共享相同的配置基类,这意味着它共享rabbitTemplate
豆。
这就是为什么我们在该模板中配置了routingKey
(用于发送)和queue
(用于接收)。
正如我们在AmqpTemplate
,您可以改为将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。
这Consumer
代码基本上是 Producer 的镜像,调用receiveAndConvert()
而不是convertAndSend()
.
以下列表显示了Consumer
:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行Producer
然后运行Consumer
,您应该看到Received: Hello World
在控制台输出中。
异步示例
同步示例演练了同步 Hello World 示例。
本节介绍一个稍微高级但功能更强大的选项。
通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。
事实上,有一个子包提供了这一点:org.springframework.amqp.samples.helloworld.async
.
同样,我们从发送方开始。打开ProducerConfiguration
类,并注意它创建了一个connectionFactory
和rabbitTemplate
豆。 这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且RabbitTemplate
仅设置了 'routingKey' 属性。回想一下,消息被发送到交换,而不是直接发送到队列。AMQP 默认交换是没有名称的直接交换。所有队列都绑定到该默认交换,其名称作为路由密钥。这就是为什么我们只需要在此处提供路由密钥。
以下列表显示了rabbitTemplate
定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本一样的按执行消息模型,那么它实际上是一个消息驱动的消费者就不会那么明显了)。负责连续发送消息的组件被定义为ProducerConfiguration
. 它配置为每三秒运行一次。
以下列表显示了组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要了解所有细节,因为真正的重点应该放在接收方(我们将在接下来介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,您可以在此处了解更多信息。
简而言之,该postProcessor
bean 中的ProducerConfiguration
向调度程序注册任务。
现在我们可以转向接收方。
为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。
该类称为HelloWorldHandler
,如以下清单所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它正在“适应”以适应MessageListener
Spring AMQP 的接口MessageListenerAdapter
.
然后,您可以在SimpleMessageListenerContainer
.
对于此示例,容器是在ConsumerConfiguration
类。
您可以在那里看到包裹在适配器中的 POJO。
以下列表显示了如何listenerContainer
定义为:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
这SimpleMessageListenerContainer
是一个 Spring 生命周期组件,默认情况下会自动启动。
如果您查看Consumer
类,你可以看到它的main()
方法只包含一个单行引导程序,用于创建ApplicationContext
.
制作人的main()
方法也是一个单行引导程序,因为其方法被@Scheduled
也会自动启动。
您可以启动Producer
和Consumer
以任何顺序,您应该会看到每三秒发送一次和接收消息。
股票交易
股票交易示例演示了比 Hello World 示例更高级的消息传递方案。
但是,配置非常相似,只是更复杂一些。
由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交易所。
然后,客户端可以通过将队列与路由模式(例如,app.stock.quotes.nasdaq.*
).
该演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及一个私人replyTo
客户端在订单请求消息本身中发送的队列。
服务器的核心配置位于RabbitServerConfiguration
类中的org.springframework.amqp.rabbit.stocks.config.server
包。
它扩展了AbstractStockAppRabbitConfiguration
.
这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还会看到JacksonJsonMessageConverter
在RabbitTemplate
.
特定于服务器的配置由两件事组成。
首先,它在RabbitTemplate
这样它就不需要在每次调用时提供该交换名称来发送Message
.
它在基本配置类中定义的抽象回调方法中执行此作。
以下列表显示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换,并以自己的名称作为路由键。
如前所述,AMQP 规范定义了该行为。
以下列表显示了stockRequestQueue
豆:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在您已经看到了服务器 AMQP 资源的配置,请导航到org.springframework.amqp.rabbit.stocks
package 下的src/test/java
目录。 在那里,你可以看到实际的Server
提供main()
方法。 它创建了一个ApplicationContext
基于server-bootstrap.xml
配置文件。在那里,您可以看到发布虚拟市场数据的计划任务。该配置依赖于 Spring 的task
命名空间支持。引导配置文件还导入了其他一些文件。最有趣的是server-messaging.xml
,位于src/main/resources
. 在那里,你可以看到messageListenerContainer
负责处理股票交易请求的 bean。最后,看看serverHandler
在server-handlers.xml
(也在 'src/main/resources' 中)。该 bean 是ServerHandler
类,并且是消息驱动的 POJO 的一个很好的例子,它也可以发送回复消息。
请注意,它本身未与框架或任何 AMQP 概念耦合。
它接受一个TradeRequest
并返回一个TradeResponse
.
以下列表显示了handleMessage
方法:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点大概是RabbitClientConfiguration
在org.springframework.amqp.rabbit.stocks.config.client
包。
请注意,它声明了两个队列,但没有提供显式名称。
以下列表显示了两个队列的 Bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是专用队列,并且会自动生成唯一名称。
客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。
回想一下,在 AMQP 中,使用者与队列交互,而生产者与交换交互。
队列与交换的“绑定”是告诉代理将消息从给定交换传递(或路由)到队列的内容。
由于市场数据交换是主题交换,因此绑定可以用路由模式来表达。
这RabbitClientConfiguration
使用Binding
对象,并且该对象是使用BindingBuilder
流畅的 API。
以下列表显示了Binding
:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件 (client.properties
下src/main/resources
),并且我们使用 Spring 的@Value
注释来注入该值。
这通常是一个好主意。
否则,该值将被硬编码在类中,并且无需重新编译即可无法修改。
在这种情况下,在更改用于绑定的路由模式时运行多个版本的客户端要容易得多。
我们现在可以尝试一下。
首先运行org.springframework.amqp.rabbit.stocks.Server
然后org.springframework.amqp.rabbit.stocks.Client
.
您应该看到虚拟引文NASDAQ
stocks,因为与 client.properties 中的“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq。'.
现在,在保留现有Server
和Client
运行时,将该属性值更改为“app.stock.quotes.nyse.' 并开始第二个Client
实例。
您应该看到第一个客户仍然收到纳斯达克报价,而第二个客户收到纽约证券交易所报价。
相反,您可以更改模式以获取所有股票甚至单个股票代码。
我们探索的最后一个功能是从客户角度出发的请求-回复交互。
回想一下,我们已经看到了ServerHandler
接受TradeRequest
对象和返回TradeResponse
对象。
上相应的代码Client
side 是RabbitStockServiceGateway
在org.springframework.amqp.rabbit.stocks.gateway
包。
它委托给RabbitTemplate
为了发送消息。
以下列表显示了send
方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会将replyTo
地址。
它提供了由traderJoeQueue
bean 定义(如前面所示)。
以下列表显示了@Bean
定义StockServiceGateway
类本身:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟之后,您应该会在客户端上看到一条确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,将TypeId
标头添加到完全限定的类名中,以帮助接收应用程序将 JSON 转换回 Java 对象。
这spring-rabbit-json
示例探索了几种从非 Spring 应用程序转换 JSON 的技术。