Reactive Streams Support
Spring Integration provides support for Reactive Streams interaction in some places of the framework and from different aspects. We will discuss most of them here with appropriate links to the target chapters for details whenever necessary.
Preface
To recap, Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns.
Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.
This goal is achieved in the target application using first class citizens like message
, channel
and endpoint
, which allow us to build an integration flow (pipeline), where (in most cases) one endpoint produces messages into a channel to be consumed by another endpoint.
This way we distinguish an integration interaction model from the target business logic.
The crucial part here is a channel in between: the flow behavior depends on its implementation leaving endpoints untouched.
On the other hand, the Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure.
The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data.
In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
The intention of Reactive Streams implementation, such as Project Reactor, is to preserve these benefits and characteristics across the whole processing graph of a stream application.
The ultimate goal of Reactive Streams libraries is to provide types, set of operators and supporting API for a target application in a transparent and smooth manner as is possible with available programming language structure, but the final solution is not as imperative as it is with a normal function chain invocation.
It is divided into to phases: definition and execution, which happens some time later during subscription to the final reactive publisher, and demand for data is pushed from the bottom of the definition to the top applying back-pressure as needed - we request as many events as we can handle at the moment.
The reactive application looks like a "stream"
or as we got used to in Spring Integration terms - "flow"
.
In fact the Reactive Streams SPI since Java 9 is presented in the java.util.concurrent.Flow
class.
From here it may look like Spring Integration flows are really a good fit for writing Reactive Streams applications when we apply some reactive framework operators on endpoints, but in fact the problems is much broader and we need to keep in mind that not all endpoints (e.g. JdbcMessageHandler
) can be processed in a reactive stream transparently.
Of course, the main goal for Reactive Streams support in Spring Integration is to allow the whole process to be fully reactive, on demand initiated and back-pressure ready.
It is not going to be possible until the target protocols and systems for channel adapters provide a Reactive Streams interaction model.
In the sections below we will describe what components and approaches are provided in Spring Integration for developing reactive application preserving integration flow structures.
All the Reactive Streams interaction in Spring Integration implemented with Project Reactor types, such as Mono and Flux .
|
Messaging Gateway
The simplest point of interaction with Reactive Streams is a @MessagingGateway
where we just make a return type of the gateway method as a Mono<?>
- and the whole integration flow behind a gateway method call is going to be performed when a subscription happens on the returned Mono
instance.
See Reactor Mono
for more information.
A similar Mono
-reply approach is used in the framework internally for inbound gateways which are fully based on Reactive Streams compatible protocols (see Reactive Channel Adapters below for more information).
The send-and-receive operation is wrapped into a Mono.deffer()
with chaining a reply evaluation from the replyChannel
header whenever it is available.
This way an inbound component for the particular reactive protocol (e.g. Netty) is going to be as a subscriber and initiator for a reactive flow performed on the Spring Integration.
If the request payload is a reactive type, it would be better to handle it withing a reactive stream definition deferring a process to the initiator subscription.
For this purpose a handler method must return a reactive type as well.
See the next section for more information.
Reactive Reply Payload
When a reply producing MessageHandler
returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular MessageChannel
implementation provided for the outputChannel
(the async
must be set to true
) and flattened with on demand subscription when the output channel is a ReactiveStreamsSubscribableChannel
implementation, e.g. FluxMessageChannel
.
With a standard imperative MessageChannel
use-case, and if a reply payload is a multi-value publisher (see ReactiveAdapter.isMultiValue()
for more information), it is wrapped into a Mono.just()
.
A result of this, the Mono
has to be subscribed explicitly downstream or flattened by the FluxMessageChannel
downstream.
With a ReactiveStreamsSubscribableChannel
for the outputChannel
, there is no need to be concerned about return type and subscription; everything is processed smoothly by the framework internally.
See Asynchronous Service Activator for more information.
Also see Kotlin Coroutines for more information.
FluxMessageChannel
and ReactiveStreamsConsumer
The FluxMessageChannel
is a combined implementation of MessageChannel
and Publisher<Message<?>>
.
A Flux
, as a hot source, is created internally for sinking incoming messages from the send()
implementation.
The Publisher.subscribe()
implementation is delegated to that internal Flux
.
Also, for on demand upstream consumption, the FluxMessageChannel
provides an implementation for the ReactiveStreamsSubscribableChannel
contract.
Any upstream Publisher
(see Source Polling Channel Adapter and splitter below, for example) provided for this channel is auto-subscribed when subscription is ready for this channel.
Events from this delegating publishers are sunk into an internal Flux
mentioned above.
A consumer for the FluxMessageChannel
must be a org.reactivestreams.Subscriber
instance for honoring the Reactive Streams contract.
Fortunately, all of the MessageHandler
implementations in Spring Integration also implement a CoreSubscriber
from project Reactor.
And thanks to a ReactiveStreamsConsumer
implementation in between, the whole integration flow configuration is left transparent for target developers.
In this case, the flow behavior is changed from an imperative push model to a reactive pull model.
A ReactiveStreamsConsumer
can also be used to turn any MessageChannel
into a reactive source using IntegrationReactiveUtils
, making an integration flow partially reactive.
See FluxMessageChannel
for more information.
Starting with version 5.5, the ConsumerEndpointSpec
introduces a reactive()
option to make the endpoint in the flow as a ReactiveStreamsConsumer
independently of the input channel.
The optional Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
can be provided to customise a source Flux
from the input channel via Flux.transform()
operation, e.g. with the publishOn()
, doOnNext()
, retry()
etc.
This functionality is represented as a @Reactive
sub-annotation for all the messaging annotation (@ServiceActivator
, @Splitter
etc.) via their reactive()
attribute.
Source Polling Channel Adapter
Usually, the SourcePollingChannelAdapter
relies on the task which is initiated by the TaskScheduler
.
A polling trigger is built from the provided options and used for periodic scheduling a task to poll a target source of data or events.
When an outputChannel
is a ReactiveStreamsSubscribableChannel
, the same Trigger
is used to determine the next time for execution, but instead of scheduling tasks, the SourcePollingChannelAdapter
creates a Flux<Message<?>>
based on the Flux.generate()
for the nextExecutionTime
values and Mono.delay()
for a duration from the previous step.
A Flux.flatMapMany()
is used then to poll maxMessagesPerPoll
and sink them into an output Flux
.
This generator Flux
is subscribed by the provided ReactiveStreamsSubscribableChannel
honoring a back-pressure downstream.
Starting with version 5.5, when maxMessagesPerPoll == 0
, the source is not called at all, and flatMapMany()
is completed immediately via a Mono.empty()
result until the maxMessagesPerPoll
is changed to non-zero value at a later time, e.g. via a Control Bus.
This way, any MessageSource
implementation can be turned into a reactive hot source.
See Polling Consumer for more information.
Event-Driven Channel Adapter
MessageProducerSupport
is the base class for event-driven channel adapters and, typically, its sendMessage(Message<?>)
is used as a listener callback in the producing driver API.
This callback can also be easily plugged into the doOnNext()
Reactor operator when a message producer implementation builds a Flux
of messages instead of listener-based functionality.
In fact, this is done in the framework when an outputChannel
of the message producer is not a ReactiveStreamsSubscribableChannel
.
However, for improved end-user experience, and to allow more back-pressure ready functionality, the MessageProducerSupport
provides a subscribeToPublisher(Publisher<? extends Message<?>>)
API to be used in the target implementation when a Publisher<Message<?>>>
is the source of data from the target system.
Typically, it is used from the doStart()
implementation when target driver API is called for a Publisher
of source data.
It is recommended to combine a reactive MessageProducerSupport
implementation with a FluxMessageChannel
as the outputChannel
for on-demand subscription and event consumption downstream.
The channel adapter goes to a stopped state when a subscription to the Publisher
is cancelled.
Calling stop()
on such a channel adapter completes the producing from the source Publisher
.
The channel adapter can be restarted with automatic subscription to a newly created source Publisher
.
Message Source to Reactive Streams
Starting with version 5.3, a ReactiveMessageSourceProducer
is provided.
It is a combination of a provided MessageSource
and event-driven production into the configured outputChannel
.
Internally it wraps a MessageSource
into the repeatedly resubscribed Mono
producing a Flux<Message<?>>
to be subscribed in the subscribeToPublisher(Publisher<? extends Message<?>>)
mentioned above.
The subscription for this Mono
is done using Schedulers.boundedElastic()
to avoid possible blocking in the target MessageSource
.
When the message source returns null
(no data to pull), the Mono
is turned into a repeatWhenEmpty()
state with a delay
for a subsequent re-subscription based on a IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
entry from the subscriber context.
By default, it is 1 second.
If the MessageSource
produces messages with a IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
information in the headers, it is acknowledged (if necessary) in the doOnSuccess()
of the original Mono
and rejected in the doOnError()
if the downstream flow throws a MessagingException
with the failed message to reject.
This ReactiveMessageSourceProducer
could be used for any use-case when a polling channel adapter’s features should be turned into a reactive, on demand solution for any existing MessageSource<?>
implementation.
Splitter and Aggregator
When an AbstractMessageSplitter
gets a Publisher
for its logic, the process goes naturally over the items in the Publisher
to map them into messages for sending to the outputChannel
.
If this channel is a ReactiveStreamsSubscribableChannel
, the Flux
wrapper for the Publisher
is subscribed on demand from that channel and this splitter behavior looks more like a flatMap
Reactor operator, when we map an incoming event into multi-value output Publisher
.
It makes most sense when the whole integration flow is built with a FluxMessageChannel
before and after the splitter, aligning Spring Integration configuration with a Reactive Streams requirements and its operators for event processing.
With a regular channel, a Publisher
is converted into an Iterable
for standard iterate-and-produce splitting logic.
A FluxAggregatorMessageHandler
is another sample of specific Reactive Streams logic implementation which could be treated as a "reactive operator"
in terms of Project Reactor.
It is based on the Flux.groupBy()
and Flux.window()
(or buffer()
) operators.
The incoming messages are sunk into a Flux.create()
initiated when a FluxAggregatorMessageHandler
is created, making it as a hot source.
This Flux
is subscribed to by a ReactiveStreamsSubscribableChannel
on demand, or directly in the FluxAggregatorMessageHandler.start()
when the outputChannel
is not reactive.
This MessageHandler
has its power, when the whole integration flow is built with a FluxMessageChannel
before and after this component, making the whole logic back-pressure ready.
See Stream and Flux Splitting and Flux Aggregator for more information.
Java DSL
An IntegrationFlow
in Java DSL can start from any Publisher
instance (see IntegrationFlow.from(Publisher<Message<T>>)
).
Also, with an IntegrationFlowBuilder.toReactivePublisher()
operator, the IntegrationFlow
can be turned into a reactive hot source.
A FluxMessageChannel
is used internally in both cases; it can subscribe to an inbound Publisher
according to its ReactiveStreamsSubscribableChannel
contract and it is a Publisher<Message<?>>
by itself for downstream subscribers.
With a dynamic IntegrationFlow
registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from Publisher
.
Starting with version 5.5.6, a toReactivePublisher(boolean autoStartOnSubscribe)
operator variant is present to control a lifecycle of the whole IntegrationFlow
behind the returned Publisher<Message<?>>
.
Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even ApplicationContext
startup.
To avoid boilerplate code for lifecycle management of the IntegrationFlow
at the Publisher<Message<?>>
subscription point and for better end-user experience, this new operator with the autoStartOnSubscribe
flag has been introduced.
It marks (if true
) the IntegrationFlow
and its components for autoStartup = false
, so an ApplicationContext
won’t initiate production and consumption of messages in the flow automatically.
Instead, the start()
for the IntegrationFlow
is initiated from the internal Flux.doOnSubscribe()
.
Independently of the autoStartOnSubscribe
value, the flow is stopped from a Flux.doOnCancel()
and Flux.doOnTerminate()
- it does not make sense to produce messages if there is nothing to consume them.
For the exact opposite use-case, when IntegrationFlow
should call a reactive stream and continue after completion, a fluxTransform()
operator is provided in the IntegrationFlowDefinition
.
The flow at this point is turned into a FluxMessageChannel
which is propagated into a provided fluxFunction
, performed in the Flux.transform()
operator.
A result of the function is wrapped into a Mono<Message<?>>
for flat-mapping into an output Flux
which is subscribed by another FluxMessageChannel
for downstream flow.
See Java DSL Chapter for more information.
ReactiveMessageHandler
Starting with version 5.3, the ReactiveMessageHandler
is supported natively in the framework.
This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and doesn’t provide any reply data to continue a reactive stream composition.
When a ReactiveMessageHandler
is used in the imperative integration flow, the handleMessage()
result in subscribed immediately after return, just because there is no reactive streams composition in such a flow to honor back-pressure.
In this case the framework wraps this ReactiveMessageHandler
into a ReactiveMessageHandlerAdapter
- a plain implementation of MessageHandler
.
However, when a ReactiveStreamsConsumer
is involved in the flow (e.g. when channel to consume is a FluxMessageChannel
), such a ReactiveMessageHandler
is composed to the whole reactive stream with a flatMap()
Reactor operator to honor back-pressure during consumption.
One of the out-of-the-box ReactiveMessageHandler
implementation is a ReactiveMongoDbStoringMessageHandler
for Outbound Channel Adapter.
See MongoDB Reactive Channel Adapters for more information.
Starting with version 6.1, the IntegrationFlowDefinition
exposes a convenient handleReactive(ReactiveMessageHandler)
terminal operator.
Any ReactiveMessageHandler
implementation (even just a plain lambda using the Mono
API) can be used for this operator.
The framework subscribes to the returned Mono<Void>
automatically.
Here is a simple sample of possible configuration for this operator:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
An overloaded version of this operator accepts a Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
to customize a consumer endpoint around the provided ReactiveMessageHandler
.
In addition, a ReactiveMessageHandlerSpec
-based variants are also provided.
In most cases they are used for protocol-specific channel adapter implementations.
See the next section following links to the target technologies with respective reactive channel adapters.
Reactive Channel Adapters
When the target protocol for integration provides a Reactive Streams solution, it becomes straightforward to implement channel adapters in Spring Integration.
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred Mono
or Flux
and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a Mono
returned from the listener method.
This way we have a reactive stream solution encapsulated exactly in this component.
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
This is not always available by the nature (or with the current implementation) of MessageHandler
processor used in the integration flow.
This limitation can be handled using thread pools and queues or FluxMessageChannel
(see above) before and after integration endpoints when there is no reactive implementation.
An example for a reactive event-driven inbound channel adapter:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
Usage would look like:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
Or in a declarative way:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
Or even without a channel adapter, we can always use the Java DSL in the following way:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol. An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top. A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
An example for a reactive outbound channel adapter:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
We will be able to use both of the channel adapters:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
Currently, Spring Integration provides channel adapter (or gateway) implementations for WebFlux, RSocket, MongoDb, R2DBC, ZeroMQ, GraphQL, Apache Cassandra.
The Redis Stream Channel Adapters are also reactive and uses ReactiveStreamOperations
from Spring Data.
More reactive channel adapters are coming, for example for Apache Kafka in Kafka based on the ReactiveKafkaProducerTemplate
and ReactiveKafkaConsumerTemplate
from Spring for Apache Kafka etc.
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.
Reactive to Imperative Context Propagation
When the Context Propagation library is on the classpath, the Project Reactor can take ThreadLocal
values (e.g. Micrometer Observation or SecurityContextHolder
) and store them into a Subscriber
context.
The opposite operation is also possible, when we need to populate a logging MDC for tracing or let services we call from the reactive stream to restore an observation from the scope.
See more information in Project Reactor documentation about its special operators for context propagation.
The storing and restoring context works smoothly if our whole solution is a single reactive stream composition since a Subscriber
context is visible from downstream up to the beginning of the composition(Flux
or Mono
).
But, if the application switches between different Flux
instances or into imperative processing and back, then the context tied to the Subscriber
might not be available.
For such a use case, Spring Integration provides an additional capability (starting with version 6.0.5
) to store a Reactor ContextView
into the IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
message header produced from the reactive stream, e.g. when we perform direct send()
operation.
This header is used then in the FluxMessageChannel.subscribeTo()
to restore a Reactor context for the Message
that this channel is going to emit.
Currently, this header is populated from the WebFluxInboundEndpoint
and RSocketInboundGateway
components, but can be used in any solution where reactive to imperative integration is performed.
The logic to populate this header is like this:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
Note, that we still need to use a handle()
operator to make Reactor restore ThreadLocal
values from the context.
Even if it is sent as a header, the framework cannot make an assumption if it is going to be to restore onto ThreadLocal
values downstream.
To restore the context from a Message
on the other Flux
or Mono
composition, this logic can be performed:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));