Apache Kafka - Simple Producer Example
Let us create an application for publishing and consuming messages using a Java client. The Kafka producer client consists of the following APIs.
KafkaProducer API
Let us understand the most important set of Kafka producer APIs in this section. The central part of the KafkaProducer API is the KafkaProducer class. The KafkaProducer class provides an option to connect to a Kafka broker in its constructor and exposes the following methods.
The KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows −
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
ProducerRecord − The record to be sent; the producer manages a buffer of records waiting to be sent.
Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback).
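The following is a minimal sketch of an asynchronous send with a callback. It assumes a broker running on localhost:9092, string keys and values, and an existing topic named my-topic; the class name CallbackProducerExample is only illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackProducerExample {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);

      // send() returns immediately; the callback runs once the broker acknowledges the record
      producer.send(new ProducerRecord<String, String>("my-topic", "key-1", "value-1"),
         new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
               if (e != null)
                  e.printStackTrace();   // the send failed
               else
                  System.out.println("Stored in partition " + metadata.partition()
                     + " at offset " + metadata.offset());
            }
         });

      producer.close();
   }
}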
The KafkaProducer class provides a flush method to ensure that all previously sent messages have actually completed. The syntax of the flush method is as follows −
public void flush()
The KafkaProducer class provides a partitionsFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning. The signature of this method is as follows −

public List<PartitionInfo> partitionsFor(string topic)

The KafkaProducer class also provides a metrics method, whose signature is as follows −

public Map metrics()

It returns the map of internal metrics maintained by the producer.
public void close() − The KafkaProducer class provides a close method that blocks until all previously sent requests are completed.
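The following sketch shows how these methods might be used together. It assumes an already configured KafkaProducer with string keys and values and an existing topic passed in by the caller; the class and method names are illustrative only.

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;

public class ProducerMetadataExample {
   // "producer" must already be configured; "topic" must already exist on the broker
   public static void inspectAndClose(KafkaProducer<String, String> producer, String topic) {
      // partition metadata for the topic, usable for custom partitioning
      List<PartitionInfo> partitions = producer.partitionsFor(topic);
      System.out.println(topic + " has " + partitions.size() + " partitions");

      // internal metrics maintained by the producer
      Map<MetricName, ? extends Metric> metrics = producer.metrics();
      System.out.println("producer exposes " + metrics.size() + " metrics");

      producer.flush();   // wait until every buffered record has actually been sent
      producer.close();   // then block until outstanding requests complete and release resources
   }
}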
Producer API
The central part of the Producer API is the Producer class. The Producer class provides an option to connect to a Kafka broker in its constructor with the following methods.
The Producer Class
The Producer class provides a send method to send messages to either single or multiple topics using the following signatures.
public void send(KeyedMessage<k,v> message) − sends the data to a single topic, partitioned by key, using either the sync or the async producer.

public void send(List<KeyedMessage<k,v>> messages) − sends data to multiple topics.

Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);
There are two types of producers – Sync and Async.
The same API configuration applies to the Sync producer as well. The difference between them is that a sync producer sends messages directly, while an async producer sends messages in the background. The async producer is preferred when you want higher throughput. In previous releases such as 0.8, the async producer did not have a callback for send() to register error handlers; this is available only in the current release, 0.9.
public void close()

The Producer class provides a close method to close the producer pool connections to all Kafka brokers.
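For reference, a rough sketch of how this older Scala-based producer API can be used is given below. It assumes the Kafka 0.8.x classes kafka.javaapi.producer.Producer, kafka.producer.KeyedMessage and kafka.producer.ProducerConfig, a broker on localhost:9092 and a topic named my-topic; the class name is illustrative.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldApiProducerExample {
   public static void main(String[] args) {
      Properties prop = new Properties();
      prop.put("metadata.broker.list", "localhost:9092");            // assumed broker address
      prop.put("serializer.class", "kafka.serializer.StringEncoder");
      prop.put("producer.type", "async");                            // "sync" or "async"

      ProducerConfig config = new ProducerConfig(prop);
      Producer<String, String> producer = new Producer<String, String>(config);

      // single topic, partitioned by key
      producer.send(new KeyedMessage<String, String>("my-topic", "key-1", "value-1"));

      producer.close();   // close the connection pool to all brokers
   }
}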
Configuration Settings
The Producer API’s main configuration settings are listed in the following table for better understanding −
S.No | Configuration Settings and Description |
---|---|
1 | client.id − Identifies the producer application. |
2 | producer.type − Either sync or async. |
3 | acks − The acks config controls the criteria under which producer requests are considered complete. |
4 | retries − If the producer request fails, then automatically retry with the specified value. |
5 | bootstrap.servers − Bootstrapping list of brokers. |
6 | linger.ms − If you want to reduce the number of requests, you can set linger.ms to something greater than some value. |
7 | key.serializer − Key for the serializer interface. |
8 | value.serializer − Value for the serializer interface. |
9 | batch.size − Buffer size. |
10 | buffer.memory − Controls the total amount of memory available to the producer for buffering. |
ProducerRecord API
ProducerRecord is a key/value pair that is sent to the Kafka cluster. The ProducerRecord class constructor creates a record with partition, key and value pairs using the following signature.
public ProducerRecord(string topic, int partition, k key, v value)
Topic − user-defined topic name that will be appended to the record.

Partition − partition number to which the record will be sent.

Key − The key that will be included in the record.

Value − Record contents.
public ProducerRecord(string topic, k key, v value)

This ProducerRecord class constructor is used to create a record with key and value pairs and without a partition.
Topic − the topic to which the record will be assigned.
Key − key for the record.
Value − record contents.
public ProducerRecord(string topic, v value)

This ProducerRecord class constructor creates a record without a partition and a key.
Topic − the topic to which the record will be sent.
Value − record contents.
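A short sketch comparing the three constructors is given below. The topic name my-topic, the keys and values, and the class name are illustrative assumptions; the partition used in the first record must exist on the topic.

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordExamples {
   public static void main(String[] args) {
      ProducerRecord<String, String> withPartition =
         new ProducerRecord<String, String>("my-topic", 0, "key-1", "value-1");   // explicit partition 0

      ProducerRecord<String, String> withKey =
         new ProducerRecord<String, String>("my-topic", "key-1", "value-1");      // partition chosen from the key

      ProducerRecord<String, String> valueOnly =
         new ProducerRecord<String, String>("my-topic", "value-1");               // no key, no partition

      System.out.println(withPartition.topic() + " " + withKey.key() + " " + valueOnly.value());
   }
}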
The ProducerRecord class methods are listed in the following table −
S.No | Class Methods and Description |
---|---|
1 | public string topic() − The topic to which the record will be appended. |
2 | public K key() − The key that will be included in the record. If there is no such key, null will be returned. |
3 | public V value() − Record contents. |
4 | partition() − The partition number for the record. |
SimpleProducer Application
Before creating the application, first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and type in the following code.
//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {

   public static void main(String[] args) throws Exception{

      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }

      //Assign topicName to string variable
      String topicName = args[0].toString();

      // create instance for properties to access producer configs
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", "localhost:9092");

      //Set acknowledgements for producer requests.
      props.put("acks", "all");

      //If the request fails, the producer can automatically retry
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Reduce the number of requests by waiting up to linger.ms before sending
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      System.out.println("Message sent successfully");
      producer.close();
   }
}
Compilation − The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java

Execution − The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleProducer <topic-name>
Output
Message sent successfully

To check the above output, open a new terminal and type the Consumer CLI command to receive messages.

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
1
2
3
4
5
6
7
8
9
10
Simple Consumer Example
As of now we have created a producer to send messages to the Kafka cluster. Now let us create a consumer to consume messages from the Kafka cluster. The KafkaConsumer API is used to consume messages from the Kafka cluster. The KafkaConsumer class constructor is defined below.
public KafkaConsumer(java.util.Map<java.lang.String, java.lang.Object> configs)
configs − A map of consumer configuration settings.
The KafkaConsumer class has the following significant methods, which are listed in the table below.
S.No | Method and Description |
---|---|
1 | public java.util.Set<TopicPartition> assignment() − Get the set of partitions currently assigned to the consumer. |
2 | public java.util.Set<java.lang.String> subscription() − Get the set of topics currently subscribed by this consumer. |
3 | public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) − Subscribe to the given list of topics to get dynamically assigned partitions. |
4 | public void unsubscribe() − Unsubscribe the consumer from all currently subscribed topics. |
5 | public void subscribe(java.util.List<java.lang.String> topics) − Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe(). |
6 | public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) − The argument pattern refers to the subscribing pattern in the format of a regular expression, and the listener argument gets notifications from the subscribing pattern. |
7 | public void assign(java.util.List<TopicPartition> partitions) − Manually assign a list of partitions to the consumer. |
8 | poll() − Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return an error if the topics are not subscribed to before polling for data. |
9 | public void commitSync() − Commit the offsets returned on the last poll() for all the subscribed topics and partitions. The same operation is applied to commitAsync(). |
10 | public void seek(TopicPartition partition, long offset) − Override the fetch offset that the consumer will use on the next poll() for the given partition. |
11 | public void resume() − Resume the paused partitions. |
12 | public void wakeup() − Wakeup the consumer. |
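The following sketch illustrates a few of these methods (assign, seek, poll, commitSync) used together. It bypasses group management by manually assigning partition 0 of a topic and re-reading it from offset 0; the broker address, topic name, group id and class name are assumptions.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekConsumerExample {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
      props.put("group.id", "test");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      // bypass group management and read partition 0 of "my-topic" from the beginning
      TopicPartition partition = new TopicPartition("my-topic", 0);
      consumer.assign(Arrays.asList(partition));
      consumer.seek(partition, 0);

      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());

      consumer.commitSync();   // commit the offsets returned by the last poll()
      consumer.close();
   }
}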
ConsumerRecord API
The ConsumerRecord API is used to receive records from the Kafka cluster. A ConsumerRecord consists of the topic name and partition number from which the record is being received, and an offset that points to the record in a Kafka partition. The ConsumerRecord class is used to create a consumer record with a specific topic name, partition number and <key, value> pair. It has the following signature.
public ConsumerRecord(string topic, int partition, long offset, K key, V value)
Topic − The topic name for consumer record received from the Kafka cluster.
Partition − Partition for the topic.
Key − The key of the record, if no key exists null will be returned.
Value − Record contents.
ConsumerRecords API
The ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep the list of ConsumerRecord per partition for a particular topic. Its constructor is defined below.
public ConsumerRecords(java.util.Map<TopicPartition, java.util.List<ConsumerRecord<K, V>>> records)
TopicPartition − A map of partitions for a particular topic.

Records − A list of ConsumerRecord.
ConsumerRecords class has the following methods defined.
S.No | Methods and Description |
---|---|
1 | public int count() − The number of records for all the topics. |
2 | public Set partitions() − The set of partitions with data in this record set (if no data was returned, the set is empty). |
3 | public Iterator iterator() − The iterator enables you to cycle through a collection, obtaining or removing elements. |
4 | public List records() − Get the list of records for the given partition. |
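Below is a small sketch of how a ConsumerRecords batch returned by poll() might be processed partition by partition using count(), partitions() and records(). The class and method names are illustrative; the batch is passed in by the caller.

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class RecordBatchExample {
   // "records" is a batch returned by consumer.poll(); process it partition by partition
   public static void printByPartition(ConsumerRecords<String, String> records) {
      System.out.println("fetched " + records.count() + " records in total");

      for (TopicPartition partition : records.partitions()) {
         List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
         System.out.println(partition + " contributed " + partitionRecords.size() + " records");
      }
   }
}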
Configuration Settings
The main configuration settings for the Consumer client API are listed below −
S.No | Settings and Description |
---|---|
1 | bootstrap.servers − Bootstrapping list of brokers. |
2 | group.id − Assigns an individual consumer to a group. |
3 | enable.auto.commit − Enable auto commit for offsets if the value is true, otherwise offsets are not committed automatically. |
4 | auto.commit.interval.ms − How often updated consumed offsets are written to ZooKeeper. |
5 | session.timeout.ms − Indicates how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages. |
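As an example of how these settings fit together, the following sketch turns enable.auto.commit off and commits offsets manually with commitSync() after each processed batch, so a record is only marked consumed once it has been handled. The broker address, group id, topic name and class name are assumptions.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");        // assumed broker address
      props.put("group.id", "manual-commit-group");            // assumed group id
      props.put("enable.auto.commit", "false");                // we commit offsets ourselves
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      consumer.subscribe(Arrays.asList("my-topic"));           // assumed topic name

      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
         consumer.commitSync();   // commit only after the whole batch has been processed
      }
   }
}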
SimpleConsumer Application
The producer application steps remain the same here. First, start your ZooKeeper and Kafka broker. Then create a SimpleConsumer application with the Java class named SimpleConsumer.java and type the following code.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      //Kafka Consumer subscribes to a list of topics here.
      consumer.subscribe(Arrays.asList(topicName));

      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)

            // print the offset, key and value for the consumer records.
            System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}
Compilation − The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java

Execution − The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleConsumer <topic-name>
Input − Open the producer CLI and send some messages to the topic. You can put a simple input such as ‘Hello Consumer’.
Output − Following will be the output.
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer