Class KafkaMessageSource<K,V>

Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, MessageSource<Object>, Pausable, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle

public class KafkaMessageSource<K,V> extends AbstractMessageSource<Object> implements Pausable
Polled message source for Apache Kafka. Only one thread can poll for data (or acknowledge a message) at a time.

NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.

Starting with version 3.1.2, this source implements Pausable which allows you to pause and resume the Consumer. While the consumer is paused, you must continue to call AbstractMessageSource.receive() within max.poll.interval.ms, to prevent a rebalance.

Since:
5.4
Author:
Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra, Christian Tzolov
  • Field Details

    • REMAINING_RECORDS

      public static final String REMAINING_RECORDS
      The number of records remaining from the previous poll.
      Since:
      3.2
      See Also:
    • newAssignment

      public volatile boolean newAssignment
  • Constructor Details

    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      See Also:
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (default) max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      allowMultiFetch - true to allow max.poll.records > 1.
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      See Also:
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (default) max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      allowMultiFetch - true to allow max.poll.records > 1.
  • Method Details

    • getAssignedPartitions

      public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
      Return the currently assigned partitions.
      Returns:
      the partitions.
    • onInit

      protected void onInit()
      Overrides:
      onInit in class AbstractExpressionEvaluator
    • getConsumerProperties

      public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
      Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.
      Returns:
      the properties.
    • getGroupId

      protected String getGroupId()
    • getClientId

      protected String getClientId()
    • getPollTimeout

      protected long getPollTimeout()
    • getMessageConverter

      protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set the message converter to replace the default MessagingMessageConverter.
      Parameters:
      messageConverter - the converter.
    • getPayloadType

      protected Class<?> getPayloadType()
    • setPayloadType

      public void setPayloadType(Class<?> payloadType)
      Set the payload type. Only applies if a type-aware message converter is provided.
      Parameters:
      payloadType - the type to convert to.
    • getRebalanceListener

      protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
    • getComponentType

      public String getComponentType()
      Specified by:
      getComponentType in interface NamedComponent
    • isRawMessageHeader

      protected boolean isRawMessageHeader()
    • setRawMessageHeader

      public void setRawMessageHeader(boolean rawMessageHeader)
      Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA. enabling callers to have access to the record to process errors.
      Parameters:
      rawMessageHeader - true to include the header.
    • getCommitTimeout

      protected Duration getCommitTimeout()
    • setCloseTimeout

      public void setCloseTimeout(Duration closeTimeout)
      Set the close timeout - default 30 seconds.
      Parameters:
      closeTimeout - the close timeout.
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • pause

      public void pause()
      Description copied from interface: Pausable
      Pause the endpoint.
      Specified by:
      pause in interface Pausable
    • resume

      public void resume()
      Description copied from interface: Pausable
      Resume the endpoint if paused.
      Specified by:
      resume in interface Pausable
    • isPaused

      public boolean isPaused()
      Description copied from interface: Pausable
      Check if the endpoint is paused.
      Specified by:
      isPaused in interface Pausable
      Returns:
      true if paused.
    • doReceive

      protected Object doReceive()
      Description copied from class: AbstractMessageSource
      Subclasses must implement this method. Typically the returned value will be the payload of type T, but the returned value may also be a Message instance whose payload is of type T; also can be AbstractIntegrationMessageBuilder which is used for additional headers population.
      Specified by:
      doReceive in class AbstractMessageSource<Object>
      Returns:
      The value returned.
    • createConsumer

      protected void createConsumer()
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      Overrides:
      destroy in class AbstractMessageSource<Object>