Adding Behavior to Endpoints

Prior to Spring Integration 2.2, you could add behavior to an entire Integration flow by adding an AOP Advice to a poller’s <advice-chain/> element. However, suppose you want to retry, say, just a REST Web Service call, and not any downstream endpoints.

For example, consider the following flow:

inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter

If you configure some retry-logic into an advice chain on the poller and the call to http-gateway2 failed because of a network glitch, the retry causes both http-gateway1 and http-gateway2 to be called a second time. Similarly, after a transient failure in the jdbc-outbound-adapter, both HTTP gateways are called a second time before again calling the jdbc-outbound-adapter.

Spring Integration 2.2 adds the ability to add behavior to individual endpoints. This is achieved by the addition of the <request-handler-advice-chain/> element to many endpoints. The following example shows how to the <request-handler-advice-chain/> element within an outbound-gateway:

<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

In this case, myRetryAdvice is applied only locally to this gateway and does not apply to further actions taken downstream after the reply is sent to nextChannel. The scope of the advice is limited to the endpoint itself.

At this time, you cannot advise an entire <chain/> of endpoints. The schema does not allow a <request-handler-advice-chain> as a child element of the chain itself.

However, a <request-handler-advice-chain> can be added to individual reply-producing endpoints within a <chain> element. An exception is that, in a chain that produces no reply, because the last element in the chain is an outbound-channel-adapter, that last element cannot be advised. If you need to advise such an element, it must be moved outside the chain (with the output-channel of the chain being the input-channel of the adapter). The adapter can then be advised as usual. For chains that produce a reply, every child element can be advised.

Provided Advice Classes

In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:

Retry Advice

The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) leverages the rich retry mechanisms provided by the Spring Retry project. The core component of spring-retry is the RetryTemplate, which allows configuration of sophisticated retry scenarios, including RetryPolicy and BackoffPolicy strategies (with a number of implementations) as well as a RecoveryCallback strategy to determine the action to take when retries are exhausted.

Stateless Retry

Stateless retry is the case where the retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.

Stateful Retry

Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example for stateful retry is when we want the message originator (for example,JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission.

For more information on spring-retry, see the project’s Javadoc and the reference documentation for Spring Batch, where spring-retry originated.

The default back off behavior is to not back off. Retries are attempted immediately. Using a back off policy that causes threads to pause between attempts may cause performance issues, including excessive memory use and thread starvation. In high-volume environments, back off policies should be used with caution.
Configuring the Retry Advice

The examples in this section use the following <service-activator> that always throws an exception:

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
Simple Stateless Retry

The default RetryTemplate has a SimpleRetryPolicy which tries three times. There is no BackOffPolicy, so the three attempts are made back-to-back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses RetryTemplate and shows its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
Simple Stateless Retry with Recovery

The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
Stateless Retry with Customized Policies, and Recovery

For more sophistication, we can provide the advice with a customized RetryTemplate. This example continues to use the SimpleRetryPolicy but increases the attempts to four. It also adds an ExponentialBackoffPolicy where the first retry waits one second, the second waits five seconds and the third waits 25 (for four attempts in all). The following listing shows the example and its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
Namespace Support for Stateless Retry

Starting with version 4.0, the preceding configuration can be greatly simplified, thanks to the namespace support for the retry advice, as the following example shows:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

In the preceding example, the advice is defined as a top-level bean so that it can be used in multiple request-handler-advice-chain instances. You can also define the advice directly within the chain, as the following example shows:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

A <handler-retry-advice> can have a <fixed-back-off> or <exponential-back-off> child element or have no child element. A <handler-retry-advice> with no child element uses no back off. If there is no recovery-channel, the exception is thrown when retries are exhausted. The namespace can only be used with stateless retry.

For more complex environments (custom policies etc.), use normal <bean> definitions.

Simple Stateful Retry with Recovery

To make retry stateful, we need to provide the advice with a RetryStateGenerator implementation. This class is used to identify a message as being a resubmission so that the RetryTemplate can determine the current state of retry for this message. The framework provides a SpelExpressionRetryStateGenerator, which determines the message identifier by using a SpEL expression. This example again uses the default policies (three attempts with no back off). As with stateless retry, these policies can be customized. The following listing shows the example and its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

If you compare the preceding example with the stateless examples, you can see that, with stateful retry, the exception is thrown to the caller on each failure.

Exception Classification for Retry

Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions and the exception classifier looks at the top-level exception. If you configure it to, say, retry only on MyException and your application throws a SomeOtherException where the cause is a MyException, retry does not occur.

Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (the default is false). When true, it traverses exception causes until it finds a match or runs out of causes traversing.

To use this classifier for retry, use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception objects, and the traverseCauses boolean. Then you can inject this policy into the RetryTemplate.

traverseCauses is required in this case because user exceptions may be wrapped in a MessagingException.
Circuit Breaker Advice

The general idea of the circuit breaker pattern is that, if a service is not currently available, do not waste time (and resources) trying to use it. The o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice implements this pattern. When the circuit breaker is in the closed state, the endpoint attempts to invoke the service. The circuit breaker goes to the open state if a certain number of consecutive attempts fail. When it is in the open state, new requests “fail fast” and no attempt is made to invoke the service until some time has expired.

When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occur. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.

Typically, this advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection).

