Building Simple Apache Kafka Producer/Consumer Clients in Java

This article uses the Kafka server created earlier and builds simple Kafka Producer and Consumer clients in Java.

Creating a Kafka Producer Project

Create a Producer project for Kafka in your IDE.

Kafka Producer Build Script

Configure the build script as follows. build.gradle

plugins {
    id 'java'
}

group 'com.devkuma'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36'        // (2)

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

(1) Dependency setting for using the Kafka client library. This example uses client version 2.8.1.
(2) Adds an SLF4J implementation library to check Kafka client logs. The SLF4J API is already included in the Kafka library.

Kafka Producer Project Code

Now write the actual Producer code. src/main/java/com/devkuma/kafka/SimpleKafkaProducer.java

package com.devkuma.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class SimpleKafkaProducer {

    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(properties);

        while (true) {
            Scanner sc = new Scanner(System.in);
            System.out.print("> ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            } finally {
                producer.flush();
            }

            if (message.equals("/quit")) {
                producer.close();
                break;
            }
        }
    }
}

The code initializes a Kafka Producer instance by setting option values with the Properties class. The related configuration keys are provided by the ProducerConfig class, so required options can be set through it.

The values configured in the code are as follows.

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Sets the Kafka cluster IP list.
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: Specifies the class that serializes record message keys.
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: Specifies the class that serializes record message values.

All Kafka messages are transmitted in serialized form, so you must specify the serializer to use. This example uses the string serializer class StringSerializer. Other formats such as JSON and Apache Avro can also be used.

It is obvious but important: if the Producer and Consumer serializers are not compatible, message serialization or deserialization may fail.

Creating a Kafka Consumer Project

Create a Consumer project for Kafka in your IDE.

Kafka Consumer Build Script

Configure the build script the same way as the Producer. build.gradle

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
    implementation 'org.slf4j:slf4j-simple:1.7.36'        // (2) 

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

(1) Dependency setting for using the Kafka client library. This example uses client version 2.8.1.
(2) Adds an SLF4J implementation library to check Kafka client logs. The SLF4J API is already included in the Kafka library.

Kafka Consumer Project Code

Now write the actual Consumer code. src/main/java/com/devkuma/kafka/SimpleKafkaConsumer.java

package com.devkuma.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    private final static String TOPIC_NAME = "devkuma-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));

                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (!message.equals("/quit"));
        } finally {
            consumer.close();
        }
    }
}

As with the Producer, the Consumer initializes a Kafka Consumer instance by setting option values with the Properties class. The related configuration keys are also provided through configuration classes, so required options can be set through them.

For Producer options, refer to the Producer section.

Option Descriptions

The main options for Producers and Consumers are as follows.

ProducerConfig client option Option value Description
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG bootstrap.server Sets the Kafka cluster IP list.
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG key.serializer Specifies the class that serializes record message keys.
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG value.serializer Specifies the class that serializes record message values.

Closing

The example code above is available on GitHub.