| This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4! | 
| This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4! | 
This section describes how polling works in Spring Integration.
Polling Consumer
When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances:
The actual implementation depends on the type of channel to which these endpoints connect.
A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel interface produces an instance of EventDrivenConsumer.
On the other hand, a channel adapter connected to a channel that implements the  org.springframework.messaging.PollableChannel interface (such as a QueueChannel) produces an instance of PollingConsumer.
Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.
They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.
For more information polling consumer configuration, see Message Endpoints.
Pollable Message Source
Spring Integration offers a second variation of the polling consumer pattern.
When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter.
For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages.
So, when components are configured with pollers, the resulting instances are of one of the following types:
This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:
- 
Polling certain external systems, such as FTP Servers, Databases, and Web Services 
- 
Polling internal (pollable) message channels 
- 
Polling internal services (such as repeatedly executing methods on a Java class) 
| AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction.
Starting with version 4.1, aPollSkipAdviceis provided.
Pollers use triggers to determine the time of the next poll.
ThePollSkipAdvicecan be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed.
To use this advice, you have to provide it with an implementation of aPollSkipStrategy.
Starting with version 4.2.5, aSimplePollSkipStrategyis provided.
To use it, you can add an instance as a bean to the application context, inject it into aPollSkipAdvice, and add that to the poller’s advice chain.
To skip polling, callskipPolls().
To resume polling, callreset().
Version 4.2 added more flexibility in this area.
See Conditional Pollers. | 
This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.
| AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction.
Starting with version 4.1, aPollSkipAdviceis provided.
Pollers use triggers to determine the time of the next poll.
ThePollSkipAdvicecan be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed.
To use this advice, you have to provide it with an implementation of aPollSkipStrategy.
Starting with version 4.2.5, aSimplePollSkipStrategyis provided.
To use it, you can add an instance as a bean to the application context, inject it into aPollSkipAdvice, and add that to the poller’s advice chain.
To skip polling, callskipPolls().
To resume polling, callreset().
Version 4.2 added more flexibility in this area.
See Conditional Pollers. | 
Deferred Acknowledgment Pollable Message Source
Starting with version 5.0.1, certain modules provide MessageSource implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread).
This is currently limited to the AmqpMessageSource and the KafkaMessageSource.
With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header (see MessageHeaderAccessor API) is added to the message.
When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback, as the following example shows:
@FunctionalInterface
public interface AcknowledgmentCallback {
    void acknowledge(Status status);
    boolean isAcknowledged();
    void noAutoAck();
    default boolean isAutoAck();
    enum Status {
        /**
         * Mark the message as accepted.
         */
        ACCEPT,
        /**
         * Mark the message as rejected.
         */
        REJECT,
        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE
    }
}Not all message sources (for example, a KafkaMessageSource) support the REJECT status.
It is treated the same as ACCEPT.
Applications can acknowledge a message at any time, as the following example shows:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);If the MessageSource is wired into a SourcePollingChannelAdapter, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the acknowledgment has already been acknowledged and, if not, sets its status to ACCEPT it (or REJECT if the flow throws an exception).
The status values are defined in the AcknowledgmentCallback.Status enumeration.
Spring Integration provides MessageSourcePollingTemplate to perform ad-hoc polling of a MessageSource.
This, too, takes care of setting ACCEPT or REJECT on the AcknowledgmentCallback when the MessageHandler callback returns (or throws an exception).
The following example shows how to poll with the MessageSourcePollingTemplate:
MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback.
You might do this if you hand off the message to another thread and wish to acknowledge later.
Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).
Conditional Pollers for Message Sources
This section covers how to use conditional pollers.
Background
Advice objects, in an advice-chain on a poller, advise the whole polling task (both message retrieval and processing).
These “around advice” methods do not have access to any context for the poll — only the poll itself.
This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier.
What if we wish to take some action depending on the result of the receive part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.
“Smart” Polling
Version 5.3 introduced the ReceiveMessageAdvice interface.
Any Advice objects in the advice-chain that implement this interface are applied only to the receive() operation - MessageSource.receive() and PollableChannel.receive(timeout).
Therefore, they can be applied only for the SourcePollingChannelAdapter or PollingConsumer.
Such classes implement the following methods:
- 
beforeReceive(Object source)This method is called before theObject.receive()method. It lets you examine and reconfigure the source. Returningfalsecancels this poll (similar to thePollSkipAdvicementioned earlier).
- 
Message<?> afterReceive(Message<?> result, Object source)This method is called after thereceive()method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can benullif there was no message created by the source). You can even return a different message
| Thread safety If an  | 
| Advice Chain Ordering You should understand how the advice chain is processed during initialization.
 | 
SimpleActiveIdleReceiveMessageAdvice
This advice is a simple implementation of ReceiveMessageAdvice.
When used in conjunction with a DynamicPeriodicTrigger, it adjusts the polling frequency, depending on whether the previous poll resulted in a message or not.
The poller must also have a reference to the same DynamicPeriodicTrigger.
| Important: Async Handoff SimpleActiveIdleReceiveMessageAdvicemodifies the trigger based on thereceive()result.
This works only if the advice is called on the poller thread.
It does not work if the poller has atask-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using anExecutorChannel. | 
CompoundTriggerAdvice
This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
Consider a poller that uses a CronTrigger.
CronTrigger instances are immutable, so they cannot be altered once constructed.
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.
The advice (and poller) use a CompoundTrigger for this purpose.
The trigger’s primary trigger can be a CronTrigger.
When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger.
When the CompoundTrigger instance’s nextExecutionTime method is invoked, it delegates to the secondary trigger, if present.
Otherwise, it delegates to the primary trigger.
The poller must also have a reference to the same CompoundTrigger.
The following example shows the configuration for the hourly cron expression with a fallback to every minute:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>| Important: Async Handoff CompoundTriggerAdvicemodifies the trigger based on thereceive()result.
This works only if the advice is called on the poller thread.
It does not work if the poller has atask-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using anExecutorChannel. | 
MessageSource-only Advices
Some advices might be applied only for the MessageSource.receive() and they don’t make sense for PollableChannel.
For this purpose a MessageSourceMutator interface (an extension of the ReceiveMessageAdvice) is still present.
See Inbound Channel Adapters: Polling Multiple Servers and Directories for more information.
| Thread safety If an  | 
| Advice Chain Ordering You should understand how the advice chain is processed during initialization.
 | 
| Important: Async Handoff SimpleActiveIdleReceiveMessageAdvicemodifies the trigger based on thereceive()result.
This works only if the advice is called on the poller thread.
It does not work if the poller has atask-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using anExecutorChannel. | 
| Important: Async Handoff CompoundTriggerAdvicemodifies the trigger based on thereceive()result.
This works only if the advice is called on the poller thread.
It does not work if the poller has atask-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using anExecutorChannel. |