The RequestHandlerCircuitBreakerAdvice has two properties: threshold and halfOpenAfter. The threshold property represents the number of consecutive failures that need to occur before the breaker goes open. It defaults to 5. The halfOpenAfter property represents the time after the last failure that the breaker waits before attempting another request. The default is 1000 milliseconds.

The following example configures a circuit breaker and shows its DEBUG and ERROR output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

In the preceding example, the threshold is set to 2 and halfOpenAfter is set to 12 seconds. A new request arrives every 5 seconds. The first two attempts invoked the service. The third and fourth failed with an exception indicating that the circuit breaker is open. The fifth request was attempted because the request was 15 seconds after the last failure. The sixth attempt fails immediately because the breaker immediately went to open.

Expression Evaluating Advice

The final supplied advice class is the o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice. This advice is more general than the other two advices. It provides a mechanism to evaluate an expression on the original inbound message sent to the endpoint. Separate expressions are available to be evaluated, after either success or failure. Optionally, a message containing the evaluation result, together with the input message, can be sent to a message channel.

A typical use case for this advice might be with an <ftp:outbound-channel-adapter/>, perhaps to move the file to one directory if the transfer was successful or to another directory if it fails:

The advice has properties to set an expression when successful, an expression for failures, and corresponding channels for each. For the successful case, the message sent to the successChannel is an AdviceMessage, with the payload being the result of the expression evaluation. An additional property, called inputMessage, contains the original message sent to the handler. A message sent to the failureChannel (when the handler throws an exception) is an ErrorMessage with a payload of MessageHandlingExpressionEvaluatingAdviceException. Like all MessagingException instances, this payload has failedMessage and cause properties, as well as an additional property called evaluationResult, which contains the result of the expression evaluation.

Starting with version 5.1.3, if channels are configured, but expressions are not provided, the default expression is used to evaluate to the payload of the message.

When an exception is thrown in the scope of the advice, by default, that exception is thrown to the caller after any failureExpression is evaluated. If you wish to suppress throwing the exception, set the trapException property to true. The following advice shows how to configure an advice with Java DSL:

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}
Rate Limiter Advice

The Rate Limiter advice (RateLimiterRequestHandlerAdvice) allows to ensure that an endpoint does not get overloaded with requests. When the rate limit is breached the request will go in a blocked state.

A typical use case for this advice might be an external service provider not allowing more than n number of request per minute.

The RateLimiterRequestHandlerAdvice implementation is fully based on the Resilience4j project and requires either RateLimiter or RateLimiterConfig injections. Can also be configured with defaults and/or custom name.

The following example configures a rate limiter advice with one request per 1 second:

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
Caching Advice

