Kotest Kafka (EmbeddedKafkaListener) Extension
Embedded Kafka Extension
Kotest provides an extension that runs an embedded Kafka instance. This can be useful in situations where using a Kafka Docker image is problematic.
In general, when developing or testing message-queue-based systems with Apache Kafka, installing and configuring a real Kafka server can be cumbersome and time-consuming. EmbeddedKafkaListener solves this problem by letting you use Apache Kafka in embedded form in the test environment.
Adding the Dependency
To use the embedded Kafka extension, add the io.kotest.extensions:kotest-extensions-embedded-kafka module to the test compile path. You can check the latest version on Maven Central.
testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")
Configuration
Register the embeddedKafkaListener listener on the test class:
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
})
Or:
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
class EmbeddedKafkaListenerTest : FunSpec() {
init {
listener(embeddedKafkaListener)
}
}
The broker starts when the spec is created and stops when the spec is complete.
With EmbeddedKafkaListener, an embedded Kafka server is started and stopped while tests run. This lets you test Kafka-based applications during test execution without depending on a real Kafka server.
NOTE: The underlying embedded Kafka library uses a global object for state. Do not start multiple Kafka instances at the same time.
Consumer / Producer
You can use convenient methods from the listener to create consumers and producers:
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
test("send / receive") {
val producer = embeddedKafkaListener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()
val consumer = embeddedKafkaListener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
}
consumer.close()
}
})
The stringStringProducer and stringStringConsumer methods return producers and consumers that accept strings for keys and values. There are similar methods for byte pairs.
Alternatively, you can create clients directly by accessing the host and port where the Kafka instance is deployed:
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
val props = Properties().apply {
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
}
val producer = KafkaProducer<String, String>(props)
})
Specifying a Custom Port
You can create a new instance of the listener that specifies a port and use that instance instead of the default instance.
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.EmbeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds
class EmbeddedKafkaCustomPortTest : FunSpec({
val listener = EmbeddedKafkaListener(5678)
listener(listener)
test("send / receive") {
val producer = listener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()
val consumer = listener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
}
consumer.close()
}
})
You can also specify an additional ZooKeeper port.
val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)