5. 消息传递

Spring Cloud AWS 提供 Amazon SQSAmazon SNS 集成 简化了通过 SQS 或 SNS 发布和使用消息。而 SQS 完全依赖消息传递 API 在 Spring 4.0 中引入,SNS 仅部分实现它,因为接收部分必须以不同的方式处理 推送通知。spring-doc.cadn.net.cn

5.1. 配置消息传递

在使用和配置消息传递支持之前,应用程序必须包含相应的模块依赖项 进入 Maven 配置。Spring Cloud AWS Messaging 支持作为一个单独的模块提供,以允许模块化使用 模块的。spring-doc.cadn.net.cn

5.1.1. Maven 依赖配置

Spring Cloud AWS 消息传递模块作为独立模块提供,可以使用以下依赖声明导入:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>

5.2. SQS 支持

Amazon SQS 是 Amazon Web Service 平台上的托管消息服务,提供点对点通信 与队列。与 JMS 或其他消息服务相比,Amazon SQS 具有几个功能和限制,这些功能和限制应该 考虑到。spring-doc.cadn.net.cn

  • Amazon SQS 仅允许String有效负载,因此任何Object必须转换为字符串表示形式。 Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输带有 Amazon SQS 消息的 Java 对象。spring-doc.cadn.net.cn

  • Amazon SQS 不支持事务,因此可能会检索消息两次。申请必须写在 幂等方式,以便他们可以两次接收消息。spring-doc.cadn.net.cn

  • Amazon SQS 每条消息的最大消息大小为 256kb,因此无法发送较大的消息。spring-doc.cadn.net.cn

5.2.1. 发送消息

QueueMessagingTemplate包含许多发送消息的便捷方法。有一些发送方法指定了 destination 使用QueueMessageChannel对象和那些使用字符串指定目的地的对象,该字符串将 根据 SQS API 进行解析。不采用 destination 参数的 send 方法使用默认目标。spring-doc.cadn.net.cn

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}

此示例使用MessageBuilderclass 创建带有字符串有效负载的消息。这QueueMessagingTemplate是 通过传递对AmazonSQSAsync客户。send 方法中的目标是一个字符串值,该字符串值 必须与 AWS 上定义的队列名称匹配。此值将由 Amazon SQS 客户端在运行时解析。选择 一个ResourceIdResolver实现可以传递给QueueMessagingTemplate构造函数通过以下方式解析资源 在 CloudFormation 堆栈中运行时的逻辑名称(有关以下内容的更多信息,请参阅管理云环境 资源名称解析)。spring-doc.cadn.net.cn

使用消息传递命名空间QueueMessagingTemplate可以在 XML 配置文件中定义。spring-doc.cadn.net.cn

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>

在此示例中,消息传递命名空间处理程序构造一个新的QueueMessagingTemplate.这AmazonSQSAsync客户 根据提供的凭据自动创建并传递给模板的构造函数。如果 应用程序在配置的 CloudFormation 堆栈中运行ResourceIdResolver传递给构造函数(有关资源名称解析的更多信息,请参阅管理云环境)。spring-doc.cadn.net.cn

使用消息转换器

为了方便发送领域模型对象,QueueMessagingTemplate有各种发送方法 将 Java 对象作为消息数据内容的参数。重载的方法convertAndSend()receiveAndConvert()QueueMessagingTemplate将转换过程委托给MessageConverter接口。此接口定义了一个简单的协定,用于在 Java 对象和 SQS 消息之间进行转换。默认值 实现SimpleMessageConverter只需解包消息有效负载,只要它与目标类型匹配即可。由 使用转换器,您和您的应用程序代码可以专注于通过 SQS 的详细信息,而不是关心它如何表示为 SQS 消息的细节。spring-doc.cadn.net.cn

由于 SQS 只能发送Stringpayloads 默认转换器SimpleMessageConverter应该只使用 发送String负载。对于更复杂的对象,应使用自定义转换器,例如 消息传递命名空间处理程序。spring-doc.cadn.net.cn