Starting with version 5.2, the CacheRequestHandlerAdvice has been introduced. It is based on the caching abstraction in Spring Framework and aligned with the concepts and functionality provided by the @Caching annotation family. The logic internally is based on the CacheAspectSupport extension, where proxying for caching operations is done around the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage method with the request Message<?> as the argument. This advice can be configured with a SpEL expression or a Function to evaluate a cache key. The request Message<?> is available as the root object for the SpEL evaluation context, or as the Function input argument. By default, the payload of the request message is used for the cache key. The CacheRequestHandlerAdvice must be configured with cacheNames, when a default cache operation is a CacheableOperation, or with a set of any arbitrary CacheOperation s. Every CacheOperation can be configured separately or have shared options, like a CacheManager, CacheResolver and CacheErrorHandler, can be reused from the CacheRequestHandlerAdvice configuration. This configuration functionality is similar to Spring Framework’s @CacheConfig and @Caching annotation combination. If a CacheManager is not provided, a single bean is resolved by default from the BeanFactory in the CacheAspectSupport.

The following example configures two advices with different set of caching operations:

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}

Reactive Advice

Starting with version 5.3, a ReactiveRequestHandlerAdvice can be used for request message handlers producing a Mono replies. A BiFunction<Message<?>, Mono<?>, Publisher<?>> has to be provided for this advice and it is called from the Mono.transform() operator on a reply produced by the intercepted handleRequestMessage() method implementation. Typically, such a Mono customization is necessary when we would like to control network fluctuations via timeout(), retry() and similar support operators. For example when we can an HTTP request over WebFlux client, we could use below configuration to not wait for response more than 5 seconds:

.handle(WebFlux.outboundGateway("https://somehost/"),
                       e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));

The message argument is the request message for the message handler and can be used to determine request-scope attributes. The mono argument is the result of this message handler’s handleRequestMessage() method implementation. A nested Mono.transform() can also be called from this function to apply, for example, a Reactive Circuit Breaker.

Context Holder Advice

Starting with version 6.1, the ContextHolderRequestHandlerAdvice has been introduced. This advice takes some value from the request message as and stores it in the context holder. The value is clear from the context when an execution is finished on the target MessageHandler. The best way to think about this advice is similar to the programming flow where we store some value into a ThreadLocal, get access to it from the target call and then clean up the ThreadLocal after execution. The ContextHolderRequestHandlerAdvice requires these constructor arguments: a Function<Message<?>, Object> as a value provider, Consumer<Object> as a context set callback and Runnable as a context clean up hook.

Following is a sample how a ContextHolderRequestHandlerAdvice can be used in combination with a o.s.i.file.remote.session.DelegatingSessionFactory:

@Bean
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
    return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
}

@Bean
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
    return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
                                      dsf::setThreadKey, dsf::clearThreadKey);
}

@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
	return new FtpOutboundGateway(sessionFactory, "ls", "payload");
}

And it is just enough to send a message to the in channel with a FACTORY_KEY header set to either one or two. The ContextHolderRequestHandlerAdvice sets the value from that header into a DelegatingSessionFactory via its setThreadKey. Then when FtpOutboundGateway executes an ls command a proper delegating SessionFactory is chosen from the DelegatingSessionFactory according to the value in its ThreadLocal. When the result is produced from the FtpOutboundGateway, a ThreadLocal value in the DelegatingSessionFactory is cleared according to the clearThreadKey() call from the ContextHolderRequestHandlerAdvice. See Delegating Session Factory for more information.

Custom Advice Classes

In addition to the provided advice classes described earlier, you can implement your own advice classes. While you can provide any implementation of org.aopalliance.aop.Advice (usually org.aopalliance.intercept.MethodInterceptor), we generally recommend that you subclass o.s.i.handler.advice.AbstractRequestHandlerAdvice. This has the benefit of avoiding the writing of low-level aspect-oriented programming code as well as providing a starting point that is specifically tailored for use in this environment.

Subclasses need to implement the doInvoke() method, the definition of which follows:

