该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

示例应用

Spring AMQP 示例项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,以演示实际应用程序中常见的交互类型。 在本章中,我们提供了每个示例的快速演练,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知 IDE(例如 SpringSource Tool Suite)中。spring-doc.cadn.net.cn

“Hello World”示例

“Hello World”示例演示了同步和异步消息接收。 您可以导入spring-rabbit-helloworldsample 到 IDE 中,然后按照下面的讨论进行作。spring-doc.cadn.net.cn

同步示例

src/main/java目录中,导航到org.springframework.amqp.helloworld包。 打开HelloWorldConfiguration类,并注意它包含@Configuration类级别的注释,并注意到一些@Bean方法级别的注释。 这是 Spring 基于 Java 的配置示例。 你可以在这里阅读更多相关信息。spring-doc.cadn.net.cn

以下列表显示了如何创建连接工厂:spring-doc.cadn.net.cn

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

该配置还包含RabbitAdmin,默认情况下,它会查找任何类型为 exchange、queue 或 binding 的 bean,然后在代理上声明它们。 事实上,helloWorldQueueHelloWorldConfiguration是一个示例,因为它是Queue.spring-doc.cadn.net.cn

以下列表显示了helloWorldQueue豆子定义:spring-doc.cadn.net.cn

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顾rabbitTemplatebean 配置,可以看到它的名称为helloWorldQueue设置为其queue属性(用于接收消息)和其routingKey属性(用于发送消息)。spring-doc.cadn.net.cn

现在我们已经探索了配置,我们可以看看实际使用这些组件的代码。 首先,打开Producer类。 它包含一个main()方法,其中 SpringApplicationContext被创建。spring-doc.cadn.net.cn

以下列表显示了main方法:spring-doc.cadn.net.cn

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在前面的示例中,AmqpTemplatebean 被检索并用于发送Message. 由于客户端代码应尽可能依赖接口,因此类型为AmqpTemplate而不是RabbitTemplate. 即使 bean 在HelloWorldConfigurationRabbitTemplate,依赖接口意味着这段代码更可移植(你可以独立于代码来更改配置)。 由于convertAndSend()方法被调用时,模板委托给其MessageConverter实例。 在这种情况下,它使用默认的SimpleMessageConverter,但可以向rabbitTemplatebean,如HelloWorldConfiguration.spring-doc.cadn.net.cn

现在打开Consumer类。 它实际上共享相同的配置基类,这意味着它共享rabbitTemplate豆。 这就是为什么我们在该模板中配置了routingKey(用于发送)和queue(用于接收)。 正如我们在AmqpTemplate,您可以改为将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。 这Consumer代码基本上是 Producer 的镜像,调用receiveAndConvert()而不是convertAndSend().spring-doc.cadn.net.cn

以下列表显示了Consumer:spring-doc.cadn.net.cn

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果您运行Producer然后运行Consumer,您应该看到Received: Hello World在控制台输出中。spring-doc.cadn.net.cn

异步示例

同步示例演练了同步 Hello World 示例。 本节介绍一个稍微高级但功能更强大的选项。 通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。 事实上,有一个子包提供了这一点:org.springframework.amqp.samples.helloworld.async.spring-doc.cadn.net.cn

同样,我们从发送方开始。打开ProducerConfiguration类,并注意它创建了一个connectionFactoryrabbitTemplate豆。 这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且RabbitTemplate仅设置了 'routingKey' 属性。回想一下,消息被发送到交换,而不是直接发送到队列。AMQP 默认交换是没有名称的直接交换。所有队列都绑定到该默认交换,其名称作为路由密钥。这就是为什么我们只需要在此处提供路由密钥。spring-doc.cadn.net.cn

以下列表显示了rabbitTemplate定义:spring-doc.cadn.net.cn

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本一样的按执行消息模型,那么它实际上是一个消息驱动的消费者就不会那么明显了)。负责连续发送消息的组件被定义为ProducerConfiguration. 它配置为每三秒运行一次。spring-doc.cadn.net.cn

以下列表显示了组件:spring-doc.cadn.net.cn

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

您不需要了解所有细节,因为真正的重点应该放在接收方(我们将在接下来介绍)。 但是,如果您还不熟悉 Spring 任务调度支持,您可以在此处了解更多信息。 简而言之,该postProcessorbean 中的ProducerConfiguration向调度程序注册任务。spring-doc.cadn.net.cn

现在我们可以转向接收方。 为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。 该类称为HelloWorldHandler,如以下清单所示:spring-doc.cadn.net.cn

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

该类是 POJO。 它不扩展任何基类,不实现任何接口,甚至不包含任何导入。 它正在“适应”以适应MessageListenerSpring AMQP 的接口MessageListenerAdapter. 然后,您可以在SimpleMessageListenerContainer. 对于此示例,容器是在ConsumerConfiguration类。 您可以在那里看到包裹在适配器中的 POJO。spring-doc.cadn.net.cn

以下列表显示了如何listenerContainer定义为:spring-doc.cadn.net.cn

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer是一个 Spring 生命周期组件,默认情况下会自动启动。 如果您查看Consumer类,你可以看到它的main()方法只包含一个单行引导程序,用于创建ApplicationContext. 制作人的main()方法也是一个单行引导程序,因为其方法被@Scheduled也会自动启动。 您可以启动ProducerConsumer以任何顺序,您应该会看到每三秒发送一次和接收消息。spring-doc.cadn.net.cn

