- Apache Kafka - Applications
- Apache Kafka - Tools
- Real Time Application(Twitter)
- Integration With Spark
- Integration With Storm
- Consumer Group Example
- Simple Producer Example
- Apache Kafka - Basic Operations
- Apache Kafka - Installation Steps
- Apache Kafka - Work Flow
- Apache Kafka - Cluster Architecture
- Apache Kafka - Fundamentals
- Apache Kafka - Introduction
- Apache Kafka - Home
Apache Kafka Useful Resources
Selected Reading
- Who is Who
- Computer Glossary
- HR Interview Questions
- Effective Resume Writing
- Questions and Answers
- UPSC IAS Exams Notes
Apache Kafka - Consumer Group Example
Consumer group is a multi-threaded or multi-machine consumption from Kafka topics.
Consumer Group
Consumers can join a group by using the samegroup.id.
The maximum parallepsm of a group is that the number of consumers in the group ← no of partitions.
Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group.
Kafka guarantees that a message is only ever read by a single consumer in the group.
Consumers can see the message in the order they were stored in the log.
Re-balancing of a Consumer
Adding more processes/threads will cause Kafka to re-balance. If any consumer or broker fails to send heartbeat to ZooKeeper, then it can be re-configured via the Kafka cluster. During this re-balance, Kafka will assign available partitions to the available threads, possibly moving a partition to another process.
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.cpents.consumer.KafkaConsumer; import org.apache.kafka.cpents.consumer.ConsumerRecords; import org.apache.kafka.cpents.consumer.ConsumerRecord; pubpc class ConsumerGroup { pubpc static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deseriapzer", "org.apache.kafka.common.seriapzation.ByteArraySeriapzer"); props.put("value.deseriapzer", "org.apache.kafka.common.seriapza-tion.StringDeseriapzer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s ", record.offset(), record.key(), record.value()); } } }
Compilation
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/pbs/*" ConsumerGroup.java
Execution
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/pbs/*":. ConsumerGroup <topic-name> my-group >>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/pbs/*":. ConsumerGroup <topic-name> my-group
Here we have created a sample group name as my-group
with two consumers. Similarly, you can create your group and number of consumers in the group.
Input
Open producer CLI and send some messages pke −
Test consumer group 01 Test consumer group 02
Output of the First Process
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 01
Output of the Second Process
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 02
Now hopefully you would have understood SimpleConsumer and ConsumeGroup by using the Java cpent demo. Now you have an idea about how to send and receive messages using a Java cpent. Let us continue Kafka integration with big data technologies in the next chapter.
Advertisements