/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
 * invokes the handler method and returns its result, or null).
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;

The callback parameter is a convenience to avoid subclasses that deal with AOP directly. Invoking the callback.execute() method invokes the message handler.

The target parameter is provided for those subclasses that need to maintain state for a specific handler, perhaps by maintaining that state in a Map keyed by the target. This feature allows the same advice to be applied to multiple handlers. The RequestHandlerCircuitBreakerAdvice uses advice this to keep circuit breaker state for each handler.

The message parameter is the message sent to the handler. While the advice cannot modify the message before invoking the handler, it can modify the payload (if it has mutable properties). Typically, an advice would use the message for logging or to send a copy of the message somewhere before or after invoking the handler.

The return value would normally be the value returned by callback.execute(). However, the advice does have the ability to modify the return value. Note that only AbstractReplyProducingMessageHandler instances return values. The following example shows a custom advice class that extends AbstractRequestHandlerAdvice:

public class MyAdvice extends AbstractRequestHandlerAdvice {

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}

In addition to the execute() method, ExecutionCallback provides an additional method: cloneAndExecute(). This method must be used in cases where the invocation might be called multiple times within a single execution of doInvoke(), such as in the RequestHandlerRetryAdvice. This is required because the Spring AOP org.springframework.aop.framework.ReflectiveMethodInvocation object maintains state by keeping track of which advice in a chain was last invoked. This state must be reset for each call.

For more information, see the ReflectiveMethodInvocation Javadoc.

Other Advice Chain Elements

While the abstract class mentioned above is a convenience, you can add any Advice, including a transaction advice, to the chain.

Handling Message Advice

As discussed in the introduction to this section, advice objects in a request handler advice chain are applied to just the current endpoint, not the downstream flow (if any). For MessageHandler objects that produce a reply (such as those that extend AbstractReplyProducingMessageHandler), the advice is applied to an internal method: handleRequestMessage() (called from MessageHandler.handleMessage()). For other message handlers, the advice is applied to MessageHandler.handleMessage().

There are some circumstances where, even if a message handler is an AbstractReplyProducingMessageHandler, the advice must be applied to the handleMessage method. For example, the idempotent receiver might return null, which would cause an exception if the handler’s replyRequired property is set to true. Another example is the BoundRabbitChannelAdvice — see Strict Message Ordering.

Starting with version 4.3.1, a new HandleMessageAdvice interface and its base implementation (AbstractHandleMessageAdvice) have been introduced. Advice objects that implement HandleMessageAdvice are always applied to the handleMessage() method, regardless of the handler type.

It is important to understand that HandleMessageAdvice implementations (such as idempotent receiver), when applied to a handlers that return responses, are dissociated from the adviceChain and properly applied to the MessageHandler.handleMessage() method.

Because of this disassociation, the advice chain order is not honored.

Consider the following configuration:

<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>

In the preceding example, the <tx:advice> is applied to the AbstractReplyProducingMessageHandler.handleRequestMessage(). However, myHandleMessageAdvice is applied for to MessageHandler.handleMessage(). Therefore, it is invoked before the <tx:advice>. To retain the order, you should follow the standard Spring AOP configuration approach and use an endpoint id together with the .handler suffix to obtain the target MessageHandler bean. Note that, in that case, the entire downstream flow is within the transaction scope.

In the case of a MessageHandler that does not return a response, the advice chain order is retained.

Starting with version 5.3, the HandleMessageAdviceAdapter is provided to apply any MethodInterceptor for the MessageHandler.handleMessage() method and, therefore, the whole sub-flow. For example, a RetryOperationsInterceptor could be applied to the whole sub-flow starting from some endpoint; this is not possible, by default, because the consumer endpoint applies advices only to the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage().

Transaction Support

Starting with version 5.0, a new TransactionHandleMessageAdvice has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice implementation. When a regular TransactionInterceptor is used in the <request-handler-advice-chain> element (for example, through configuring <tx:advice>), a started transaction is applied only for an internal AbstractReplyProducingMessageHandler.handleRequestMessage() and is not propagated to the downstream flow.