股票交易

股票交易示例演示了比 Hello World 示例更高级的消息传递方案。 但是,配置非常相似,只是更复杂一些。 由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。 有一个服务器将市场数据(股票报价)推送到主题交易所。 然后,客户端可以通过将队列与路由模式(例如,app.stock.quotes.nasdaq.*). 该演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。 这涉及一个私人replyTo客户端在订单请求消息本身中发送的队列。spring-doc.cadn.net.cn

服务器的核心配置位于RabbitServerConfiguration类中的org.springframework.amqp.rabbit.stocks.config.server包。 它扩展了AbstractStockAppRabbitConfiguration. 这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。 在该通用配置文件中,您还会看到JacksonJsonMessageConverterRabbitTemplate.spring-doc.cadn.net.cn

特定于服务器的配置由两件事组成。 首先,它在RabbitTemplate这样它就不需要在每次调用时提供该交换名称来发送Message. 它在基本配置类中定义的抽象回调方法中执行此作。 以下列表显示了该方法:spring-doc.cadn.net.cn

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明股票请求队列。 在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换,并以自己的名称作为路由键。 如前所述,AMQP 规范定义了该行为。 以下列表显示了stockRequestQueue豆:spring-doc.cadn.net.cn

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

现在您已经看到了服务器 AMQP 资源的配置,请导航到org.springframework.amqp.rabbit.stockspackage 下的src/test/java目录。 在那里,你可以看到实际的Server提供main()方法。 它创建了一个ApplicationContext基于server-bootstrap.xml配置文件。在那里,您可以看到发布虚拟市场数据的计划任务。该配置依赖于 Spring 的task命名空间支持。引导配置文件还导入了其他一些文件。最有趣的是server-messaging.xml,位于src/main/resources. 在那里,你可以看到messageListenerContainer负责处理股票交易请求的 bean。最后,看看serverHandlerserver-handlers.xml(也在 'src/main/resources' 中)。该 bean 是ServerHandler类,并且是消息驱动的 POJO 的一个很好的例子,它也可以发送回复消息。 请注意,它本身未与框架或任何 AMQP 概念耦合。 它接受一个TradeRequest并返回一个TradeResponse. 以下列表显示了handleMessage方法:spring-doc.cadn.net.cn

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。 最好的起点大概是RabbitClientConfigurationorg.springframework.amqp.rabbit.stocks.config.client包。 请注意,它声明了两个队列,但没有提供显式名称。 以下列表显示了两个队列的 Bean 定义:spring-doc.cadn.net.cn

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

这些是专用队列,并且会自动生成唯一名称。 客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。 回想一下,在 AMQP 中,使用者与队列交互,而生产者与交换交互。 队列与交换的“绑定”是告诉代理将消息从给定交换传递(或路由)到队列的内容。 由于市场数据交换是主题交换,因此绑定可以用路由模式来表达。 这RabbitClientConfiguration使用Binding对象,并且该对象是使用BindingBuilder流畅的 API。 以下列表显示了Binding:spring-doc.cadn.net.cn

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

请注意,实际值已在属性文件 (client.propertiessrc/main/resources),并且我们使用 Spring 的@Value注释来注入该值。 这通常是一个好主意。 否则,该值将被硬编码在类中,并且无需重新编译即可无法修改。 在这种情况下,在更改用于绑定的路由模式时运行多个版本的客户端要容易得多。 我们现在可以尝试一下。spring-doc.cadn.net.cn

首先运行org.springframework.amqp.rabbit.stocks.Server然后org.springframework.amqp.rabbit.stocks.Client. 您应该看到虚拟引文NASDAQstocks,因为与 client.properties 中的“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq。'. 现在,在保留现有ServerClient运行时,将该属性值更改为“app.stock.quotes.nyse.' 并开始第二个Client实例。 您应该看到第一个客户仍然收到纳斯达克报价,而第二个客户收到纽约证券交易所报价。 相反,您可以更改模式以获取所有股票甚至单个股票代码。spring-doc.cadn.net.cn

我们探索的最后一个功能是从客户角度出发的请求-回复交互。 回想一下,我们已经看到了ServerHandler接受TradeRequest对象和返回TradeResponse对象。 上相应的代码Clientside 是RabbitStockServiceGatewayorg.springframework.amqp.rabbit.stocks.gateway包。 它委托给RabbitTemplate为了发送消息。 以下列表显示了send方法:spring-doc.cadn.net.cn

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

请注意,在发送消息之前,它会将replyTo地址。 它提供了由traderJoeQueuebean 定义(如前面所示)。 以下列表显示了@Bean定义StockServiceGateway类本身:spring-doc.cadn.net.cn

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟之后,您应该会在客户端上看到一条确认消息。spring-doc.cadn.net.cn

从非 Spring 应用程序接收 JSON

Spring 应用程序在发送 JSON 时,将TypeId标头添加到完全限定的类名中,以帮助接收应用程序将 JSON 转换回 Java 对象。spring-doc.cadn.net.cn

spring-rabbit-json示例探索了几种从非 Spring 应用程序转换 JSON 的技术。spring-doc.cadn.net.cn