Reactive Streams Support

Spring Integration supports Reactive Streams interaction in several places in the framework and from different aspects. This chapter discusses most of them, with links to the target chapters for details where necessary.

Preface

To recap, Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns. Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code. This goal is achieved in the target application using first class citizens like message, channel and endpoint, which allow us to build an integration flow (pipeline), where (in most cases) one endpoint produces messages into a channel to be consumed by another endpoint. This way we distinguish an integration interaction model from the target business logic. The crucial part here is a channel in between: the flow behavior depends on its implementation leaving endpoints untouched.

On the other hand, Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure. The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary (such as passing elements on to another thread or thread pool) while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model, allowing the queues that mediate between threads to be bounded. The intention of a Reactive Streams implementation, such as Project Reactor, is to preserve these benefits and characteristics across the whole processing graph of a stream application. The ultimate goal of Reactive Streams libraries is to provide types, a set of operators, and a supporting API for a target application in as transparent and smooth a manner as the available programming language structures allow, but the final solution is not as imperative as a normal chain of function invocations. It is divided into two phases: definition and execution. Execution happens some time later, upon subscription to the final reactive publisher, and demand for data is pushed from the bottom of the definition to the top, applying back pressure as needed: we request as many events as we can handle at the moment. A reactive application looks like a "stream" or, as we are used to calling it in Spring Integration terms, a "flow". In fact, since Java 9 the Reactive Streams SPI is present in the java.util.concurrent.Flow class.
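The demand-driven exchange described above can be illustrated with nothing but the JDK's java.util.concurrent.Flow types. The following is a minimal, single-threaded sketch and not a specification-compliant publisher (for example, it does not validate non-positive request values). The names DemandDrivenFlowSketch and RangePublisher are hypothetical and exist only for this illustration. Nothing is emitted until the subscriber signals demand, and each item is delivered only after a matching request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration class; not part of Spring Integration or Project Reactor.
final class DemandDrivenFlowSketch {

    /** A synchronous publisher of the integers 1..count that emits only what was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {

        private final int count;

        private final List<String> log;

        RangePublisher(int count, List<String> log) {
            this.count = count;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {

                private int next = 1;

                private long demand;

                private boolean emitting;

                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    log.add("request(" + n + ")");
                    demand += n;
                    if (emitting) {
                        // Re-entrant call from onNext(): the outer loop below picks up the new demand.
                        return;
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a demand of one item at a time and returns the recorded signal order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new RangePublisher(3, log).subscribe(new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // execution phase starts here, with the first demand
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                this.subscription.request(1); // ask for the next item only after handling this one
            }

            @Override
            public void onError(Throwable throwable) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded log alternates request and onNext signals, which is the "request as many events as we can handle" behavior in its simplest form. Project Reactor operators apply the same protocol, with additional thread-safety and batching concerns that this sketch leaves out.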

From here it may look like Spring Integration flows are a really good fit for writing Reactive Streams applications when we apply some reactive framework operators on endpoints, but in fact the problem is much broader, and we need to keep in mind that not all endpoints (e.g. JdbcMessageHandler) can be processed in a reactive stream transparently. Of course, the main goal of Reactive Streams support in Spring Integration is to allow the whole process to be fully reactive, initiated on demand, and back-pressure ready. This is not going to be possible until the target protocols and systems for channel adapters provide a Reactive Streams interaction model. The sections below describe which components and approaches Spring Integration provides for developing reactive applications while preserving integration flow structures.

All the Reactive Streams interaction in Spring Integration is implemented with Project Reactor types, such as Mono and Flux.

Messaging Gateway

The simplest point of interaction with Reactive Streams is a @MessagingGateway, where we just declare the return type of the gateway method as a Mono<?>. The whole integration flow behind the gateway method call is then performed when a subscription happens on the returned Mono instance. See Reactor Mono for more information. A similar Mono-reply approach is used internally in the framework for inbound gateways that are fully based on Reactive Streams compatible protocols (see Reactive Channel Adapters below for more information). The send-and-receive operation is wrapped into a Mono.defer(), chaining a reply evaluation from the replyChannel header whenever it is available. This way, an inbound component for the particular reactive protocol (e.g. Netty) acts as the subscriber and initiator of the reactive flow performed by Spring Integration. If the request payload is a reactive type, it is better to handle it within a reactive stream definition, deferring processing until the initiator subscribes. For this purpose, a handler method must return a reactive type as well. See the next section for more information.
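As a configuration fragment, a gateway with a Mono return type might be declared as follows. This is only a sketch: the UpperCaseGateway interface, its method, and the upperCaseChannel name are hypothetical, and the fragment assumes Spring Integration and Project Reactor on the classpath with gateway scanning enabled (for example via @IntegrationComponentScan).

```java
import org.springframework.integration.annotation.MessagingGateway;

import reactor.core.publisher.Mono;

// Hypothetical gateway: the interface, method and channel names are illustrative only.
@MessagingGateway(defaultRequestChannel = "upperCaseChannel")
public interface UpperCaseGateway {

    // Calling this method only defines the interaction;
    // the request is sent to "upperCaseChannel" when the returned Mono is subscribed.
    Mono<String> toUpperCase(String payload);

}
```

A caller would then compose on the result, for instance gateway.toUpperCase("foo").map(String::length), and the flow behind upperCaseChannel runs only once something subscribes to that composition.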

Reactive Reply Payload

When a reply-producing MessageHandler returns a reactive type as the payload of a reply message, the handling depends on the outputChannel. With a regular MessageChannel implementation, the payload is processed in an asynchronous manner. When the output channel is a ReactiveStreamsSubscribableChannel implementation, e.g. FluxMessageChannel, the payload is flattened with on-demand subscription. In the standard imperative MessageChannel use case, if the reply payload is a multi-value publisher (see ReactiveAdapter.isMultiValue() for more information), it is wrapped into a Mono.just(). As a result, the Mono has to be subscribed explicitly downstream or flattened by a FluxMessageChannel downstream. With a ReactiveStreamsSubscribableChannel for the outputChannel, there is no need to be concerned about the return type and subscription; everything is processed smoothly by the framework internally.

See Asynchronous Service Activator for more information.

Also see Kotlin Coroutines for more information.

FluxMessageChannel and ReactiveStreamsConsumer

The FluxMessageChannel is a combined implementation of MessageChannel and Publisher<Message<?>>. A Flux, as a hot source, is created internally for sinking incoming messages from the send() implementation. The Publisher.subscribe() implementation is delegated to that internal Flux. Also, for on-demand upstream consumption, the FluxMessageChannel provides an implementation of the ReactiveStreamsSubscribableChannel contract. Any upstream Publisher (see Source Polling Channel Adapter and splitter below, for example) provided for this channel is auto-subscribed when a subscription is ready for this channel. Events from these delegating publishers are sunk into the internal Flux mentioned above.

A consumer for the FluxMessageChannel must be an org.reactivestreams.Subscriber instance to honor the Reactive Streams contract. Fortunately, all of the MessageHandler implementations in Spring Integration also implement a CoreSubscriber from Project Reactor. And thanks to a ReactiveStreamsConsumer implementation in between, the whole integration flow configuration is left transparent for target developers. In this case, the flow behavior is changed from an imperative push model to a reactive pull model. A ReactiveStreamsConsumer can also be used to turn any MessageChannel into a reactive source using IntegrationReactiveUtils, making an integration flow partially reactive.

See FluxMessageChannel for more information.

Starting with version 5.5, the ConsumerEndpointSpec introduces a reactive() option to make the endpoint in the flow a ReactiveStreamsConsumer independently of the input channel. An optional Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> can be provided to customize the source Flux from the input channel via a Flux.transform() operation, e.g. with publishOn(), doOnNext(), retry() etc. This functionality is represented as a @Reactive sub-annotation for all the messaging annotations (@ServiceActivator, @Splitter etc.) via their reactive() attribute.
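As a configuration fragment, the reactive() option might be applied in the Java DSL roughly as follows. This is a sketch under assumptions: the ReactiveEndpointConfig class and the inputChannel and outputChannel names are hypothetical, and the publishOn() customization is just one possible choice for the function.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

import reactor.core.scheduler.Schedulers;

// Hypothetical configuration class; channel names are illustrative only.
@Configuration
public class ReactiveEndpointConfig {

    @Bean
    public IntegrationFlow reactiveEndpointFlow() {
        return IntegrationFlow.from("inputChannel") // a regular, non-reactive input channel
                .<String>handle((payload, headers) -> payload.toUpperCase(),
                        // Consume through a ReactiveStreamsConsumer regardless of the channel type,
                        // customizing the source Flux with a publishOn() operator.
                        endpoint -> endpoint.reactive(flux -> flux.publishOn(Schedulers.parallel())))
                .channel("outputChannel")
                .get();
    }

}
```

The function receives the Flux built from the input channel, so any operator that returns a Publisher<Message<?>> can be composed there without changing the handler itself.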

Source Polling Channel Adapter

Usually, the SourcePollingChannelAdapter relies on a task which is initiated by the TaskScheduler. A polling trigger is built from the provided options and used to periodically schedule a task that polls a target source of data or events. When the outputChannel is a ReactiveStreamsSubscribableChannel, the same Trigger is used to determine the next execution time, but instead of scheduling tasks, the SourcePollingChannelAdapter creates a Flux<Message<?>> based on Flux.generate() for the nextExecutionTime values and Mono.delay() for the duration since the previous step. A flatMapMany() operator is then used to poll maxMessagesPerPoll messages and sink them into an output Flux. This generator Flux is subscribed to by the provided ReactiveStreamsSubscribableChannel, honoring back pressure downstream. Starting with version 5.5, when maxMessagesPerPoll == 0, the source is not called at all, and flatMapMany() completes immediately via a Mono.empty() result until maxMessagesPerPoll is changed to a non-zero value at a later time, e.g. via a Control Bus. This way, any MessageSource implementation can be turned into a reactive hot source.

See Polling Consumer for more information.

Event-Driven Channel Adapter

MessageProducerSupport is the base class for event-driven channel adapters and, typically, its sendMessage(Message<?>) is used as a listener callback in the producing driver API. This callback can also be easily plugged into the doOnNext() Reactor operator when a message producer implementation builds a Flux of messages instead of listener-based functionality. In fact, this is done in the framework when the outputChannel of the message producer is not a ReactiveStreamsSubscribableChannel. However, for an improved end-user experience, and to allow more back-pressure ready functionality, MessageProducerSupport provides a subscribeToPublisher(Publisher<? extends Message<?>>) API to be used in the target implementation when a Publisher<Message<?>> is the source of data from the target system. Typically, it is used from the doStart() implementation when the target driver API is called for a Publisher of source data. It is recommended to combine a reactive MessageProducerSupport implementation with a FluxMessageChannel as the outputChannel for on-demand subscription and event consumption downstream. The channel adapter goes to a stopped state when a subscription to the Publisher is cancelled. Calling stop() on such a channel adapter completes production from the source Publisher. The channel adapter can be restarted with automatic subscription to a newly created source Publisher.

Message Source to Reactive Streams

Starting with version 5.3, a ReactiveMessageSourceProducer is provided. It is a combination of a provided MessageSource and event-driven production into the configured outputChannel. Internally, it wraps a MessageSource into a repeatedly resubscribed Mono, producing a Flux<Message<?>> to be subscribed to in the subscribeToPublisher(Publisher<? extends Message<?>>) mentioned above. The subscription for this Mono is done using Schedulers.boundedElastic() to avoid possible blocking in the target MessageSource. When the message source returns null (no data to pull), the Mono is turned into a repeatWhenEmpty() state with a delay for a subsequent re-subscription based on an IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration entry from the subscriber context. By default, it is 1 second. If the MessageSource produces messages with IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK information in the headers, the message is acknowledged (if necessary) in the doOnSuccess() of the original Mono and rejected in the doOnError() if the downstream flow throws a MessagingException with the failed message to reject. This ReactiveMessageSourceProducer can be used for any use case where the features of a polling channel adapter should be turned into a reactive, on-demand solution for any existing MessageSource<?> implementation.

Splitter and Aggregator

When an AbstractMessageSplitter gets a Publisher for its logic, the process goes naturally over the items in the Publisher to map them into messages for sending to the outputChannel. If this channel is a ReactiveStreamsSubscribableChannel, the Flux wrapper for the Publisher is subscribed to on demand from that channel, and the splitter behavior looks more like a flatMap Reactor operator, where an incoming event is mapped into a multi-value output Publisher. It makes the most sense when the whole integration flow is built with a FluxMessageChannel before and after the splitter, aligning the Spring Integration configuration with Reactive Streams requirements and its operators for event processing. With a regular channel, a Publisher is converted into an Iterable for standard iterate-and-produce splitting logic.

A FluxAggregatorMessageHandler is another example of a specific Reactive Streams logic implementation which could be treated as a "reactive operator" in terms of Project Reactor. It is based on the Flux.groupBy() and Flux.window() (or buffer()) operators. The incoming messages are sunk into a Flux.create() initiated when a FluxAggregatorMessageHandler is created, making it a hot source. This Flux is subscribed to by a ReactiveStreamsSubscribableChannel on demand, or directly in FluxAggregatorMessageHandler.start() when the outputChannel is not reactive. This MessageHandler is most powerful when the whole integration flow is built with a FluxMessageChannel before and after this component, making the whole logic back-pressure ready.

See Stream and Flux Splitting and Flux Aggregator for more information.

Java DSL

An IntegrationFlow in Java DSL can start from any Publisher instance (see IntegrationFlow.from(Publisher<Message<T>>)). Also, with an IntegrationFlowBuilder.toReactivePublisher() operator, the IntegrationFlow can be turned into a reactive hot source. A FluxMessageChannel is used internally in both cases; it can subscribe to an inbound Publisher according to its ReactiveStreamsSubscribableChannel contract and it is a Publisher<Message<?>> by itself for downstream subscribers. With a dynamic IntegrationFlow registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from Publisher.

Starting with version 5.5.6, a toReactivePublisher(boolean autoStartOnSubscribe) operator variant is present to control a lifecycle of the whole IntegrationFlow behind the returned Publisher<Message<?>>. Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even ApplicationContext startup. To avoid boilerplate code for lifecycle management of the IntegrationFlow at the Publisher<Message<?>> subscription point and for better end-user experience, this new operator with the autoStartOnSubscribe flag has been introduced. It marks (if true) the IntegrationFlow and its components for autoStartup = false, so an ApplicationContext won’t initiate production and consumption of messages in the flow automatically. Instead, the start() for the IntegrationFlow is initiated from the internal Flux.doOnSubscribe(). Independently of the autoStartOnSubscribe value, the flow is stopped from a Flux.doOnCancel() and Flux.doOnTerminate() - it does not make sense to produce messages if there is nothing to consume them.
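As a configuration fragment, bridging from a Publisher into a flow and back out to a Publisher might look like the sketch below. The ReactiveBridgeConfig class, the bean name, and the sample data are hypothetical; the fragment assumes a Spring Integration version that provides the static IntegrationFlow.from() factory methods used elsewhere in this chapter.

```java
import org.reactivestreams.Publisher;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

// Hypothetical configuration class; names and data are illustrative only.
@Configuration
public class ReactiveBridgeConfig {

    @Bean
    public Publisher<Message<String>> upperCasePublisher() {
        // An inbound Publisher of messages to start the flow from.
        Flux<Message<String>> source =
                Flux.just("a", "b", "c")
                        .map(payload -> MessageBuilder.withPayload(payload).build());

        return IntegrationFlow.from(source)
                .<String, String>transform(String::toUpperCase)
                // true: the flow is started from the subscription, not by the ApplicationContext
                .toReactivePublisher(true);
    }

}
```

A consumer can later inject this Publisher and compose on it with Flux.from(upperCasePublisher); with the autoStartOnSubscribe flag set, no explicit start() call on the flow is needed at the subscription point.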

For the exact opposite use-case, when IntegrationFlow should call a reactive stream and continue after completion, a fluxTransform() operator is provided in the IntegrationFlowDefinition. The flow at this point is turned into a FluxMessageChannel which is propagated into a provided fluxFunction, performed in the Flux.transform() operator. A result of the function is wrapped into a Mono<Message<?>> for flat-mapping into an output Flux which is subscribed by another FluxMessageChannel for downstream flow.
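As a configuration fragment, a fluxTransform() step in the middle of a flow might be sketched as follows. The FluxTransformConfig class and the fluxInput and fluxOutput channel names are hypothetical, and the chosen operators are only an example of what the function may compose.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;

// Hypothetical configuration class; channel names are illustrative only.
@Configuration
public class FluxTransformConfig {

    @Bean
    public IntegrationFlow fluxTransformFlow() {
        return IntegrationFlow.from("fluxInput")
                // The function receives the Flux of incoming messages and returns a Publisher
                // whose items continue downstream in the flow.
                .<String, String>fluxTransform(flux ->
                        flux.map(Message::getPayload)
                                .filter(payload -> !payload.isEmpty())
                                .map(String::toUpperCase))
                .channel("fluxOutput")
                .get();
    }

}
```

Everything inside the function is regular Reactor composition, so operators such as buffering or retry can be applied there while the rest of the flow keeps its usual integration structure.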

See Java DSL Chapter for more information.

ReactiveMessageHandler

Starting with version 5.3, the ReactiveMessageHandler is supported natively in the framework. This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and do not provide any reply data to continue a reactive stream composition. When a ReactiveMessageHandler is used in an imperative integration flow, the handleMessage() result is subscribed to immediately after return, because there is no reactive streams composition in such a flow to honor back pressure. In this case, the framework wraps the ReactiveMessageHandler into a ReactiveMessageHandlerAdapter, a plain implementation of MessageHandler. However, when a ReactiveStreamsConsumer is involved in the flow (e.g. when the channel to consume is a FluxMessageChannel), such a ReactiveMessageHandler is composed into the whole reactive stream with a flatMap() Reactor operator to honor back pressure during consumption.

One of the out-of-the-box ReactiveMessageHandler implementations is the ReactiveMongoDbStoringMessageHandler for the Outbound Channel Adapter. See MongoDB Reactive Channel Adapters for more information.

Reactive Channel Adapters

When the target protocol for integration provides a Reactive Streams solution, it becomes straightforward to implement channel adapters in Spring Integration.

An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred Mono or Flux and performing a send (and producing a reply, if any) only when a protocol component initiates a subscription to the Mono returned from the listener method. This way, the reactive stream solution is encapsulated exactly in this component. Of course, the downstream integration flow subscribed on the output channel should honor the Reactive Streams specification and be performed in an on-demand, back-pressure ready manner.

This is not always possible given the nature (or the current implementation) of the MessageHandler processor used in the integration flow. This limitation can be handled by using thread pools and queues, or a FluxMessageChannel (see above), before and after integration endpoints when there is no reactive implementation.

An example for a reactive event-driven inbound channel adapter:

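import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import reactor.core.publisher.Flux;

// CustomReactiveSource and MyReactiveHeaders stand for application types;
// CustomReactiveSource is expected to expose a Flux-like map() operator.
public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        // Map every source event into a message; nothing is emitted until subscription.
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        // Let the framework subscribe and deliver messages to the outputChannel.
        subscribeToPublisher(messageFlux);
    }
}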

Usage would look like:

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

Or in a declarative way:

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

Or even without a channel adapter, we can always use the Java DSL in the following way:

public class MainFlow {
  @Autowired
  private CustomReactiveSource customReactiveSource;

  @Bean
  public IntegrationFlow buildFlow() {
     Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interact with an external system according to the reactive API provided for the target protocol. An inbound payload can be a reactive type per se or an event of the whole integration flow which is part of the reactive stream on top. A returned reactive type can be subscribed to immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for a further integration flow or an explicit subscription in the target business logic, still downstream and preserving reactive streams semantics.

An example for a reactive outbound channel adapter:

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.customEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

Both channel adapters can then be used together:

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

Currently, Spring Integration provides channel adapter (or gateway) implementations for WebFlux, RSocket, MongoDb, R2DBC, ZeroMQ, GraphQL, and Apache Cassandra. The Redis Stream Channel Adapters are also reactive and use ReactiveStreamOperations from Spring Data. More reactive channel adapters are coming, for example for Apache Kafka (see Kafka), based on the ReactiveKafkaProducerTemplate and ReactiveKafkaConsumerTemplate from Spring for Apache Kafka. For many other non-reactive channel adapters, thread pools are recommended to avoid blocking during reactive stream processing.

Reactive to Imperative Context Propagation

When the Context Propagation library is on the classpath, Project Reactor can take ThreadLocal values (e.g. Micrometer Observation or SecurityContextHolder) and store them into a Subscriber context. The opposite operation is also possible, when we need to populate a logging MDC for tracing or let services that we call from the reactive stream restore an observation from the scope. See the Project Reactor documentation for more information about its special operators for context propagation. Storing and restoring the context works smoothly if the whole solution is a single reactive stream composition, since a Subscriber context is visible from downstream up to the beginning of the composition (Flux or Mono). But if the application switches between different Flux instances, or into imperative processing and back, the context tied to the Subscriber might not be available. For such a use case, Spring Integration provides an additional capability (starting with version 6.0.5) to store a Reactor ContextView into the IntegrationMessageHeaderAccessor.REACTOR_CONTEXT message header produced from the reactive stream, e.g. when we perform a direct send() operation. This header is then used in FluxMessageChannel.subscribeTo() to restore a Reactor context for the Message that this channel is going to emit. Currently, this header is populated from the WebFluxInboundEndpoint and RSocketInboundGateway components, but it can be used in any solution where reactive to imperative integration is performed. The logic to populate this header is like this:

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

Note that we still need to use a handle() operator to make Reactor restore ThreadLocal values from the context. Even if the context is sent as a header, the framework cannot assume that it is going to be restored onto ThreadLocal values downstream.

To restore the context from a Message on the other Flux or Mono composition, this logic can be performed:

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message));