Writing and Configuring Kafka Consumers

Apache Kafka is a widely used distributed streaming platform for building real-time streaming applications. Kafka consumers are an essential part of its architecture, responsible for reading and processing data from Kafka topics. In this article, we will walk through writing and configuring Kafka consumers to retrieve data from Kafka topics effectively.

Creating a Kafka Consumer

To start consuming data from Kafka, you need to create an instance of the KafkaConsumer class provided by the Kafka client library. The KafkaConsumer class provides various constructors, allowing you to customize the consumer based on your requirements. Typically, you need to specify the bootstrap server address, group id, and key/value deserializers.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // broker(s) used for the initial connection
props.put("group.id", "my-consumer-group");         // consumer group this consumer joins
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Here, we set the bootstrap servers to localhost:9092 and the group ID to my-consumer-group. Both deserializers are set to StringDeserializer, which assumes the keys and values in the topic are plain strings.
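
The client library also exposes these property names as constants on the ConsumerConfig class, which avoids typos in the string keys. An equivalent sketch:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());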

Subscribing to Topics

After creating a consumer instance, you need to tell it where to read from. Kafka offers two approaches: subscribe() and assign(). The subscribe() method joins a consumer group and lets Kafka assign partitions to the consumer automatically (rebalancing them as group members come and go), and it accepts either explicit topic names or a regex pattern. The assign() method instead pins the consumer to specific partitions manually, without any group coordination.

import java.util.Arrays;
import java.util.Collections;
import java.util.regex.Pattern;

// Subscribe to a single topic
consumer.subscribe(Collections.singleton("my-topic"));

// Subscribe to multiple topics
consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

// Subscribe using a regex pattern that matches topic names
Pattern pattern = Pattern.compile("topic-[0-9]+");
consumer.subscribe(pattern);

You can choose the appropriate subscription method based on your use case. Note that subscriptions are not incremental: each call to subscribe() replaces the previous one.
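
With assign(), by contrast, you hand the consumer explicit TopicPartition objects rather than topic names. A minimal sketch (the topic name and partition numbers here are just examples):

import org.apache.kafka.common.TopicPartition;

// Manually assign two specific partitions; no consumer group
// coordination or rebalancing takes place for this consumer.
consumer.assign(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)));

Manual assignment is useful when you need full control over exactly which partitions a process reads.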

Polling for Records

Once you have subscribed to the topics, the consumer retrieves data by repeatedly calling poll(). Each call fetches a batch of records from the brokers and returns them for processing; the Duration argument is the maximum time poll() blocks while waiting for new data.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Process each record
        System.out.println("Received record: " + record.value());
    }
}

In the example above, poll(Duration.ofMillis(100)) blocks for at most 100 milliseconds while waiting for new records; this is an upper bound on the wait, not a fixed polling interval. The records in each returned batch are then processed one by one in the for loop.
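
The loop above runs forever. In practice you need a clean way to stop it; one common pattern, sketched below, is for another thread (for example, a JVM shutdown hook) to call consumer.wakeup(), which causes poll() to throw a WakeupException:

import org.apache.kafka.common.errors.WakeupException;

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received record: " + record.value());
        }
    }
} catch (WakeupException e) {
    // Expected when another thread calls consumer.wakeup() during shutdown.
} finally {
    consumer.close(); // leaves the consumer group and releases network resources
}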

Configuring Consumer Properties

Apart from the mandatory properties discussed earlier, Kafka consumers offer a wide range of configuration options to fine-tune their behavior. Some important properties include the following (a configuration sketch follows the list):

  • auto.offset.reset: Determines where the consumer starts when no committed offset is available: "earliest" (consume from the beginning of the partition) or "latest" (consume only new records).
  • fetch.min.bytes and fetch.max.wait.ms: Control the minimum amount of data the broker returns per fetch and the maximum time it waits to accumulate that data before responding.
  • enable.auto.commit: Specifies whether the consumer automatically commits offsets at a regular interval (controlled by auto.commit.interval.ms) or leaves offset commits to the application.
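
As an illustration, the sketch below disables auto-commit, sets the other properties listed above (the specific values are arbitrary examples), and commits offsets manually with commitSync() after each processed batch:

props.put("auto.offset.reset", "earliest"); // read from the beginning when no offset is committed
props.put("fetch.min.bytes", "1024");       // example: ask the broker for at least 1 KB per fetch...
props.put("fetch.max.wait.ms", "500");      // ...but to wait no longer than 500 ms
props.put("enable.auto.commit", "false");   // the application commits offsets itself

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received record: " + record.value());
    }
    consumer.commitSync(); // synchronously commit the offsets of the batch just processed
}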