To simplify XML configuration, along with the <request-handler-advice-chain>, a <transactional> element has been added to all <outbound-gateway> and <service-activator> and related components. The following example shows <transactional> in use:

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

If you are familiar with the JPA integration components, such a configuration is not new, but now we can start a transaction from any point in our flow — not only from the <poller> or a message-driven channel adapter such as JMS.

Java configuration can be simplified by using the TransactionInterceptorBuilder, and the result bean name can be used in the messaging annotations adviceChain attribute, as the following example shows:

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

Note the true parameter on the TransactionInterceptorBuilder constructor. It causes the creation of a TransactionHandleMessageAdvice, not a regular TransactionInterceptor.

Java DSL supports an Advice through the .transactional() options on the endpoint configuration, as the following example shows:

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}

Advising Filters

There is an additional consideration when advising Filter advices. By default, any discard actions (when the filter returns false) are performed within the scope of the advice chain. This could include all the flow downstream of the discard channel. So, for example, if an element downstream of the discard channel throws an exception and there is a retry advice, the process is retried. Also, if throwExceptionOnRejection is set to true (the exception is thrown within the scope of the advice).

Setting discard-within-advice to false modifies this behavior and the discard (or exception) occurs after the advice chain is called.

Advising Endpoints Using Annotations

When configuring certain endpoints by using annotations (@Filter, @ServiceActivator, @Splitter, and @Transformer), you can supply a bean name for the advice chain in the adviceChain attribute. In addition, the @Filter annotation also has the discardWithinAdvice attribute, which can be used to configure the discard behavior, as discussed in Advising Filters. The following example causes the discard to be performed after the advice:

@MessageEndpoint
public class MyAdvisedFilter {

    @Filter(inputChannel="input", outputChannel="output",
            adviceChain="adviceChain", discardWithinAdvice="false")
    public boolean filter(String s) {
        return s.contains("good");
    }
}

Ordering Advices within Advice Chain

Advice classes are “around” advices and are applied in a nested fashion. The first advice is the outermost, while the last advice is the innermost (that is, closest to the handler being advised). It is important to put the advice classes in the correct order to achieve the functionality you desire.

For example, suppose you want to add a retry advice and a transaction advice. You may want to place the retry advice first, followed by the transaction advice. Consequently, each retry is performed in a new transaction. On the other hand, if you want all the attempts and any recovery operations (in the retry RecoveryCallback) to be scoped within the transaction, you could put the transaction advice first.

Advised Handler Properties

Sometimes, it is useful to access handler properties from within the advice. For example, most handlers implement NamedComponent to let you access the component name.

The target object can be accessed through the target argument (when subclassing AbstractRequestHandlerAdvice) or invocation.getThis() (when implementing org.aopalliance.intercept.MethodInterceptor).

When the entire handler is advised (such as when the handler does not produce replies or the advice implements HandleMessageAdvice), you can cast the target object to an interface, such as NamedComponent, as shown in the following example:

String componentName = ((NamedComponent) target).getComponentName();

When you implement MethodInterceptor directly, you could cast the target object as follows:

String componentName = ((NamedComponent) invocation.getThis()).getComponentName();

When only the handleRequestMessage() method is advised (in a reply-producing handler), you need to access the full handler, which is an AbstractReplyProducingMessageHandler. The following example shows how to do so:

