Javaで簡単なApache Kafka Producer/Consumerクライアントを作成する

ここでは前に作成したKafkaサーバーを活用し、Javaで簡単なKafka ProducerとConsumerクライアントを作成してみる。

Kafka Producerプロジェクトの作成

IDEを使ってKafkaのProducerプロジェクトを作成する。

Kafka Producerのビルドスクリプト

ビルドスクリプトは次のように設定する。 build.gradle

plugins {
    id 'java'
}

group 'com.devkuma'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36'        // (2)

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

(1) Kafkaクライアントライブラリを使用するための依存関係設定。ここでは2.8.1バージョンのクライアントを設定している。
(2) Kafkaクライアントのログを確認するため、SLF4J実装ライブラリを追加している。SLF4JのAPIはすでにKafkaライブラリに含まれている。

Kafka Producerプロジェクトのコード

次に実際のProducerコードを書いてみる。 src/main/java/com/devkuma/kafka/SimpleKafkaProducer.java

package com.devkuma.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class SimpleKafkaProducer {

    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(properties);

        while (true) {
            Scanner sc = new Scanner(System.in);
            System.out.print("> ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            } finally {
                producer.flush();
            }

            if (message.equals("/quit")) {
                producer.close();
                break;
            }
        }
    }
}

Propertiesクラスを使ってオプション値を設定し、Kafka Producerインスタンスを初期化している。関連する設定キーはProducerConfigクラスで提供されるため、必要なオプション値はこれを通じて設定できる。

コードで簡単に設定した値は次のとおりである。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG:KafkaクラスターのIP一覧を設定する。
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG:レコードのメッセージキーをシリアライズするクラスを指定する。
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG:レコードのメッセージ値をシリアライズするクラスを指定する。

Kafkaのすべてのメッセージはシリアライズされた状態で送信されるため、使用するシリアライザーを指定する必要がある。ここでは文字列シリアライザークラスであるStringSerializerを使用した。その他にJSONやApache Avroなども使用できる。

当然だが、ProducerとConsumerのシリアライザーに互換性がなければ、メッセージのシリアライズまたはデシリアライズができない可能性があるので注意が必要である。

Kafka Consumerプロジェクトの作成

IDEを使ってKafkaのConsumerプロジェクトを作成する。

Kafka Consumerのビルドスクリプト

ビルドスクリプトはProducerと同じように次のように設定する。 build.gradle

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36'        // (2) 

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

(1) Kafkaクライアントライブラリを使用するための依存関係設定。ここでは2.8.1バージョンのクライアントを設定している。
(2) Kafkaクライアントのログを確認するため、SLF4J実装ライブラリを追加している。SLF4JのAPIはすでにKafkaライブラリに含まれている。

Kafka Consumerプロジェクトのコード

次に実際のConsumerコードを書いてみる。 src/main/java/com/devkuma/kafka/SimpleKafkaConsumer.java

package com.devkuma.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));

                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (!message.equals("/quit"));
        } finally {
            consumer.close();
        }
    }
}

ConsumerもProducerと同じように、Propertiesクラスを使ってオプション値を設定し、Kafka Consumerインスタンスを初期化すればよい。関連する設定キーも設定クラスから提供されるため、必要なオプション値はそれを通じて設定できる。

ProducerConfigのオプションについてはProducerの説明を参照する。

オプション説明

Producer、Consumerに関する主要なオプションは次のとおりである。

ProducerConfigクライアントオプション オプション値 説明
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG bootstrap.server KafkaクラスターのIP一覧を設定する。
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG key.serializer レコードのメッセージキーをシリアライズするクラスを指定する。
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG value.serializer レコードのメッセージ値をシリアライズするクラスを指定する。

まとめ

上記のサンプルコードはGitHubで確認できる。