Spring Boot - Apache Kafka
Apache Kafka is an open source project used to publish and subscribe to messages on a fault-tolerant messaging system. It is fast, scalable, and distributed by design. If you are a beginner to Kafka, or want to gain a better understanding of it, please refer to this link −
In this chapter, we are going to see how to use Apache Kafka in a Spring Boot application.
First, we need to add the Spring Kafka dependency in our build configuration file.
Maven users can add the following dependency in the pom.xml file.
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.1.0.RELEASE</version>
</dependency>
Gradle users can add the following dependency in the build.gradle file.
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
Producing Messages
To produce messages into Apache Kafka, we need to define the Configuration class for Producer configuration as shown −
package com.tutorialspoint.kafkademo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
   // Creates producers that connect to the local broker and send String keys and values
   @Bean
   public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(configProps);
   }
   // Template used by application code to send messages
   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
   }
}
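The ProducerConfig constants used in the configuration class are ordinary string keys. As a minimal sketch, assuming only the JDK and a hypothetical class name ProducerPropsSketch that is not part of the tutorial project, the same three settings can be written with their literal key names. Kafka also accepts the serializer as a fully qualified class name given as a string, which is what this sketch uses:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration only; not part of the tutorial project.
public class ProducerPropsSketch {

    // Builds the same three settings as producerFactory(), using the literal
    // keys that the ProducerConfig constants stand for.
    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each key/value pair; HashMap iteration order is not guaranteed.
        producerProps("localhost:9092")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Passing the broker address as a parameter, rather than hard-coding it, makes it easier to point the same code at a different broker later.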
To publish a message, autowire the KafkaTemplate object and send the message as shown.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
   kafkaTemplate.send(topicName, msg);
}
Consuming a Message
To consume messages, we need to write a Consumer configuration class file as shown below.
package com.tutorialspoint.kafkademo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
      Map<String, Object> props = new HashMap<>();
      // Must point to the Kafka broker (port 9092), not to ZooKeeper (port 2181)
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return new DefaultKafkaConsumerFactory<>(props);
   }
   // Container factory used by methods annotated with @KafkaListener
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      return factory;
   }
}
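The producer configuration uses a String serializer and the consumer configuration uses the matching String deserializer, so the text written by one side can be read back by the other. The following sketch uses only the JDK and hypothetical names (StringCodecSketch, serialize, deserialize) to illustrate that round trip, assuming the UTF-8 encoding that Kafka's String serializer and deserializer use by default. It illustrates the idea and is not Kafka's own code:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a String serializer/deserializer pair.
public class StringCodecSketch {

    // Producer side: turn the message text into the bytes stored in the topic.
    public static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the stored bytes back into text for the listener.
    public static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Hi Welcome to Spring For Apache Kafka");
        // The consumer recovers the original text from the bytes.
        System.out.println(deserialize(wire));
    }
}
```

If the two sides were configured with mismatched types, for example a String serializer with a non-String deserializer, the consumer would not recover the original text, which is why the two configuration classes mirror each other.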
Next, write a listener to listen for the messages.
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
   System.out.println("Received Message in group - group-id: " + message);
}
Let us call the sendMessage() method from the run() method of the ApplicationRunner interface, implemented in the main Spring Boot application class file, and consume the message in the same class file.
The code for the main Spring Boot application class file is given below −
package com.tutorialspoint.kafkademo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {
   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   // Publishes the message to the "tutorialspoint" topic
   public void sendMessage(String msg) {
      kafkaTemplate.send("tutorialspoint", msg);
   }
   public static void main(String[] args) {
      SpringApplication.run(KafkaDemoApplication.class, args);
   }
   // Invoked for every message that arrives on the "tutorialspoint" topic
   @KafkaListener(topics = "tutorialspoint", groupId = "group-id")
   public void listen(String message) {
      System.out.println("Received Message in group - group-id: " + message);
   }
   // Runs once after the application has started
   @Override
   public void run(ApplicationArguments args) throws Exception {
      sendMessage("Hi Welcome to Spring For Apache Kafka");
   }
}
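To make the flow of the main class easier to follow, here is a small in-memory stand-in, assuming only the JDK. The class InMemoryTopicSketch and its queue are hypothetical and replace the real broker: sendMessage() puts text on a queue that plays the role of the "tutorialspoint" topic, and listen() takes it off again. A real Kafka topic is persistent, partitioned, and shared between processes, none of which this sketch attempts:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for the send/listen flow; not a Kafka client.
public class InMemoryTopicSketch {

    // Plays the role of the "tutorialspoint" topic.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // Counterpart of sendMessage(): the sender only hands the message over
    // and does not know who will read it.
    public void sendMessage(String msg) {
        topic.add(msg);
    }

    // Counterpart of the listener: waits up to timeoutMillis for the next
    // message and returns null if none arrives in time.
    public String listen(long timeoutMillis) {
        try {
            return topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        InMemoryTopicSketch app = new InMemoryTopicSketch();
        app.sendMessage("Hi Welcome to Spring For Apache Kafka");
        System.out.println("Received Message in group - group-id: " + app.listen(1000));
    }
}
```

The point of the sketch is the decoupling: the sending code and the receiving code share only the topic, which is why the same application class can act as both producer and consumer.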
The code for the complete build configuration file is given below.
Maven – pom.xml
<?xml version = "1.0" encoding = "UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
   xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0
   http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint</groupId>
   <artifactId>kafka-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>kafka-demo</name>
   <description>Demo project for Spring Boot</description>

   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>1.5.9.RELEASE</version>
      <relativePath /> <!-- lookup parent from repository -->
   </parent>

   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
   </properties>

   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
         <version>2.1.0.RELEASE</version>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
      </dependency>
   </dependencies>

   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
</project>
Gradle – build.gradle
buildscript {
   ext {
      springBootVersion = '1.5.9.RELEASE'
   }
   repositories {
      mavenCentral()
   }
   dependencies {
      classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
   }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

group = 'com.tutorialspoint'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
   mavenCentral()
}
dependencies {
   compile('org.springframework.boot:spring-boot-starter')
   compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
   testCompile('org.springframework.boot:spring-boot-starter-test')
}
Now, create an executable JAR file, and run the Spring Boot application by using the Maven or Gradle commands given below −
For Maven, use the command as shown −
mvn clean install
After “BUILD SUCCESS”, you can find the JAR file under the target directory.
For Gradle, use the command as shown −
gradle clean build
After “BUILD SUCCESSFUL”, you can find the JAR file under the build/libs directory.
Run the JAR file by using the command given here −
java -jar <JARFILE>
You can see the message received by the listener in the console window.