Consumer Group Example
  • Date: 2024-12-22

Apache Kafka - Consumer Group Example



A consumer group is a set of consumers, running as multiple threads or on multiple machines, that consume from Kafka topics together.

Consumer Group

    Consumers can join a group by using the same group.id.

    The maximum parallelism of a group is bounded by the partition count: number of consumers in the group <= number of partitions. For a given topic, consumers beyond that number stay idle.

    Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.

    Kafka guarantees that a message is only ever read by a single consumer in the group.

    Consumers see the messages of a partition in the order in which they were stored in the log.
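The rules above can be illustrated with a small, Kafka-free sketch. It spreads the partitions of one topic over the members of a group in contiguous ranges, so each partition has exactly one owner and surplus consumers receive nothing. The class name PartitionAssignmentSketch and the method assign are hypothetical names chosen for this illustration; this is not Kafka's own assignor code, only a simplified model of the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only (assumption): not Kafka's actual partition assignor.
class PartitionAssignmentSketch {

    // Spreads partitions 0..numPartitions-1 over the consumers in contiguous ranges.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();   // every consumer gets this many
        int extra = numPartitions % consumers.size();  // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers: prints {c1=[0, 1], c2=[2]}
        System.out.println(assign(Arrays.asList("c1", "c2"), 3));
        // 2 partitions, 3 consumers: prints {c1=[0], c2=[1], c3=[]} (c3 is idle)
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2));
    }
}
```

The second call shows why adding consumers beyond the partition count does not increase parallelism: the extra member owns no partition.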

Re-balancing of a Consumer

Adding more processes or threads to a group causes Kafka to re-balance. The same happens when a consumer fails: if it stops sending heartbeats (to ZooKeeper with the old consumer, or to the group coordinator broker with the KafkaConsumer used below) for longer than the session timeout, it is dropped from the group. During a re-balance, Kafka reassigns the available partitions to the available threads, possibly moving a partition to another process.
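As a rough illustration of the re-balance described above, the following Kafka-free sketch drops any member whose last heartbeat is older than the session timeout and then redistributes the partitions over the survivors. The names RebalanceSketch, liveMembers and assign, the timestamps, and the round-robin distribution are all assumptions made for this example; the real coordination protocol is more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (assumption): not the real Kafka group coordination protocol.
class RebalanceSketch {

    // Keeps only the members whose last heartbeat is within the session timeout.
    static List<String> liveMembers(Map<String, Long> lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
            if (nowMs - e.getValue() <= sessionTimeoutMs) {
                live.add(e.getKey());
            }
        }
        return live;
    }

    // Hands out partitions round-robin so that each partition has exactly one owner.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions && !members.isEmpty(); p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> heartbeats = new LinkedHashMap<>();
        heartbeats.put("process-1", 95_000L);  // heartbeat 5 s ago
        heartbeats.put("process-2", 60_000L);  // silent for 40 s
        long now = 100_000L;
        long sessionTimeoutMs = 30_000L;       // same value as session.timeout.ms in the example

        // Before the failure is detected: prints {process-1=[0], process-2=[1]}
        System.out.println(assign(new ArrayList<>(heartbeats.keySet()), 2));
        // After the re-balance: prints {process-1=[0, 1]}
        System.out.println(assign(liveMembers(heartbeats, now, sessionTimeoutMs), 2));
    }
}
```

After the silent member is dropped, its partition moves to the remaining process, which is the behaviour the paragraph above describes.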

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0];
      String group = args[1];
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      // Both deserializers must match the KafkaConsumer<String, String> type parameters.
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
         
      while (true) {
         // Wait up to 100 ms for new records. Newer client versions use poll(Duration.ofMillis(100)).
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }  
}
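Because every process that should share the topic's partitions must pass the same group.id, it can help to build the configuration in one place. The following is a minimal sketch using only java.util.Properties; the class name ConsumerConfigSketch and the method consumerProps are hypothetical helpers for this illustration, and the property values are simply the ones used in the example above.

```java
import java.util.Properties;

// Hypothetical helper (assumption): gathers the consumer settings from the example in one method.
class ConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);               // consumers with the same id form one group
        props.put("enable.auto.commit", "true");      // offsets are committed automatically...
        props.put("auto.commit.interval.ms", "1000"); // ...once per second
        props.put("session.timeout.ms", "30000");     // heartbeat silence tolerated before a re-balance
        String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        props.put("key.deserializer", stringDeserializer);
        props.put("value.deserializer", stringDeserializer);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "my-group");
        System.out.println(props.getProperty("group.id")); // prints my-group
    }
}
```

The returned Properties object can be passed to the KafkaConsumer constructor exactly as in the listing above.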

Compilation

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Execution

>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group

Here we have created a sample group named my-group with two consumers, one per process started above. Similarly, you can create your own group and choose the number of consumers in it.

Input

Open the producer CLI and send some messages, such as:

Test consumer group 01
Test consumer group 02

Output of the First Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Output of the Second Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

You should now understand SimpleConsumer and ConsumerGroup from the Java client demo, and have an idea of how to send and receive messages using a Java client. Let us continue with Kafka integration with big data technologies in the next chapter.