建议使用 XML 消息传递命名空间创建QueueMessagingTemplate因为它将设置更多 复杂MessageConverter当 Jackson 在类路径上时,将对象转换为 JSON。spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));

在此示例中,一个QueueMessagingTemplate是使用 messaging 命名空间创建的。这convertAndSend方法 转换有效负载Person使用配置的MessageConverter并发送消息。spring-doc.cadn.net.cn

5.2.2. 接收消息

接收 SQS 消息有两种方式,要么使用receive方法QueueMessagingTemplate或与 注释驱动的侦听器端点。后者是迄今为止更方便的接收消息的方式。spring-doc.cadn.net.cn

Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);

在此示例中,QueueMessagingTemplate将从 SQS 队列中获取一条消息并将其转换为目标类 作为参数传递。spring-doc.cadn.net.cn

5.2.3. 注释驱动的侦听器端点

注释驱动的侦听器端点是侦听 SQS 消息的最简单方法。只需使用MessageMappingQueueMessageHandler将消息路由到带注释的方法。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}

在此示例中,将启动一个队列侦听器容器,用于轮询 SQSqueueName传递给MessageMapping注解。传入消息将转换为目标类型,然后转换为带注释的方法queueListener被调用。spring-doc.cadn.net.cn

除了有效负载之外,还可以使用@Header@Headers附注。@Header用于注入特定的标头值,而@Headers注入一个Map<String, String>包含所有标头。spring-doc.cadn.net.cn

只有标准 支持随SQS消息发送的消息属性。目前不支持自定义属性。spring-doc.cadn.net.cn

除了提供的参数解析器外,还可以在aws-messaging:annotation-driven-queue-listener元素使用aws-messaging:argument-resolvers属性(见下面的示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>

默认情况下,SimpleMessageListenerContainer创建一个ThreadPoolTaskExecutor使用核心和max 池大小的计算值。核心池大小设置为队列数的两倍,最大池大小是通过将队列数乘以maxNumberOfMessages田。 如果这些默认值不能满足应用程序的需求,则可以使用task-executor属性(见下面的示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复

消息侦听器方法可以使用@SendTo将其返回值发送到另一个通道。 这SendToHandlerMethodReturnValueHandler使用aws-messaging:annotation-driven-queue-listener元素发送返回值。消息传递模板必须实现 这DestinationResolvingMessageSendingOperations接口。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}

在此示例中,extractLeafs方法将接收来自treeQueue然后返回一个ListLeafs 将被发送到leafsQueue.请注意,在aws-messaging:annotation-driven-queue-listenerXML 元素有一个属性send-to-message-template指定QueueMessagingTemplate作为用于发送消息返回值的消息传递模板 listener 方法。spring-doc.cadn.net.cn

处理异常

内部引发的异常@SqsListener带注释的方法可以通过用@MessageExceptionHandler.spring-doc.cadn.net.cn

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}

5.2.4. SimpleMessageListenerContainerFactory

SimpleMessageListenerContainer也可以通过创建类型SimpleMessageListenerContainerFactory.spring-doc.cadn.net.cn

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...

    return factory;
}

5.2.5. 使用 Amazon SQS 使用 AWS Event 消息

还可以使用 SQS 消息侦听器接收 AWS 生成的事件消息。因为 AWS 消息不包含 mime 类型标头,必须配置 Jackson 消息转换器 使用strictContentTypeMatch属性 false 也可以解析没有正确 MIME 类型的消息。spring-doc.cadn.net.cn

下一个代码显示了使用QueueMessageHandlerFactory并重新配置MappingJackson2MessageConverterspring-doc.cadn.net.cn

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

通过上述配置,可以接收 S3 存储桶(以及其他存储桶)的事件通知 事件通知,如 Elastic Transcoder 消息)在@SqsListener带注释的方法 如下所示。spring-doc.cadn.net.cn

@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}

