Kotest Kafka (EmbeddedKafkaListener)拡張

KotestのEmbeddedKafkaListenerは、Apache Kafkaを組み込み形式でテスト環境から使用できるようにする拡張機能である。

組み込みKafka拡張

Kotestは、組み込みKafkaインスタンスを実行する拡張機能を提供する。これは、Kafka Dockerイメージの使用が問題になる状況で便利に使える。

一般に、Apache Kafkaを使ってメッセージキューベースのシステムを開発またはテストする場合、実際のKafkaサーバーをインストールして設定する過程は手間がかかり時間もかかる。EmbeddedKafkaListenerは、テスト環境でApache Kafkaを組み込み形式で使用できるようにし、この問題を解決する。

依存関係の追加

組み込みKafka拡張機能を使用するには、テストコンパイルパスにio.kotest.extensions:kotest-extensions-embedded-kafkaモジュールを依存関係として追加する必要がある。最新バージョンはMaven Centralで確認できる。

testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")

Configuration

テストクラスにembeddedKafkaListenerリスナーを登録する。

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec({
    listener(embeddedKafkaListener)
})

または:

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec() {
    init {
        listener(embeddedKafkaListener)
    }
}

そして、ブローカーはSpecが生成されると開始され、Specが完了すると停止される。

EmbeddedKafkaListenerを使用すると、テスト実行中に組み込みKafkaサーバーが開始および停止される。これにより、テスト実行中に実際のKafkaサーバーへ依存せず、Kafkaベースのアプリケーションをテストできる。

NOTE: 基本の組み込みKafkaライブラリはstateにグローバルオブジェクトを使用する。複数のKafkaインスタンスを同時に開始してはならない。

Consumer / Producer

コンシューマとプロデューサを作成するために、リスナーの便利なメソッドを使用できる。

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaListenerTest : FunSpec({

    listener(embeddedKafkaListener)

    test("send / receive") {

        val producer = embeddedKafkaListener.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = embeddedKafkaListener.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

stringStringProducerstringStringConsumerメソッドは、キーと値に文字列を受け付けるプロデューサ/コンシューマを返す。バイトペアにも似たメソッドがある。

または、Kafkaインスタンスが配置されたホスト/ポートへアクセスし、直接クライアントを作成することもできる。

class EmbeddedKafkaListenerTest : FunSpec({

  listener(embeddedKafkaListener)
   
  val props = Properties().apply {
      put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
  }
   
  val producer = KafkaProducer<String, String>(props)
   
})

カスタムポートの指定

ポートを指定するリスナーの新しいインスタンスを作成し、デフォルトインスタンスの代わりにそのインスタンスを使用できる。

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.EmbeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaCustomPortTest : FunSpec({

    val listener = EmbeddedKafkaListener(5678)
    listener(listener)

    test("send / receive") {

        val producer = listener.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = listener.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

さらにZooKeeperポートも指定できる。

val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)

参照