
Inbound Channel Adapter

The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}

Note that when configuring an external container with XML, you cannot use the Spring AMQP namespace to define the container. This is because the namespace requires at least one <listener/> element. In this environment, the listener is internal to the adapter. For this reason, you must define the container by using a normal Spring <bean/> definition, as the following example shows:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
Even though the Spring Integration JMS and AMQP support is similar, important differences exist. The JMS inbound channel adapter uses a JmsDestinationPollingSource under the covers and expects a configured poller. The AMQP inbound channel adapter uses an AbstractMessageListenerContainer and is message-driven. In that regard, it is more similar to the JMS message-driven channel adapter.

Starting with version 5.5, the AmqpInboundChannelAdapter can be configured with an org.springframework.amqp.rabbit.retry.MessageRecoverer strategy, which is used in the RecoveryCallback when the retry operation is invoked internally. See the setMessageRecoverer() Javadoc for more information.
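The following configuration sketch shows one way the recoverer might be wired onto the adapter. It assumes a listener container bean is already defined, that Spring Retry's RetryTemplate is the retry mechanism in use, and that rejecting the message without requeueing is the desired recovery action. The class name, bean name, and channel name are hypothetical.

```java
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RecovererConfiguration {

    @Bean
    public AmqpInboundChannelAdapter inboundWithRecoverer(SimpleMessageListenerContainer container) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
        // Hypothetical channel name; downstream flow is not shown.
        adapter.setOutputChannelName("amqpInputChannel");
        // Default RetryTemplate settings; tune the retry policy as needed.
        adapter.setRetryTemplate(new RetryTemplate());
        // Called once retries are exhausted; this recoverer rejects
        // the message without requeueing it.
        adapter.setMessageRecoverer(new RejectAndDontRequeueRecoverer());
        return adapter;
    }

}
```

Whether rejected messages end up in a dead-letter queue depends on how the queue is declared on the broker, which this sketch does not cover.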

The @Publisher annotation can also be used in combination with a @RabbitListener:

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

By default, the @Publisher AOP interceptor publishes the return value of a method call. However, the return value of a @RabbitListener method is treated as an AMQP reply message, so the return value cannot be used for a @Publisher as well. For this combination, the recommended approach is a @Payload annotation with a SpEL expression evaluated against the method arguments. See the Annotation-driven Configuration section for more information about @Publisher.

When using exclusive or single-active consumers in the listener container, it is recommended that you set the container property forceStop to true. This will prevent a race condition where, after stopping the container, another consumer could start consuming messages before this instance has fully stopped.
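As a rough illustration of the recommendation above, the following sketch defines a container with an exclusive consumer and forceStop enabled. The queue name is borrowed from the earlier XML example; the class and bean names are hypothetical.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExclusiveConsumerConfiguration {

    @Bean
    public SimpleMessageListenerContainer exclusiveContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("aName.queue");
        // Only this consumer may consume from the queue while it is active.
        container.setExclusive(true);
        // Stop immediately rather than letting another consumer take over
        // while this instance is still shutting down.
        container.setForceStop(true);
        return container;
    }

}
```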

Batched Messages

See the Spring AMQP Documentation for more information about batched messages.

To produce batched messages with Spring Integration, simply configure the outbound endpoint with a BatchingRabbitTemplate.
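A minimal sketch of such an outbound configuration follows. The batch size, buffer limit, timeout, and routing key are illustrative values only, and the sketch assumes a TaskScheduler bean is available for injection. The class and bean names are hypothetical.

```java
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.scheduling.TaskScheduler;

@Configuration
public class BatchingOutboundConfiguration {

    @Bean
    public BatchingRabbitTemplate batchingTemplate(ConnectionFactory connectionFactory,
            TaskScheduler taskScheduler) {
        // Illustrative limits: 10 messages, 10,000 bytes, or 1 second.
        SimpleBatchingStrategy strategy = new SimpleBatchingStrategy(10, 10_000, 1_000);
        return new BatchingRabbitTemplate(connectionFactory, strategy, taskScheduler);
    }

    @Bean
    public IntegrationFlow batchingOutbound(BatchingRabbitTemplate batchingTemplate) {
        // Messages sent to this flow's input channel are batched by the template.
        return flow -> flow.handle(Amqp.outboundAdapter(batchingTemplate)
                .routingKey("aName.queue"));
    }

}
```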

When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a Message<?> for each fragment. Starting with version 5.2, if the container’s deBatchingEnabled property is set to false, the de-batching is performed by the adapter instead, and a single Message<List<?>> is produced with the payload being a list of the fragment payloads (after conversion if appropriate).
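The following sketch shows how the second mode described above might be configured: the container leaves batches intact, so the adapter produces one message whose payload is a list. The queue name is borrowed from the earlier XML example; the class and bean names are hypothetical.

```java
import java.util.List;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class BatchInboundConfiguration {

    @Bean
    public SimpleMessageListenerContainer batchContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("aName.queue");
        // Leave de-batching to the adapter instead of the container.
        container.setDeBatchingEnabled(false);
        return container;
    }

    @Bean
    public IntegrationFlow batchInbound(SimpleMessageListenerContainer batchContainer) {
        return IntegrationFlow.from(Amqp.inboundAdapter(batchContainer))
                .handle(m -> {
                    // For a batched message, the payload is a list of fragment payloads.
                    List<?> fragments = (List<?>) m.getPayload();
                    System.out.println("Received batch of " + fragments.size());
                })
                .get();
    }

}
```

The cast assumes every message on the queue was sent as a batch; a message that was not batched would arrive with its ordinary payload type.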

The default BatchingStrategy is the SimpleBatchingStrategy, but this can be overridden on the adapter.
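To make the idea of fragments concrete, here is a standalone model of a length-prefixed batch body, where each fragment is preceded by a 4-byte length field (the layout that the Spring AMQP documentation describes for SimpleBatchingStrategy). This is an illustrative sketch using only the JDK, not the library's implementation, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of a length-prefixed batch body; not Spring AMQP code. */
final class LengthPrefixedBatch {

    private LengthPrefixedBatch() {
    }

    /** Joins fragments into one body: [4-byte length][bytes], repeated. */
    static byte[] join(List<byte[]> fragments) {
        int size = 0;
        for (byte[] fragment : fragments) {
            size += Integer.BYTES + fragment.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] fragment : fragments) {
            buffer.putInt(fragment.length).put(fragment);
        }
        return buffer.array();
    }

    /** Splits a batch body back into its fragments. */
    static List<byte[]> split(byte[] body) {
        ByteBuffer buffer = ByteBuffer.wrap(body);
        List<byte[]> fragments = new ArrayList<>();
        while (buffer.hasRemaining()) {
            int length = buffer.getInt();
            if (length < 0 || length > buffer.remaining()) {
                throw new IllegalArgumentException("Bad fragment length: " + length);
            }
            byte[] fragment = new byte[length];
            buffer.get(fragment);
            fragments.add(fragment);
        }
        return fragments;
    }

}
```

Calling split(join(fragments)) returns the original fragments in order, which mirrors what de-batching does conceptually before each fragment is converted.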

The org.springframework.amqp.rabbit.retry.MessageBatchRecoverer must be used with batches when recovery is required for retry operations.