Javaで簡単なApache Kafka Producer/Consumerクライアントを作成する
ここでは前に作成したKafkaサーバーを活用し、Javaで簡単なKafka ProducerとConsumerクライアントを作成してみる。
Kafka Producerプロジェクトの作成
IDEを使ってKafkaのProducerプロジェクトを作成する。
Kafka Producerのビルドスクリプト
ビルドスクリプトは次のように設定する。 build.gradle
plugins {
id 'java'
}
group 'com.devkuma'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}
test {
useJUnitPlatform()
}
(1) Kafkaクライアントライブラリを使用するための依存関係設定。ここでは2.8.1バージョンのクライアントを設定している。
(2) Kafkaクライアントのログを確認するため、SLF4J実装ライブラリを追加している。SLF4JのAPIはすでにKafkaライブラリに含まれている。
Kafka Producerプロジェクトのコード
次に実際のProducerコードを書いてみる。 src/main/java/com/devkuma/kafka/SimpleKafkaProducer.java
package com.devkuma.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SimpleKafkaProducer {
private final static String TOPIC_NAME = "devkuma-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
while (true) {
Scanner sc = new Scanner(System.in);
System.out.print("> ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});
} finally {
producer.flush();
}
if (message.equals("/quit")) {
producer.close();
break;
}
}
}
}
Propertiesクラスを使ってオプション値を設定し、Kafka Producerインスタンスを初期化している。関連する設定キーはProducerConfigクラスで提供されるため、必要なオプション値はこれを通じて設定できる。
コードで簡単に設定した値は次のとおりである。
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG:KafkaクラスターのIP一覧を設定する。ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG:レコードのメッセージキーをシリアライズするクラスを指定する。ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG:レコードのメッセージ値をシリアライズするクラスを指定する。
Kafkaのすべてのメッセージはシリアライズされた状態で送信されるため、使用するシリアライザーを指定する必要がある。ここでは文字列シリアライザークラスであるStringSerializerを使用した。その他にJSONやApache Avroなども使用できる。
当然だが、ProducerとConsumerのシリアライザーに互換性がなければ、メッセージのシリアライズまたはデシリアライズができない可能性があるので注意が必要である。
Kafka Consumerプロジェクトの作成
IDEを使ってKafkaのConsumerプロジェクトを作成する。
Kafka Consumerのビルドスクリプト
ビルドスクリプトはProducerと同じように次のように設定する。 build.gradle
plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}
test {
useJUnitPlatform()
}
(1) Kafkaクライアントライブラリを使用するための依存関係設定。ここでは2.8.1バージョンのクライアントを設定している。
(2) Kafkaクライアントのログを確認するため、SLF4J実装ライブラリを追加している。SLF4JのAPIはすでにKafkaライブラリに含まれている。
Kafka Consumerプロジェクトのコード
次に実際のConsumerコードを書いてみる。 src/main/java/com/devkuma/kafka/SimpleKafkaConsumer.java
package com.devkuma.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleKafkaConsumer {
private final static String TOPIC_NAME = "devkuma-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (!message.equals("/quit"));
} finally {
consumer.close();
}
}
}
ConsumerもProducerと同じように、Propertiesクラスを使ってオプション値を設定し、Kafka Consumerインスタンスを初期化すればよい。関連する設定キーも設定クラスから提供されるため、必要なオプション値はそれを通じて設定できる。
ProducerConfigのオプションについてはProducerの説明を参照する。
オプション説明
Producer、Consumerに関する主要なオプションは次のとおりである。
| ProducerConfigクライアントオプション | オプション値 | 説明 |
|---|---|---|
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG |
bootstrap.server | KafkaクラスターのIP一覧を設定する。 |
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG |
key.serializer | レコードのメッセージキーをシリアライズするクラスを指定する。 |
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG |
value.serializer | レコードのメッセージ値をシリアライズするクラスを指定する。 |
まとめ
上記のサンプルコードはGitHubで確認できる。