|
此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2! |
示例应用程序
Spring AMQP 示例项目包含两个示例应用程序。第一个是一个简单的“Hello World”示例,用于演示同步和异步消息接收。它为理解核心组件提供了一个极佳的起点。第二个示例基于股票交易用例,以展示在实际应用中常见的交互类型。在本章中,我们将快速浏览每个示例,以便您能专注于最重要的组件。两个示例均基于 Maven,因此您可以直接将它们导入任何支持 Maven 的 IDE(例如 SpringSource 工具套件)。
“Hello World” 示例
“Hello World”示例演示了同步和异步消息接收方式。
您可以将 spring-rabbit-helloworld 示例导入 IDE,然后按照下方的说明进行操作。
同步示例
在 src/main/java 目录中,导航到 org.springframework.amqp.helloworld 包。
打开 HelloWorldConfiguration 类,注意其在类级别包含 @Configuration 注解,并注意某些方法级别包含 @Bean 注解。
这是 Spring 基于 Java 的配置的一个示例。
您可在此处了解更多信息:此处。
以下列表显示了如何创建连接工厂:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中还包含一个 RabbitAdmin 实例,该实例默认会查找任何类型为 exchange、queue 或 binding 的 Bean,然后在 broker 上声明它们。实际上,HelloWorldConfiguration 中生成的 helloWorldQueue Bean 是一个示例,因为它是一个 Queue 的实例。
以下列表显示了 helloWorldQueue 个 Bean 定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾 rabbitTemplate Bean 配置,可以看到其 queue 属性(用于接收消息)被设置为 helloWorldQueue 的名称,而其 routingKey 属性(用于发送消息)则相应配置。
现在我们已经探索了配置,接下来可以查看实际使用这些组件的代码。首先,打开同一包内的 Producer 类。它包含一个 main() 方法,在该方法中创建了 Spring ApplicationContext。
以下列表显示了 main 方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate Bean 被检索并用于发送 Message。由于客户端代码应尽可能依赖接口,因此类型为 AmqpTemplate 而非 RabbitTemplate。尽管在 HelloWorldConfiguration 中创建的 Bean 是 RabbitTemplate 的实例,但依赖接口意味着该代码更具可移植性(您可独立于代码更改配置)。由于调用了 convertAndSend() 方法,模板会委托给其 MessageConverter 实例。在此情况下,它使用默认的 SimpleMessageConverter,但也可为 rabbitTemplate Bean 提供不同的实现,如 HelloWorldConfiguration 中所定义。
现在打开 Consumer 类。
它实际上共享了相同的配置基类,这意味着它共享了 rabbitTemplate 这个 Bean。
这就是为什么我们为该模板同时配置了 routingKey(用于发送)和 queue(用于接收)的原因。
正如我们在 AmqpTemplate 中所描述的那样,您也可以将 'routingKey' 参数传递给 send 方法,以及将 'queue' 参数传递给 receive 方法。Consumer 的代码基本上是生产者(Producer)的镜像版本,它调用的是 receiveAndConvert() 而非 convertAndSend()。
以下列表显示了 Consumer 的主方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行 Producer,然后运行 Consumer,您应该在控制台输出中看到 Received: Hello World。
异步示例
同步示例 遍历了同步的“Hello World”示例。本节描述了一种稍显高级但功能显著更强的选项。通过少量修改,'Hello World' 示例可提供异步接收(也称为消息驱动型 POJO)的示例。事实上,存在一个子包,正好提供了这一点:org.springframework.amqp.samples.helloworld.async。
再次,我们从发送端开始。
打开 ProducerConfiguration 类并注意它创建了一个 connectionFactory 和一个 rabbitTemplate Bean。
这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,而 RabbitTemplate 仅设置了 'routingKey' 属性。
请回忆一下,消息是发送到交换机(exchange)而非直接发送到队列。
AMQP 默认交换机是一个无名的直接交换机(direct exchange)。
所有队列均绑定到该默认交换机,并以队列名称作为路由键(routing key)。
因此,我们在此处只需提供路由键即可。
以下列表显示了 rabbitTemplate 的定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于本示例演示了异步消息接收,因此发送方被设计为持续发送消息(如果它采用每次执行发送一条消息的模型,如同步版本那样,则不太明显地表明它实际上是一个消息驱动型消费者)。负责持续发送消息的组件被定义为 ProducerConfiguration 内部的一个类。该组件被配置为每三秒运行一次。
以下列表显示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您无需理解所有细节,因为真正的重点应放在接收端(我们将在下文介绍)。
然而,如果您还不熟悉 Spring 的任务调度支持,可以在此 了解更多。
简而言之,postProcessor 在 ProducerConfiguration 中的 Bean 会将任务注册到调度器上。
现在我们可以转向接收端。
为了强调消息驱动的 POJO 行为,我们首先从响应消息的组件开始。
该类名为 HelloWorldHandler,其代码示例如下所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是一个普通的 Java 对象(POJO)。
它不继承任何基类,不实现任何接口,甚至不包含任何导入语句。
Spring AMQP 的 MessageListenerAdapter 接口通过“适配”方式将其适配到 MessageListener 接口上。
然后,您可以将此适配器配置到 SimpleMessageListenerContainer 上。
在本示例中,容器是在 ConsumerConfiguration 类中创建的。
您可以在那里看到 POJO 被包装在适配器中。
以下列表显示了如何定义 listenerContainer:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
该 SimpleMessageListenerContainer 是一个 Spring 生命周期组件,默认情况下会自动启动。如果查看 Consumer 类,可以看到其 main() 方法仅包含一行用于创建 ApplicationContext 的启动代码。生产者的 main() 方法同样是一行启动代码,因为被 @Scheduled 注解所标注的方法所属的组件也会自动启动。您可以以任意顺序启动 Producer 和 Consumer,并应能看到每三秒有消息被发送和接收。
股票交易
股票交易示例演示了比“Hello World”示例更高级的消息传递场景。然而,其配置非常相似,只是略显复杂一些。由于我们已在“Hello World”配置中详细讲解过相关内容,此处我们将重点放在本示例的不同之处。有一个服务器会将市场数据(如股票报价)推送到一个主题交换机(topic exchange)。随后,客户端可通过绑定一个队列并使用路由模式(例如,app.stock.quotes.nasdaq.*)来订阅该市场数据流。此演示的另一主要功能是客户端发起、服务器处理的请求-响应式“股票交易”交互过程。这涉及一个由客户端在订单请求消息内部发送的私有replyTo队列。
服务器的核心配置位于 RabbitServerConfiguration 类中,该类位于 org.springframework.amqp.rabbit.stocks.config.server 包内。它继承自 AbstractStockAppRabbitConfiguration。此处定义了服务器与客户端共用的资源,包括市场数据主题交换(其名称为 'app.stock.marketdata')以及服务器为股票交易公开的队列(其名称为 'app.stock.request')。在该公共配置文件中,您还可以看到在 RabbitTemplate 上配置了一个 JacksonJsonMessageConverter。
服务器特定的配置包含两方面内容。首先,它在 RabbitTemplate 上配置了市场数据交易所,以便无需在每次调用发送 Message 时都提供该交易所名称。它通过在基础配置类中定义的一个抽象回调方法来实现这一点。以下列表展示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
第二,声明股票请求队列。
在此情况下,无需任何显式绑定,因为它已绑定到默认的无名交换机,并以自身名称作为路由键。
如前所述,AMQP 规范定义了这种行为。
以下列表显示了 stockRequestQueue bean 的定义:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在您已经查看了服务器 AMQP 资源的配置,导航到 org.springframework.amqp.rabbit.stocks 包,该包位于 src/test/java 目录下。在那里,您可以看到实际的 Server 类,它提供了一个 main() 方法。它根据 server-bootstrap.xml 配置文件创建一个 ApplicationContext。在那里,您可以看到用于发布虚拟市场数据的计划任务。该配置依赖于 Spring 的 task 命名空间支持。引导配置文件还导入了几个其他文件。最有趣的是 server-messaging.xml,它直接位于 src/main/resources 的下方。在那里,您可以看到负责处理股票交易请求的 messageListenerContainer 个 bean。最后,查看在 serverHandler 中定义的 bean,它位于 server-handlers.xml(也位于 'src/main/resources' 目录中)。该bean是ServerHandler类的实例,是一个良好的消息驱动POJO示例,同时也能够发送回复消息。请注意,它本身并不依赖于该框架或任何AMQP概念。它接受一个 TradeRequest 并返回一个 TradeResponse。以下列表显示了 handleMessage 方法的定义:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在,我们已经看到了服务器最重要的配置和代码,接下来可以转向客户端。最好的起点可能是 RabbitClientConfiguration,位于 org.springframework.amqp.rabbit.stocks.config.client 包中。注意,它声明了两个队列,但未提供显式名称。以下列表展示了这两个队列的 Bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些都是私有队列,且唯一名称会自动生成。第一个生成的队列将由客户端用于绑定服务器所暴露的市场数据交换机。请回忆一下,在 AMQP 中,消费者与队列交互,而生产者则与交换机交互。“队列与交换机的绑定”决定了代理(broker)应如何将消息从特定交换机路由到某个队列。由于市场数据交换机是一个主题交换机(topic exchange),其绑定可使用路由模式来表达。RabbitClientConfiguration 通过 Binding 对象实现这一点,而该对象则是利用 BindingBuilder 流式 API(fluent API)生成的。以下列表展示了 Binding:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已外部化到属性文件中(client.properties 位于 src/main/resources 下),并且我们使用 Spring 的 @Value 注解来注入该值。这通常是一个好主意。否则,该值将被硬编码在类中,且在不重新编译的情况下无法修改。在此情况下,轻松运行多个客户端版本,并对用于绑定的路由模式进行更改会更加便捷。我们现在可以尝试一下。
首先运行 org.springframework.amqp.rabbit.stocks.Server,然后运行 org.springframework.amqp.rabbit.stocks.Client。您将看到针对 NASDAQ 只股票的示例报价,因为客户端属性文件(client.properties)中与 'stocks.quote.pattern' 键关联的当前值为 'app.stock.quotes.nasdaq.'。现在,在保持现有 Server 和 Client 实例运行的同时,将该属性值更改为 'app.stock.quotes.nyse.',并启动第二个 Client 实例。您会发现第一个客户端仍接收纳斯达克(NASDAQ)的报价,而第二个客户端则接收纽约证券交易所(NYSE)的报价。您也可以将模式更改为获取所有股票,甚至某个特定股票代码。
我们探讨的最后一个功能是从客户端视角出发的请求-响应交互。回想一下,我们已经看到过接受 TradeRequest 个对象并返回 TradeResponse 个对象的 ServerHandler。相应地,在 Client 端的代码位于 org.springframework.amqp.rabbit.stocks.gateway 包中的 RabbitStockServiceGateway。它通过调用 RabbitTemplate 来发送消息。以下列表展示了 send 方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它设置了 replyTo 地址。
它提供了由 traderJoeQueue Bean 定义生成的队列(如前面所示)。
以下列表显示了 @Bean 对 StockServiceGateway 类本身的定义:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请现在启动它们。尝试以 '100 TCKR' 格式发送请求。在短暂的人工延迟(模拟“处理”请求)之后,您应在客户端看到一条确认消息出现。
从非 Spring 应用接收 JSON
Spring 应用程序在发送 JSON 时,会将 TypeId 头部设置为完全限定类名,以协助接收应用程序将 JSON 转换回 Java 对象。
示例 spring-rabbit-json 探索了多种将 JSON 从非 Spring 应用中转换的技术。