Kotest Kafka (EmbeddedKafkaListener) Extension

Kotest’s EmbeddedKafkaListener is an extension that lets you use Apache Kafka in embedded form in a test environment.

Embedded Kafka Extension

Kotest provides an extension that runs an embedded Kafka instance. This can be useful in situations where using a Kafka Docker image is problematic.

Installing and configuring a real Kafka server just to develop or test a Kafka-based messaging system can be cumbersome and time-consuming. EmbeddedKafkaListener avoids this by running an embedded Kafka broker as part of the test run.

Adding the Dependency

To use the embedded Kafka extension, add the io.kotest.extensions:kotest-extensions-embedded-kafka module to your test dependencies. The latest version is listed on Maven Central.

testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")
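For context, the dependency line above might sit in a Gradle build script like the following sketch. It assumes a Gradle Kotlin DSL project that uses Kotest's JUnit 5 runner and core assertions; the version placeholders are intentionally left unfilled and should be replaced with the versions that match your project.

```kotlin
// build.gradle.kts (sketch; versions are placeholders, not recommendations)
dependencies {
    // Assumption: Kotest's JUnit 5 runner is the test engine
    testImplementation("io.kotest:kotest-runner-junit5:<kotest-version>")
    // Assumption: core assertions provide shouldBe and eventually
    testImplementation("io.kotest:kotest-assertions-core:<kotest-version>")
    // The embedded Kafka extension described on this page
    testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")
}

tasks.withType<Test>().configureEach {
    // Kotest specs are discovered through the JUnit Platform
    useJUnitPlatform()
}
```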

Configuration

Register the embeddedKafkaListener listener on the test class:

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec({
    listener(embeddedKafkaListener)
})

Or, equivalently, inside an init block:

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec() {
    init {
        listener(embeddedKafkaListener)
    }
}

The broker starts when the spec is created and stops when the spec is complete.

Because the broker's lifecycle is tied to the spec, tests can exercise Kafka-based code without depending on an external Kafka server.

NOTE: The underlying embedded Kafka library uses a global object for state. Do not start multiple Kafka instances at the same time.

Consumer / Producer

The listener provides convenience methods for creating consumers and producers:

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaListenerTest : FunSpec({

    listener(embeddedKafkaListener)

    test("send / receive") {

        val producer = embeddedKafkaListener.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = embeddedKafkaListener.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

The stringStringProducer and stringStringConsumer methods return a producer and a consumer whose keys and values are strings. Similar methods exist for byte keys and values.

Alternatively, you can create clients directly using the host and port on which the embedded Kafka instance is listening:

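import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class EmbeddedKafkaListenerTest : FunSpec({

    listener(embeddedKafkaListener)

    val props = Properties().apply {
        put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
        // KafkaProducer requires serializers for keys and values
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    }

    val producer = KafkaProducer<String, String>(props)
})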
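A consumer can be configured by hand in the same way. The following is a minimal sketch, not part of the Kotest extension: consumerProps is a hypothetical helper that collects the settings a string consumer typically needs, written with Kafka's standard configuration keys as plain strings so that it depends only on the JDK.

```kotlin
import java.util.Properties

// Hypothetical helper (not part of Kotest): builds consumer settings for a
// broker at host:port using Kafka's standard configuration keys.
fun consumerProps(host: String, port: Int, groupId: String): Properties =
    Properties().apply {
        put("bootstrap.servers", "$host:$port")
        put("group.id", groupId)
        // A consumer group with no committed offsets starts from the
        // earliest available record instead of only seeing new ones.
        put("auto.offset.reset", "earliest")
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    }
```

In a spec, this might be called as consumerProps(embeddedKafkaListener.host, embeddedKafkaListener.port, "test-group") and the result passed to KafkaConsumer<String, String>(...). The consumer then still has to subscribe to a topic before polling.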

Specifying a Custom Port

To run the broker on a specific port, create your own EmbeddedKafkaListener instance with that port and register it instead of the default embeddedKafkaListener instance.

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.EmbeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaCustomPortTest : FunSpec({

    // Named "kafka" so the variable does not shadow the spec's listener(...) function
    val kafka = EmbeddedKafkaListener(5678)
    listener(kafka)

    test("send / receive") {

        val producer = kafka.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = kafka.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

You can also specify the ZooKeeper port in addition to the Kafka port:

val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)
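Hard-coded ports can collide with other processes on a shared build machine. One possible workaround, sketched here as an assumption rather than as something the extension provides, is to ask the operating system for an unused port. The freePort helper below is hypothetical and uses only the JDK.

```kotlin
import java.net.ServerSocket

// Hypothetical helper (not part of Kotest): binds to port 0 so the OS picks
// a currently unused TCP port, then closes the socket and returns that port.
fun freePort(): Int = ServerSocket(0).use { it.localPort }
```

The result could then be passed to the constructor, for example EmbeddedKafkaListener(kafkaPort = freePort(), zookeeperPort = freePort()). Because the socket is closed before the broker starts, another process could in principle claim the port in between, and two calls are not guaranteed to return different ports, so this reduces collisions without ruling them out.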
