Apache Kafka is a widely used distributed streaming platform that allows developers to build real-time streaming applications. Kafka consumers are an essential component of Kafka's architecture, responsible for reading and processing data from Kafka topics. In this article, we will explore how to write and configure Kafka consumers to retrieve data from Kafka topics effectively.
To start consuming data from Kafka, you need to create an instance of the KafkaConsumer class provided by the Kafka client library. The KafkaConsumer class provides various constructors, allowing you to customize the consumer based on your requirements. Typically, you need to specify the bootstrap server address, group id, and key/value deserializers.
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka broker(s) to connect to
props.put("group.id", "my-consumer-group");       // consumer group used for offset tracking
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Here, we have defined the bootstrap servers as localhost:9092 and the group id as my-consumer-group. The key and value deserializers are set to StringDeserializer, assuming that the data in the Kafka topic is string-based.
After creating a consumer instance, you need to attach it to one or more Kafka topics from which you want to consume data. Kafka provides two methods for this: subscribe() and assign(). The subscribe() method lets you subscribe to topics by explicitly specifying the topic names or by using a pattern-based subscription, with partitions balanced automatically across the consumer group.
import java.util.Arrays;
import java.util.Collections;
import java.util.regex.Pattern;

// Subscribe to a single topic
consumer.subscribe(Collections.singleton("my-topic"));

// Subscribe to multiple topics
consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

// Subscribe by providing a pattern
Pattern pattern = Pattern.compile("topic-[0-9]+");
consumer.subscribe(pattern);
You can choose the appropriate subscription method based on your use case.
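The assign() method, by contrast, gives you manual control over exactly which partitions the consumer reads, bypassing the group's automatic partition balancing. A minimal sketch, assuming "my-topic" has at least two partitions:

import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

// Manually take ownership of partitions 0 and 1 of "my-topic".
// Unlike subscribe(), assign() does not participate in group rebalancing,
// so these partitions stay with this consumer until you change them.
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));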
Once you have subscribed to the topics, you need to continuously poll for new records from Kafka. The poll() method retrieves a batch of records and returns them so that you can process them.
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

while (true) {
    // poll() blocks for up to 100 ms waiting for new records
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Process each record
        System.out.println("Received record: " + record.value());
    }
}
In the above example, each call to poll() waits up to 100 milliseconds for new records to arrive, and every record in the returned batch is processed individually in the for loop.
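Because the loop above runs forever, a real application also needs a way to stop consuming cleanly. One common pattern, sketched below under the assumption that the poll loop runs on the application's main thread, is to call wakeup() from a JVM shutdown hook; wakeup() is the only KafkaConsumer method that is safe to call from another thread, and it makes a blocked poll() throw a WakeupException so the consumer can be closed:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

final Thread pollThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // interrupts a blocked poll()
    try {
        pollThread.join(); // let the poll loop finish before the JVM exits
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // ... process records as shown above ...
    }
} catch (WakeupException e) {
    // Expected during shutdown; safe to ignore.
} finally {
    consumer.close(); // leaves the group and releases assigned partitions
}

Closing the consumer in a finally block ensures the group coordinator learns immediately that the consumer is gone, rather than waiting for a session timeout before reassigning its partitions.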
Apart from the mandatory properties discussed earlier, Kafka consumers provide a wide range of configuration options to fine-tune their behavior. Some important properties include:
- auto.offset.reset: Determines where the consumer starts when no committed offset is available, such as "earliest" (start consuming from the beginning of the topic) or "latest" (start from the most recent records).
- fetch.min.bytes and fetch.max.wait.ms: Control the minimum amount of data the broker accumulates before answering a fetch request, and the maximum time it waits to do so before returning whatever records are available.
- enable.auto.commit: Specifies whether the consumer automatically commits consumed offsets in the background (at the interval set by auto.commit.interval.ms) or leaves offset commits to the application.
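As a rough illustration (the values below are examples, not tuned recommendations), the following sketch builds on the props and consumer from the earlier snippets; note that these properties must be added to props before the KafkaConsumer is constructed. With enable.auto.commit set to false, the application commits offsets itself using commitSync() after each processed batch:

props.put("auto.offset.reset", "earliest"); // start from the beginning when no committed offset exists
props.put("fetch.min.bytes", "1024");       // ask the broker to accumulate at least 1 KB per fetch...
props.put("fetch.max.wait.ms", "500");      // ...but to wait no longer than 500 ms for it
props.put("enable.auto.commit", "false");   // offsets are committed manually below

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received record: " + record.value());
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit only after the whole batch has been processed
    }
}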