Scatter-Gather
Starting with version 4.1, Spring Integration provides an implementation of the scatter-gather enterprise integration pattern. It is a compound endpoint for which the goal is to send a message to the recipients and aggregate the results. As noted in Enterprise Integration Patterns, it is a component for scenarios such as “best quote”, where we need to request information from several suppliers and decide which one provides us with the best term for the requested item.
Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.
The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or a RecipientListRouter) and an AggregatingMessageHandler.
The request message is sent to the scatter channel, and the ScatterGatherHandler waits for the reply that the aggregator sends to the outputChannel.
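The request-reply shape described above can be illustrated without the framework. The following is a minimal plain-JDK sketch of the “best quote” scenario, not Spring Integration API: the BestQuoteSketch class, the bestQuote method, and the modelling of suppliers as functions are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, framework-free illustration of scatter-gather.
class BestQuoteSketch {

    // Scatter: ask every supplier concurrently.
    // Gather: wait for all replies and keep the lowest quote.
    static double bestQuote(String item, List<Function<String, Double>> suppliers) {
        List<CompletableFuture<Double>> replies = suppliers.stream()
                .map(supplier -> CompletableFuture.supplyAsync(() -> supplier.apply(item)))
                .collect(Collectors.toList());
        return replies.stream()
                .map(CompletableFuture::join)
                .min(Double::compare)
                .orElseThrow(() -> new IllegalStateException("no suppliers"));
    }
}
```

In the real endpoint, the scatter step is a channel or router and the gather step is an AggregatingMessageHandler, so the "keep the lowest quote" decision would live in the aggregator's output processor.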
Functionality
The Scatter-Gather pattern suggests two scenarios: “auction” and “distribution”.
In both cases, the aggregation function is the same and provides all the options available for the AggregatingMessageHandler.
(Actually, the ScatterGatherHandler requires only an AggregatingMessageHandler as a constructor argument.)
See Aggregator for more information.
Auction
The auction Scatter-Gather variant uses “publish-subscribe” logic for the request message, where the “scatter” channel is a PublishSubscribeChannel with apply-sequence="true".
However, this channel can be any MessageChannel implementation (as is the case with the request-channel in the ContentEnricher; see Content Enricher).
If you use a different channel type, you should create your own custom correlationStrategy for the aggregation function.
Distribution
The distribution Scatter-Gather variant is based on the RecipientListRouter (see RecipientListRouter) with all available options for the RecipientListRouter.
This is the second ScatterGatherHandler constructor argument.
If you want to rely on only the default correlationStrategy for the recipient-list-router and the aggregator, you should specify apply-sequence="true".
Otherwise, you should supply a custom correlationStrategy for the aggregator.
Unlike the PublishSubscribeChannel variant (the auction variant), the recipient-list-router selector option lets you filter target suppliers based on the message.
With apply-sequence="true", the default sequenceSize is supplied, and the aggregator can release the group correctly.
The distribution option is mutually exclusive with the auction option.
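To make the role of the sequence details concrete, here is a minimal plain-JDK sketch of a gatherer that correlates replies by an identifier and releases a group once it holds sequenceSize replies. It is not the AggregatingMessageHandler implementation: the SequenceGatherSketch and Reply types are hypothetical stand-ins for the aggregator and for a message carrying correlation and sequence headers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, framework-free model of sequence-based group release.
class SequenceGatherSketch {

    // Stand-in for a reply message with its correlation and sequence details.
    static final class Reply {
        final String correlationId;
        final int sequenceSize;
        final Object payload;

        Reply(String correlationId, int sequenceSize, Object payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    private final Map<String, List<Reply>> groups = new HashMap<>();

    // Adds the reply to its group; returns the group's payloads once the
    // group is complete, or an empty Optional while replies are missing.
    Optional<List<Object>> onReply(Reply reply) {
        List<Reply> group = groups.computeIfAbsent(reply.correlationId, id -> new ArrayList<>());
        group.add(reply);
        if (group.size() < reply.sequenceSize) {
            return Optional.empty();
        }
        groups.remove(reply.correlationId);
        List<Object> payloads = new ArrayList<>();
        for (Reply member : group) {
            payloads.add(member.payload);
        }
        return Optional.of(payloads);
    }
}
```

Without the sequence size, this gatherer would have no way to know when a group is complete, which is why a custom correlation and release strategy is needed when the sequence details are not applied.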
Setting applySequence=true is required only for plain Java configuration based on the ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) constructor, since the framework cannot mutate externally provided components.
For convenience, the XML and Java DSL configurations for Scatter-Gather set applySequence to true starting with version 6.0.
For both the auction and the distribution variants, the request (scatter) message is enriched with the gatherResultChannel header to wait for a reply message from the aggregator.
By default, all suppliers should send their result to the replyChannel header (usually by omitting the output-channel from the ultimate endpoint).
However, the gatherChannel option is also provided, letting suppliers send their reply to that channel for the aggregation.
Configuring a Scatter-Gather Endpoint
The following example shows Java configuration for the bean definition for Scatter-Gather:
@Bean
public MessageHandler distributor() {
    // Scatterer: routes each request to every distribution channel.
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
    // Gatherer: releases the group after two replies.
    return new AggregatingMessageHandler(
            new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
            new SimpleMessageStore(),
            new HeaderAttributeCorrelationStrategy(
                    IntegrationMessageHeaderAccessor.CORRELATION_ID),
            new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
    ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
    handler.setOutputChannel(output());
    return handler;
}
In the preceding example, we configure the RecipientListRouter distributor bean with applySequence="true" and the list of recipient channels.
The next bean is for an AggregatingMessageHandler.
Finally, we inject both those beans into the ScatterGatherHandler bean definition and mark it as a @ServiceActivator to wire the scatter-gather component into the integration flow.
The following example shows how to configure the <scatter-gather> endpoint by using the XML namespace:
<scatter-gather
        id=""  (1)
        auto-startup=""  (2)
        input-channel=""  (3)
        output-channel=""  (4)
        scatter-channel=""  (5)
        gather-channel=""  (6)
        order=""  (7)
        phase=""  (8)
        send-timeout=""  (9)
        gather-timeout=""  (10)
        requires-reply="" >  (11)
    <scatterer/>  (12)
    <gatherer/>  (13)
</scatter-gather>
(1) The id of the endpoint.
The ScatterGatherHandler bean is registered with an alias of id + '.handler'.
The RecipientListRouter bean is registered with an alias of id + '.scatterer'.
The AggregatingMessageHandler bean is registered with an alias of id + '.gatherer'.
Optional.
(The BeanFactory generates a default id value.)
(2) Lifecycle attribute signaling whether the endpoint should be started during application context initialization.
In addition, the ScatterGatherHandler also implements Lifecycle and starts and stops gatherEndpoint, which is created internally if a gather-channel is provided.
Optional.
(The default is true.)
(3) The channel on which to receive request messages to handle them in the ScatterGatherHandler.
Required.
(4) The channel to which the ScatterGatherHandler sends the aggregation results.
Optional.
(Incoming messages can specify a reply channel themselves in the replyChannel message header.)
(5) The channel to which to send the scatter message for the auction scenario.
Optional.
Mutually exclusive with the <scatterer> sub-element.
(6) The channel on which to receive replies from each supplier for the aggregation.
It is used as the replyChannel header in the scatter message.
Optional.
By default, the FixedSubscriberChannel is created.
(7) The order of this component when more than one handler is subscribed to the same DirectChannel (used for load-balancing purposes).
Optional.
(8) Specifies the phase in which the endpoint should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest.
By default, this value is Integer.MAX_VALUE, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
(9) The timeout interval to wait when sending a reply Message to the output-channel.
By default, the send() blocks for one second.
It applies only if the output channel has some 'sending' limitations (for example, a QueueChannel with a fixed 'capacity' that is full).
In this case, a MessageDeliveryException is thrown.
The send-timeout is ignored for AbstractSubscribableChannel implementations.
For group-timeout(-expression), the MessageDeliveryException from the scheduled expire task leads this task to be rescheduled.
Optional.
(10) Lets you specify how long the scatter-gather waits for the reply message before returning.
'null' is returned if the reply times out.
Optional.
It defaults to -1, meaning to wait indefinitely.
(11) Specifies whether the scatter-gather must return a non-null value.
This value is true by default.
Consequently, a ReplyRequiredException is thrown when the underlying aggregator returns a null value after gather-timeout.
Note that, if null is a possibility, the gather-timeout should be specified to avoid an indefinite wait.
(12) The <recipient-list-router> options.
Optional.
Mutually exclusive with the scatter-channel attribute.
(13) The <aggregator> options.
Required.
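The interaction between gather-timeout and requires-reply described above can be sketched in plain Java. This is an illustrative model only, not the handler's actual code: the GatherTimeoutSketch class and awaitReply method are hypothetical, a BlockingQueue stands in for the channel on which the aggregated reply arrives, and IllegalStateException stands in for ReplyRequiredException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, framework-free model of the gather wait.
class GatherTimeoutSketch {

    // Waits for the aggregated reply. A negative timeout waits indefinitely;
    // otherwise null is produced when the timeout elapses. If a reply is
    // required, a missing reply is reported as an exception instead.
    static Object awaitReply(BlockingQueue<Object> gatherResults,
            long gatherTimeoutMillis, boolean requiresReply) {
        Object reply;
        try {
            reply = gatherTimeoutMillis < 0
                    ? gatherResults.take()
                    : gatherResults.poll(gatherTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the reply", ex);
        }
        if (reply == null && requiresReply) {
            // Stand-in for the ReplyRequiredException mentioned in the text.
            throw new IllegalStateException("No reply within " + gatherTimeoutMillis + " ms");
        }
        return reply;
    }
}
```

The sketch shows why a finite timeout matters when a group may never complete: with a negative timeout, the take() call never returns if no reply arrives.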
Error Handling
Since Scatter-Gather is a multi request-reply component, error handling has some extra complexity.
In some cases, it is better to just catch and ignore downstream exceptions if the ReleaseStrategy allows the process to finish with fewer replies than requests.
In other cases, the sub-flow should return something like a “compensation message” when an error happens.
Every async sub-flow should be configured with an errorChannel header so that the MessagePublishingErrorHandler can send the error message to the proper channel.
Otherwise, an error will be sent to the global errorChannel with the common error handling logic.
See Error Handling for more information about async error processing.
Synchronous flows may use an ExpressionEvaluatingRequestHandlerAdvice for ignoring the exception or returning a compensation message.
When an exception is thrown from one of the sub-flows to the ScatterGatherHandler, it is just re-thrown upstream.
In that case, all other sub-flows do their work for nothing, and their replies are ignored in the ScatterGatherHandler.
This might sometimes be the expected behavior, but in most cases it would be better to handle the error in the particular sub-flow without impacting all others and the expectations in the gatherer.
Starting with version 5.1.3, the ScatterGatherHandler is supplied with the errorChannelName option.
It is populated to the errorChannel header of the scatter message and is used when an async error happens, or it can be used in a regular synchronous sub-flow for directly sending an error message.
The sample configuration below demonstrates async error handling by returning a compensation message:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}
To produce a proper reply, we have to copy headers (including replyChannel and errorChannel) from the failedMessage of the MessagingException that has been sent to the scatterGatherErrorChannel by the MessagePublishingErrorHandler.
This way, the target exception is returned to the gatherer of the ScatterGatherHandler so that the group of reply messages can complete.
Such an exception payload can be filtered out in the MessageGroupProcessor of the gatherer or processed in another way downstream, after the scatter-gather endpoint.
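As an illustration of filtering out such exception payloads, the following plain-JDK sketch keeps only the successful payloads of a completed group. It is a simplified model under stated assumptions: the CompensationFilterSketch class and successfulPayloads method are hypothetical, and a list of payload objects stands in for the message group that a real MessageGroupProcessor would receive.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, framework-free model of dropping compensation replies.
class CompensationFilterSketch {

    // Keeps the payloads of successful replies and drops the Throwable
    // payloads that were returned as compensation messages.
    static List<Object> successfulPayloads(List<Object> groupPayloads) {
        return groupPayloads.stream()
                .filter(payload -> !(payload instanceof Throwable))
                .collect(Collectors.toList());
    }
}
```

With the sample flow above, a group holding "Sub-flow#1" and the RuntimeException from the second sub-flow would be reduced to the single successful payload.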
Before sending scattering results to the gatherer, the ScatterGatherHandler reinstates the request message headers, including the reply and error channels, if any.
This way, errors from the AggregatingMessageHandler are propagated to the caller, even if an async hand-off is applied in scatter recipient sub-flows.
For successful operation, the gatherResultChannel, originalReplyChannel, and originalErrorChannel headers must be transferred back to replies from scatter recipient sub-flows.
In this case, a reasonable, finite gatherTimeout must be configured for the ScatterGatherHandler.
Otherwise, by default, it blocks forever waiting for a reply from the gatherer.