Class EmbeddedKafkaBroker

java.lang.Object
org.springframework.kafka.test.EmbeddedKafkaBroker
All Implemented Interfaces:
DisposableBean, InitializingBean

public class EmbeddedKafkaBroker extends Object implements InitializingBean, DisposableBean
Manages one or more embedded Kafka brokers and an embedded Zookeeper instance. This class is intended for use in unit tests.
Since:
2.2
Author:
Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz
  • Constructor Details

    • EmbeddedKafkaBroker

      public EmbeddedKafkaBroker(int count)
    • EmbeddedKafkaBroker

      public EmbeddedKafkaBroker(int count, boolean controlledShutdown, String... topics)
      Create embedded Kafka brokers.
      Parameters:
      count - the number of brokers.
      controlledShutdown - passed into TestUtils.createBrokerConfig.
      topics - the topics to create (2 partitions per topic).
    • EmbeddedKafkaBroker

      public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String... topics)
      Create embedded Kafka brokers listening on random ports.
      Parameters:
      count - the number of brokers.
      controlledShutdown - passed into TestUtils.createBrokerConfig.
      partitions - partitions per topic.
      topics - the topics to create.
  • Method Details

    • brokerProperties

      public EmbeddedKafkaBroker brokerProperties(Map<String,String> properties)
      Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor etc.
      Parameters:
      properties - the properties to use for configuring Kafka Broker(s).
      Returns:
      this for chaining configuration.
      See Also:
      • KafkaConfig
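As a rough illustration of the argument shape, the sketch below builds a `Map<String,String>` containing the two property names mentioned above. The class name `BrokerPropertiesSketch` and the chosen values are assumptions made for this example only; it uses just the JDK and does not start a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BrokerPropertiesSketch {

    // Builds a String-to-String map of the kind brokerProperties(...) accepts.
    static Map<String, String> singleBrokerProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Property names come from the description above; the values are illustrative.
        props.put("auto.create.topics.enable", "false");
        props.put("transaction.state.log.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        singleBrokerProperties().forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

In a test, a map like this would presumably be passed as `broker.brokerProperties(props)` before the broker starts, with further calls chained on the returned instance.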
    • brokerProperty

      public EmbeddedKafkaBroker brokerProperty(String property, Object value)
      Specify a broker property.
      Parameters:
      property - the property name.
      value - the value.
      Returns:
      the EmbeddedKafkaBroker.
    • kafkaPorts

      public EmbeddedKafkaBroker kafkaPorts(int... ports)
      Set explicit ports on which the Kafka brokers will listen. Useful when the embedded broker must be reachable from other processes.
      Parameters:
      ports - the ports.
      Returns:
      the EmbeddedKafkaBroker.
    • brokerListProperty

      public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty)
      Set the system property with this name to the list of broker addresses. Since 3.0.10, the default name is spring.kafka.bootstrap-servers, for Spring Boot compatibility.
      Parameters:
      brokerListProperty - the brokerListProperty to set
      Returns:
      this broker.
      Since:
      2.3
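The sketch below shows how test code might read such a system property back. It assumes the value is a comma-separated list of `host:port` entries; that format is not stated above and is an assumption, as are the class name `BrokerListPropertySketch` and the made-up addresses. The `main` method sets the property itself to simulate what the embedded broker would do.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class BrokerListPropertySketch {

    // Reads the named system property and splits it into individual addresses.
    // Assumption: the value is a comma-separated "host:port" list.
    static List<String> brokerAddresses(String propertyName) {
        String value = System.getProperty(propertyName, "");
        if (value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        // Simulates the property being set by the embedded broker; the addresses are made up.
        System.setProperty("spring.kafka.bootstrap-servers", "127.0.0.1:50001,127.0.0.1:50002");
        System.out.println(brokerAddresses("spring.kafka.bootstrap-servers"));
    }
}
```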
    • zkPort

      public EmbeddedKafkaBroker zkPort(int port)
      Set an explicit port for the embedded Zookeeper.
      Parameters:
      port - the port.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.3
    • getZkPort

      public int getZkPort()
      Get the port that the embedded Zookeeper is running on or will run on.
      Returns:
      the port.
      Since:
      2.3
    • setZkPort

      public void setZkPort(int zkPort)
      Set the port to run the embedded Zookeeper on (default random).
      Parameters:
      zkPort - the port.
      Since:
      2.3
    • adminTimeout

      public EmbeddedKafkaBroker adminTimeout(int adminTimeout)
      Set the timeout in seconds for admin operations (e.g. topic creation, close).
      Parameters:
      adminTimeout - the timeout.
      Returns:
      the EmbeddedKafkaBroker
      Since:
      2.8.5
    • setAdminTimeout

      public void setAdminTimeout(int adminTimeout)
      Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
      Parameters:
      adminTimeout - the timeout.
      Since:
      2.2
    • zkConnectionTimeout

      public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout)
      Set connection timeout for the client to the embedded Zookeeper.
      Parameters:
      zkConnectionTimeout - the connection timeout.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.4
    • zkSessionTimeout

      public EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout)
      Set session timeout for the client to the embedded Zookeeper.
      Parameters:
      zkSessionTimeout - the session timeout.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.4
    • afterPropertiesSet

      public void afterPropertiesSet()
      Specified by:
      afterPropertiesSet in interface InitializingBean
    • addTopics

      public void addTopics(String... topicsToAdd)
      Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
      Parameters:
      topicsToAdd - the topics.
    • addTopics

      public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
      Add topics to the existing broker(s). The broker(s) must be running.
      Parameters:
      topicsToAdd - the topics.
      Since:
      2.2
    • addTopicsWithResults

      public Map<String,Exception> addTopicsWithResults(String... topicsToAdd)
      Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
      Parameters:
      topicsToAdd - the topics.
      Returns:
      the results; null values indicate success.
      Since:
      2.5.4
    • addTopicsWithResults

      public Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
      Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
      Parameters:
      topicsToAdd - the topics.
      Returns:
      the results; null values indicate success.
      Since:
      2.5.4
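Because null values in the returned map indicate success, a caller has to filter for non-null entries to find failures. The sketch below shows one way to do that on a plain `Map<String,Exception>`. The helper `failures`, the class name, and the sample topics are hypothetical; the map is built by hand here rather than obtained from a running broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class TopicResultsSketch {

    // Returns only the entries whose value is non-null, i.e. the topics that failed.
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        results.forEach((topic, error) -> {
            if (error != null) {
                failed.put(topic, error);
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        // Stand-in for a map returned by addTopicsWithResults; HashMap permits null values.
        Map<String, Exception> results = new HashMap<>();
        results.put("orders", null);
        results.put("payments", new IllegalStateException("hypothetical failure"));
        failures(results).forEach((topic, error) ->
                System.out.println(topic + " failed: " + error.getMessage()));
    }
}
```

Note that `Map.of(...)` rejects null values, so a hand-built stand-in for these results needs a `HashMap` or similar.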
    • doWithAdmin

      public void doWithAdmin(Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
      Create an AdminClient; invoke the callback and reliably close the admin.
      Parameters:
      callback - the callback.
    • doWithAdminFunction

      public <T> T doWithAdminFunction(Function<org.apache.kafka.clients.admin.AdminClient,T> callback)
      Create an AdminClient; invoke the callback and reliably close the admin.
      Type Parameters:
      T - the function return type.
      Parameters:
      callback - the callback.
      Returns:
      the result of the callback.
      Since:
      2.5.4
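The "invoke the callback and reliably close" contract can be pictured with try-with-resources. The sketch below is not the library's implementation: `FakeAdmin` is a hypothetical stand-in for the real `AdminClient`, and `withAdmin` takes the admin as an argument (instead of creating it) so that the close can be observed.

```java
import java.util.function.Function;

final class WithAdminSketch {

    // Hypothetical stand-in for an admin client; not the Kafka AdminClient.
    static final class FakeAdmin implements AutoCloseable {
        boolean closed;

        int topicCount() {
            return 3;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }

    // Applies the callback to the given admin and closes it afterwards,
    // even if the callback throws.
    static <T> T withAdmin(FakeAdmin admin, Function<FakeAdmin, T> callback) {
        try (FakeAdmin a = admin) {
            return callback.apply(a);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        Integer count = withAdmin(admin, FakeAdmin::topicCount);
        System.out.println("topics=" + count + ", closed=" + admin.closed);
    }
}
```

The generic return type mirrors the `<T> T` signature above: whatever the callback returns is handed back to the caller once the admin has been closed.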
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
    • getTopics

      public Set<String> getTopics()
    • getKafkaServers

      public List<kafka.server.KafkaServer> getKafkaServers()
    • getKafkaServer

      public kafka.server.KafkaServer getKafkaServer(int id)
    • getZookeeper

    • getZooKeeperClient

      public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
      Return the ZooKeeperClient.
      Returns:
      the client.
      Since:
      2.3.2
    • getZookeeperConnectionString

      public String getZookeeperConnectionString()
    • getBrokerAddress

      public BrokerAddress getBrokerAddress(int i)
    • getBrokerAddresses

      public BrokerAddress[] getBrokerAddresses()
    • getPartitionsPerTopic

      public int getPartitionsPerTopic()
    • bounce

      public void bounce(BrokerAddress brokerAddress)
    • restart

      public void restart(int index) throws Exception
      Throws:
      Exception
    • getBrokersAsString

      public String getBrokersAsString()
    • consumeFromAllEmbeddedTopics

      public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
      Subscribe a consumer to all the embedded topics.
      Parameters:
      consumer - the consumer.
    • consumeFromAllEmbeddedTopics

      public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd)
      Subscribe a consumer to all the embedded topics.
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      Since:
      2.8.2
    • consumeFromAnEmbeddedTopic

      public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, String topic)
      Subscribe a consumer to one of the embedded topics.
      Parameters:
      consumer - the consumer.
      topic - the topic.
    • consumeFromAnEmbeddedTopic

      public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, String topic)
      Subscribe a consumer to one of the embedded topics.
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      topic - the topic.
      Since:
      2.8.2
    • consumeFromEmbeddedTopics

      public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, String... topicsToConsume)
      Subscribe a consumer to one or more of the embedded topics.
      Parameters:
      consumer - the consumer.
      topicsToConsume - the topics.
      Throws:
      IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
    • consumeFromEmbeddedTopics

      public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, String... topicsToConsume)
      Subscribe a consumer to one or more of the embedded topics.
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      topicsToConsume - the topics.
      Throws:
      IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
      Since:
      2.8.2
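The `IllegalStateException` documented for `consumeFromEmbeddedTopics` amounts to a membership check against the embedded topic set. The sketch below illustrates that kind of check with plain collections. It is an assumption about the behaviour's shape, not the library's code; `assertEmbedded`, the class name, and the exception message are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class EmbeddedTopicCheckSketch {

    // Throws IllegalStateException if any requested topic is not in the embedded set.
    static void assertEmbedded(Set<String> embeddedTopics, String... topicsToConsume) {
        for (String topic : topicsToConsume) {
            if (!embeddedTopics.contains(topic)) {
                throw new IllegalStateException("topic '" + topic + "' is not an embedded topic");
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the set returned by getTopics(); the topic names are made up.
        Set<String> embedded = new LinkedHashSet<>(Arrays.asList("orders", "payments"));
        assertEmbedded(embedded, "orders");
        try {
            assertEmbedded(embedded, "refunds");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```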