Building Simple Apache Kafka Producer/Consumer Clients in Java
This article uses the Kafka server created earlier and builds simple Kafka Producer and Consumer clients in Java.
Creating a Kafka Producer Project
Create a Producer project for Kafka in your IDE.
Kafka Producer Build Script
Configure the build script as follows. build.gradle
plugins {
    id 'java'
}

group 'com.devkuma'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}
(1) Dependency setting for using the Kafka client library. This example uses client version 2.8.1.
(2) Adds an SLF4J implementation library to check Kafka client logs. The SLF4J API is already included in the Kafka library.
Kafka Producer Project Code
Now write the actual Producer code. src/main/java/com/devkuma/kafka/SimpleKafkaProducer.java
package com.devkuma.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class SimpleKafkaProducer {

    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(properties);
        // Create the Scanner once; a new Scanner per iteration can drop buffered input.
        Scanner sc = new Scanner(System.in);

        while (true) {
            System.out.print("> ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                // send() is asynchronous; the callback reports a failed delivery.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            } finally {
                // Push the buffered record to the broker right away.
                producer.flush();
            }

            // "/quit" is sent to the topic as well, so the Consumer can stop on it.
            if (message.equals("/quit")) {
                producer.close();
                break;
            }
        }
    }
}
The code initializes a Kafka Producer instance by setting option values with the Properties class. The related configuration keys are provided by the ProducerConfig class, so required options can be set through it.
The values configured in the code are as follows.
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Sets the host:port list of Kafka brokers used for the initial connection to the cluster.
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: Specifies the class that serializes record message keys.
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: Specifies the class that serializes record message values.
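The ProducerConfig constants are plain String keys, so the same settings can also be kept outside the code. The following is a minimal sketch, assuming the configuration is held as properties-format text. The class name ProducerSettingsSketch and its load method are hypothetical and not part of the example project, and only the JDK is used so that it runs without the Kafka library.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the article's project.
final class ProducerSettingsSketch {

    // The three settings from SimpleKafkaProducer, written with the literal
    // keys that the ProducerConfig constants resolve to.
    static final String SETTINGS = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    // Parses properties-format text into a Properties object.
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load(SETTINGS);
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A Properties object built this way could be passed to new KafkaProducer<>(properties) in the same way as the one assembled in the Producer code.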
All Kafka messages are transmitted in serialized form, so you must specify the serializer to use. This example uses the string serializer class StringSerializer. Other formats such as JSON and Apache Avro can also be used.
It is obvious but important: if the Producer's serializer and the Consumer's deserializer are not compatible, the Consumer may fail to deserialize messages or may read corrupted values.
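To make the compatibility point concrete, here is a small sketch of what string serialization amounts to. It assumes the default UTF-8 encoding of the string serializer and uses only JDK classes. StringSerdeSketch and its methods are hypothetical stand-ins, not Kafka APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string serializer/deserializer pair.
final class StringSerdeSketch {

    // Producer side: String -> UTF-8 bytes.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Matching consumer side: UTF-8 bytes -> String.
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Deliberately mismatched consumer side: reads the bytes as ISO-8859-1.
    static String deserializeLatin1(byte[] data) {
        return new String(data, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9"; // contains one non-ASCII character
        byte[] wire = serialize(original);
        System.out.println(deserialize(wire).equals(original));       // true
        System.out.println(deserializeLatin1(wire).equals(original)); // false
    }
}
```

The mismatched pair does not throw an exception here; it silently produces a different string, which is why an incompatible pairing can be hard to notice.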
Creating a Kafka Consumer Project
Create a Consumer project for Kafka in your IDE.
Kafka Consumer Build Script
Configure the build script the same way as the Producer. build.gradle
plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}
(1) Dependency setting for using the Kafka client library. This example uses client version 2.8.1.
(2) Adds an SLF4J implementation library to check Kafka client logs. The SLF4J API is already included in the Kafka library.
Kafka Consumer Project Code
Now write the actual Consumer code. src/main/java/com/devkuma/kafka/SimpleKafkaConsumer.java
package com.devkuma.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The topic name doubles as the consumer group ID in this example.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                // Wait up to 100 seconds for records; the batch may be empty.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
                // Null-safe comparison: message stays null until the first record arrives.
            } while (!"/quit".equals(message));
        } finally {
            consumer.close();
        }
    }
}
As with the Producer, the Consumer initializes a Kafka Consumer instance by setting option values with the Properties class. The related configuration keys are provided by the ConsumerConfig class, so required options can be set through it.
For Producer options, refer to the Producer section.
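One detail of the poll loop in SimpleKafkaConsumer is worth isolating: poll can return an empty batch when no message arrives within the timeout, so the exit check has to cope with the case where no message has been seen yet. The following is a minimal sketch that uses plain lists as stand-ins for the batches returned by poll. PollLoopSketch and drain are hypothetical names, and no Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the consumer loop; batches replace poll() results.
final class PollLoopSketch {

    static final String QUIT = "/quit";

    // Processes batches until the last message of a batch is the quit command
    // or the source runs out. Returns the messages seen, in order.
    static List<String> drain(Iterator<List<String>> batches) {
        List<String> seen = new ArrayList<>();
        String last = null;
        // QUIT.equals(last) is safe while last is still null.
        while (!QUIT.equals(last) && batches.hasNext()) {
            for (String message : batches.next()) { // the batch may be empty
                last = message;
                seen.add(message);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(
                List.<String>of(),           // timeout with no records
                List.of("hello", "world"),
                List.of("/quit"),
                List.of("never read"));
        drain(batches.iterator()).forEach(System.out::println);
    }
}
```

Putting the constant on the left of equals is the design choice that matters: the comparison simply returns false for null instead of throwing a NullPointerException.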
Option Descriptions
The main options for Producers and Consumers are as follows.
| ProducerConfig constant | Property key | Description |
|---|---|---|
| ProducerConfig.BOOTSTRAP_SERVERS_CONFIG | bootstrap.servers | Sets the host:port list of Kafka brokers used for the initial connection to the cluster. |
| ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG | key.serializer | Specifies the class that serializes record message keys. |
| ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG | value.serializer | Specifies the class that serializes record message values. |
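The table lists the Producer side only. For reference, here is a sketch of the settings used in SimpleKafkaConsumer, written with their literal property keys. ConsumerSettingsSketch and consumerProperties are hypothetical names, and literal strings stand in for the ConsumerConfig constants so that the snippet compiles without the Kafka library.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the article's project.
final class ConsumerSettingsSketch {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds the same four settings that SimpleKafkaConsumer configures.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);     // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        properties.put("key.deserializer", STRING_DESERIALIZER);   // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        properties.put("value.deserializer", STRING_DESERIALIZER); // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        properties.put("group.id", groupId);                       // ConsumerConfig.GROUP_ID_CONFIG
        return properties;
    }

    public static void main(String[] args) {
        // The article's Consumer reuses the topic name as its group ID.
        Properties properties = consumerProperties("localhost:9092", "devkuma-topic");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```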
Closing
The example code above is available on GitHub.