AbstractReplyProducingMessageHandler handler =
    ((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();

String componentName = handler.getComponentName();

Idempotent Receiver Enterprise Integration Pattern

Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern. It is a functional pattern and the whole idempotency logic should be implemented in the application. However, to simplify the decision-making, the IdempotentReceiverInterceptor component is provided. This is an AOP Advice that is applied to the MessageHandler.handleMessage() method and that can filter a request message or mark it as a duplicate, according to its configuration.

Previously, you could have implemented this pattern by using a custom MessageSelector in a <filter/> (see Filter), for example. However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component. Rather, it is applied to endpoints declared in the application.

The logic of the IdempotentReceiverInterceptor is based on the provided MessageSelector and, if the message is not accepted by that selector, it is enriched with the duplicateMessage header set to true. The target MessageHandler (or downstream flow) can consult this header to implement the correct idempotency logic. If the IdempotentReceiverInterceptor is configured with a discardChannel or throwExceptionOnRejection = true, the duplicate message is not sent to the target MessageHandler.handleMessage(). Rather, it is discarded. If you want to discard (do nothing with) the duplicate message, the discardChannel should be configured with a NullChannel, such as the default nullChannel bean.

To maintain state between messages and provide the ability to compare messages for the idempotency, we provide the MetadataStoreSelector. It accepts a MessageProcessor implementation (which creates a lookup key based on the Message) and an optional ConcurrentMetadataStore (Metadata Store). See the MetadataStoreSelector Javadoc for more information. You can also customize the value for ConcurrentMetadataStore by using an additional MessageProcessor. By default, MetadataStoreSelector uses the timestamp message header.

Normally, the selector selects a message for acceptance if there is no existing value for the key. In some cases, it is useful to compare the current and new values for a key, to determine whether the message should be accepted. Starting with version 5.3, the compareValues property is provided which references a BiPredicate<String, String>; the first parameter is the old value; return true to accept the message and replace the old value with the new value in the MetadataStore. This can be useful to reduce the number of keys; for example, when processing lines in a file, you can store the file name in the key and the current line number in the value. Then, after a restart, you can skip lines that have already been processed. See Idempotent Downstream Processing a Split File for an example.

For convenience, the MetadataStoreSelector options are configurable directly on the <idempotent-receiver> component. The following listing shows all the possible attributes:

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 The ID of the IdempotentReceiverInterceptor bean. Optional.
2 Consumer endpoint name(s) or pattern(s) to which this interceptor is applied. Separate names (patterns) with commas (,), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff". Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans. Required.
3 A MessageSelector bean reference. Mutually exclusive with metadata-store and key-strategy (key-expression). When selector is not provided, one of key-strategy or key-strategy-expression is required.
4 Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it. When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header. Optional.
5 A ConcurrentMetadataStore reference. Used by the underlying MetadataStoreSelector. Mutually exclusive with selector. Optional. The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions.
6 A MessageProcessor reference. Used by the underlying MetadataStoreSelector. Evaluates an idempotentKey from the request message. Mutually exclusive with selector and key-expression. When a selector is not provided, one of key-strategy or key-strategy-expression is required.
7 A SpEL expression to populate an ExpressionEvaluatingMessageProcessor. Used by the underlying MetadataStoreSelector. Evaluates an idempotentKey by using the request message as the evaluation context root object. Mutually exclusive with selector and key-strategy. When a selector is not provided, one of key-strategy or key-strategy-expression is required.
8 A MessageProcessor reference. Used by the underlying MetadataStoreSelector. Evaluates a value for the idempotentKey from the request message. Mutually exclusive with selector and value-expression. By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the Metadata 'value'.
9 A SpEL expression to populate an ExpressionEvaluatingMessageProcessor. Used by the underlying MetadataStoreSelector. Evaluates a value for the idempotentKey by using the request message as the evaluation context root object. Mutually exclusive with selector and value-strategy. By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the metadata 'value'.
10 A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default.
11 Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message. Defaults to false. It is applied regardless of whether or not a discard-channel is provided.

For Java configuration, Spring Integration provides the method-level @IdempotentReceiver annotation. It is used to mark a method that has a messaging annotation (@ServiceActivator, @Router, and others) to specify which `IdempotentReceiverInterceptor objects are applied to this endpoint. The following example shows how to use the @IdempotentReceiver annotation:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
The IdempotentReceiverInterceptor is designed only for the MessageHandler.handleMessage(Message<?>) method. Starting with version 4.3.1, it implements HandleMessageAdvice, with the AbstractHandleMessageAdvice as a base class, for better dissociation. See Handling Message Advice for more information.