5.3. SNS 支持

Amazon SNS 是一种发布-订阅消息传递系统,允许客户端向特定主题发布通知。其他 感兴趣的客户端可以使用不同的协议(如 HTTP/HTTPS、电子邮件或 Amazon SQS 队列)进行订阅来接收消息。spring-doc.cadn.net.cn

下图显示了 Amazon SNS 架构的典型示例。spring-doc.cadn.net.cn

SNS 概述

Spring Cloud AWS 通过提供使用NotificationMessagingTemplate和 使用 Spring Web MVC 接收 HTTP/HTTPS 端点的通知@Controller基于编程模型。亚马逊河 基于 SQS 的订阅可以与 Spring Cloud AWS 消息传递模块提供的注释驱动的消息支持一起使用。spring-doc.cadn.net.cn

5.3.1. 发送消息

NotificationMessagingTemplate包含两种发送通知的便捷方法。第一个指定 destination 使用String这将针对 SNS API 解决。第二个不去目的地 参数,并使用默认目标。所有常用的发送方法MessageSendingOperations已实现,但发送通知不太方便,因为主题必须作为标头传递。spring-doc.cadn.net.cn

目前仅String有效负载可以使用NotificationMessagingTemplate因为这是预期的 类型。spring-doc.cadn.net.cn

import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}

此示例构造一个新的NotificationMessagingTemplate通过传递一个AmazonSNSclient 作为参数。在send方法方便sendNotification方法用于发送messagesubject到 SNS 主题。这 destination 在sendNotification方法是一个字符串值,必须与 AWS 上定义的主题名称匹配。此值 由 Amazon SNS 客户端在运行时解析。可选的ResourceIdResolver实现可以传递给NotificationMessagingTemplate构造函数,用于在 CloudFormation 堆栈中运行时按逻辑名称解析资源。 (有关资源名称解析的更多信息,请参阅管理云环境。spring-doc.cadn.net.cn

建议使用 XML 消息传递命名空间创建NotificationMessagingTemplate因为它会自动 配置 SNS 客户端以设置默认转换器。spring-doc.cadn.net.cn

<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />

5.3.2. 注释驱动的 HTTP 通知端点

SNS 支持多种端点类型(SQS、电子邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 端点的支持。 SNS 向 HTTP 主题侦听器端点发送三种类型的请求,每种请求都提供了注释:spring-doc.cadn.net.cn

HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器来提取 通知请求中的消息和主题。spring-doc.cadn.net.cn

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        //We subscribe to start receive the message
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

目前无法在方法级别定义映射 URL,因此RequestMapping必须 在类型级别完成,并且必须包含终结点的完整路径。spring-doc.cadn.net.cn

此示例创建一个新的 Spring MVC 控制器,其中包含三种方法来处理上面列出的三个请求。挨次 以解决handleNotificationMessage方法:必须注册自定义参数解析器。这 下面列出了 XML 配置。spring-doc.cadn.net.cn

<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />

aws-messaging:notification-argument-resolver元素注册三个参数解析器:NotificationStatusHandlerMethodArgumentResolver,NotificationMessageHandlerMethodArgumentResolver, 和NotificationSubjectHandlerMethodArgumentResolver.spring-doc.cadn.net.cn

5.4. 使用 CloudFormation

Amazon SQS 队列和 SNS 主题可以在堆栈中进行配置,然后由应用程序使用。Spring Cloud AWS 还支持按逻辑名称查找堆栈配置的队列和主题,并解析为物理 名字。以下示例显示了 CloudFormation 模板中的 SNS 主题和 SQS 队列配置。spring-doc.cadn.net.cn

"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}

逻辑名称LogicalQueueNameLogicalTopicName然后可以在配置和应用程序中使用 如下图所示:spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />

<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}

当使用如上例所示的逻辑名称时,可以在不同的环境中创建堆栈,而无需任何 应用程序内部的配置或代码更改。spring-doc.cadn.net.cn