Class RetryTopicConfigurationSupport
java.lang.Object
org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport
- All Implemented Interfaces:
Aware
,SmartInitializingSingleton
,ApplicationContextAware
public class RetryTopicConfigurationSupport
extends Object
implements ApplicationContextAware, SmartInitializingSingleton
This is the main class providing the configuration behind the non-blocking,
topic-based delayed retries feature. It is typically imported by adding
@EnableKafkaRetryTopic
to an application
@Configuration
class. An alternative more advanced option
is to extend directly from this class and override methods as necessary, remembering
to add @Configuration
to the subclass and @Bean
to overridden @Bean
methods. For more details see the javadoc of
@EnableRetryTopic
.- Since:
- 2.9
- Author:
- Tomaz Fernandes, Gary Russell
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic class
Configure blocking retries to be used along non-blocking.static class
Configure customizers for components instantiated by the retry topics feature. -
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionvoid
protected void
Override this method to configure blocking retries parameters such as exceptions to be retried and theBackOff
to be used.protected void
configureCustomizers
(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer) Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer
,DeadLetterPublishingRecoverer
andDefaultErrorHandler
.protected Consumer<DeadLetterPublishingRecovererFactory>
Override this method to further configure theDeadLetterPublishingRecovererFactory
.protected Consumer<DestinationTopicResolver>
Override this method to configure theDestinationTopicResolver
.protected Consumer<ListenerContainerFactoryConfigurer>
Override this method to further configure theListenerContainerFactoryConfigurer
.protected Consumer<RetryTopicConfigurer>
Override this method if you need to configure theRetryTopicConfigurer
.protected RetryTopicComponentFactory
Override this method to provide a subclass ofRetryTopicComponentFactory
with different component implementations or subclasses.destinationTopicResolver
(ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) Return a globalDestinationTopicResolver
for resolving theDestinationTopic
to which a givenConsumerRecord
should be sent for retry.kafkaConsumerBackoffManager
(ApplicationContext applicationContext, ListenerContainerRegistry registry, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, RetryTopicSchedulerWrapper wrapper, TaskScheduler taskScheduler) Create theKafkaConsumerBackoffManager
instance that will be used to back off partitions.protected void
manageNonBlockingFatalExceptions
(List<Class<? extends Throwable>> nonBlockingRetriesExceptions) Override this method to manage non-blocking retries fatal exceptions.retryTopicConfigurer
(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DestinationTopicResolver destinationTopicResolver, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, BeanFactory beanFactory) Return a globalRetryTopicConfigurer
for configuring retry topics forKafkaListenerEndpoint
instances with a correspondingRetryTopicConfiguration
.void
setApplicationContext
(ApplicationContext applicationContext)
-
Constructor Details
-
RetryTopicConfigurationSupport
public RetryTopicConfigurationSupport()
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContext
in interfaceApplicationContextAware
- Throws:
BeansException
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()- Specified by:
afterSingletonsInstantiated
in interfaceSmartInitializingSingleton
-
retryTopicConfigurer
@Bean(name="org.springframework.kafka.retrytopic.internalRetryTopicConfigurer") public RetryTopicConfigurer retryTopicConfigurer(@Qualifier("org.springframework.kafka.config.internalKafkaConsumerBackOffManager") KafkaConsumerBackoffManager kafkaConsumerBackoffManager, @Qualifier("org.springframework.kafka.retrytopic.internalDestinationTopicResolver") DestinationTopicResolver destinationTopicResolver, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, BeanFactory beanFactory) Return a globalRetryTopicConfigurer
for configuring retry topics forKafkaListenerEndpoint
instances with a correspondingRetryTopicConfiguration
. To configure it, consider overriding theconfigureRetryTopicConfigurer()
.- Parameters:
kafkaConsumerBackoffManager
- the globalKafkaConsumerBackoffManager
.destinationTopicResolver
- the globalDestinationTopicResolver
.componentFactoryProvider
- the component factory provider.beanFactory
- theBeanFactory
.- Returns:
- the instance.
- See Also:
-
configureRetryTopicConfigurer
Override this method if you need to configure theRetryTopicConfigurer
.- Returns:
- a
RetryTopicConfigurer
consumer.
-
configureDeadLetterPublishingContainerFactory
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory()Override this method to further configure theDeadLetterPublishingRecovererFactory
.- Returns:
- a
DeadLetterPublishingRecovererFactory
consumer.
-
configureListenerContainerFactoryConfigurer
protected Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer()Override this method to further configure theListenerContainerFactoryConfigurer
.- Returns:
- a
ListenerContainerFactoryConfigurer
consumer.
-
configureBlockingRetries
protected void configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries) Override this method to configure blocking retries parameters such as exceptions to be retried and theBackOff
to be used.- Parameters:
blockingRetries
- aRetryTopicConfigurationSupport.BlockingRetriesConfigurer
.
-
manageNonBlockingFatalExceptions
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingRetriesExceptions) Override this method to manage non-blocking retries fatal exceptions. Records which processing throws an exception present in this list will be forwarded directly to the DLT, if one is configured, or stop being processed otherwise.- Parameters:
nonBlockingRetriesExceptions
- aList
of fatal exceptions containing the framework defaults.
-
configureCustomizers
protected void configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer) Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer
,DeadLetterPublishingRecoverer
andDefaultErrorHandler
.- Parameters:
customizersConfigurer
- aRetryTopicConfigurationSupport.CustomizersConfigurer
.
-
destinationTopicResolver
@Bean(name="org.springframework.kafka.retrytopic.internalDestinationTopicResolver") public DestinationTopicResolver destinationTopicResolver(ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) Return a globalDestinationTopicResolver
for resolving theDestinationTopic
to which a givenConsumerRecord
should be sent for retry. To configure it, consider overriding one of these other more fine-grained methods:manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>>)
to configure non-blocking retries.configureDestinationTopicResolver()
to further customize the component.createComponentFactory()
to provide a subclass instance.
- Parameters:
componentFactoryProvider
- the component factory provider.- Returns:
- the instance.
-
configureDestinationTopicResolver
Override this method to configure theDestinationTopicResolver
.- Returns:
- a
DestinationTopicResolver
consumer.
-
kafkaConsumerBackoffManager
@Bean(name="org.springframework.kafka.config.internalKafkaConsumerBackOffManager") public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext, @Qualifier("org.springframework.kafka.config.internalKafkaListenerEndpointRegistry") ListenerContainerRegistry registry, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable TaskScheduler taskScheduler) Create theKafkaConsumerBackoffManager
instance that will be used to back off partitions. To provide a custom implementation, either override this method, or override theRetryTopicComponentFactory.kafkaBackOffManagerFactory(org.springframework.kafka.listener.ListenerContainerRegistry, org.springframework.context.ApplicationContext)
method and return a differentKafkaBackOffManagerFactory
.- Parameters:
applicationContext
- the application context.registry
- theListenerContainerRegistry
to be used to fetch theMessageListenerContainer
at runtime to be backed off.componentFactoryProvider
- the component factory provider.wrapper
- aRetryTopicSchedulerWrapper
.taskScheduler
- aTaskScheduler
.- Returns:
- the instance.
-
createComponentFactory
Override this method to provide a subclass ofRetryTopicComponentFactory
with different component implementations or subclasses.- Returns:
- the instance.
-