gRPC - Quick Guide
  • Date: 2024-09-17


gRPC - Introduction

Before we jump into gRPC, let us briefly review remote procedure calls, which are what gRPC implements.

What are Remote Procedure Calls?

Remote procedure calls are function calls that look like ordinary local function calls but differ in that the remote function typically executes on a different machine. For the developer writing the code, however, there is minimal difference between a local function call and a remote call. The calls typically follow the client-server model, where the machine that executes the call acts as the server.
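To make the idea concrete, here is a minimal sketch in Python that simulates a remote call inside a single process. It is not gRPC: the names Stub, server_handle and SERVER_FUNCTIONS are hypothetical, JSON stands in for the wire format, and a plain function call stands in for the network. It only illustrates how a stub lets the caller write what looks like a local call.

```python
import json

# Hypothetical function hosted on the "server"; in a real deployment
# this would live on another machine.
def add(a, b):
    return a + b

SERVER_FUNCTIONS = {"add": add}

def server_handle(raw_request: bytes) -> bytes:
    """Decode a request, run the named function, and encode the result."""
    request = json.loads(raw_request.decode("utf-8"))
    result = SERVER_FUNCTIONS[request["method"]](*request["args"])
    return json.dumps({"result": result}).encode("utf-8")

class Stub:
    """Client-side proxy that turns a method call into a request message."""

    def __init__(self, transport):
        # transport is any callable taking request bytes and returning
        # response bytes; here it is a direct call instead of a network hop.
        self._transport = transport

    def __getattr__(self, method):
        def call(*args):
            raw = json.dumps({"method": method, "args": list(args)}).encode("utf-8")
            return json.loads(self._transport(raw).decode("utf-8"))["result"]
        return call

stub = Stub(server_handle)
print(stub.add(2, 3))  # reads like a local call; prints 5
```

Swapping the transport for one that sends the bytes over a socket would leave the calling code unchanged, which is the property that RPC frameworks aim for.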

Why do we need remote procedure calls?

Remote procedure calls provide a way to execute code on another machine. This becomes critically important in large products where a single machine cannot host all the code required for the overall product to function.

In a microservice architecture, the application is broken down into small services, and these services communicate with each other via message queues and APIs. All of this communication takes place over a network where different machines/nodes serve different functionality based on the service they host. So, remote procedure calls become a critical aspect of working in a distributed environment.

Why gRPC?

gRPC, an RPC framework originally developed at Google, provides a way to perform remote procedure calls. But there are other libraries and mechanisms to execute code on a remote machine. So, what makes gRPC special? Let's find out.

    Language independent − gRPC uses Google Protocol Buffers internally, so multiple languages can be used, such as Java, Python, Go, Dart, etc. A Java client can make a procedure call and a server written in Python can respond, effectively achieving language independence.

    Efficient data compaction − In a microservice environment, given that many communications take place over a network, it is critical that the data we send is as succinct as possible. We need to avoid any superfluous data so that the data is transferred quickly. Because gRPC uses Google Protocol Buffers internally, it has this feature to its advantage.

    Efficient serialization and deserialization − In a microservice environment, given that many communications take place over a network, it is critical that we serialize and deserialize the data as quickly as possible. Because gRPC uses Google Protocol Buffers internally, it ensures quick serializing and deserializing of data.

    Simple to use − gRPC already has a library and plugins that auto-generate procedure code (as we will see in the upcoming chapters). For simple use cases, remote calls can be used much like local function calls.
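The compaction claim can be illustrated with a small sketch that encodes a hypothetical message with two string fields (numbered 1 and 2) in the Protocol Buffers wire format by hand and compares its size with the same data as JSON. The helper names encode_varint and encode_string_field are made up for this illustration; in practice the generated Protobuf classes do this work.

```python
import json

def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a Protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_string_field(field_number: int, value: str) -> bytes:
    """Encode one string field: tag, length, then the UTF-8 bytes."""
    data = value.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return encode_varint(tag) + encode_varint(len(data)) + data

greeting, name = "Hello", "Jane"
as_protobuf = encode_string_field(1, greeting) + encode_string_field(2, name)
as_json = json.dumps({"greeting": greeting, "name": name}).encode("utf-8")

print(len(as_protobuf), "bytes in Protobuf wire format")
print(len(as_json), "bytes as JSON")
```

The hand-encoded form comes to 13 bytes because field names are never sent, only their numbers; the JSON form repeats the field names in every message.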

gRPC vs REST using JSON

Let’s take a look at how other ways to transfer data over a network stack up against Protobuf.

Feature | gRPC | HTTP using JSON/XML
Language independent | Yes | Yes
HTTP version | HTTP/2 | HTTP/1.1
Specifying domain schema | .proto files (Google Protocol Buffers) | None
Serialized data size | Least | High (higher for XML)
Human readable | No, as it uses a separate encoding schema | Yes, as it uses a text-based format
Serialization speed | Fastest | Slower (slowest for XML)
Data type support | Richer. Supports complex data types like Any, oneof, etc. | Supports basic data types
Support for evolving schema | Yes | No
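The "evolving schema" row can be sketched as follows: a reader that only knows fields 1 and 2 can still decode a message written with a newer schema that added field 3, because each field carries its number and wire type and unknown fields can be skipped. This is a hand-written illustration of the wire format, handling only varint and length-delimited fields; the function names and the extra field 3 are assumptions made for the example.

```python
def read_varint(buf: bytes, pos: int):
    """Read a base-128 varint starting at pos; return (value, new_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def decode_known_fields(buf: bytes, known: dict) -> dict:
    """Decode a message, keeping only the field numbers listed in known."""
    out = {}
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:  # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited, treated as a string here
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError("wire type not handled in this sketch")
        if field_number in known:
            out[known[field_number]] = value
        # Unknown field numbers are read past and ignored.
    return out

# Bytes from a hypothetical newer schema that added field 3 (a varint).
payload = b"\x0a\x05Hello\x12\x04Jane\x18\x07"
old_schema = {1: "greeting", 2: "name"}
print(decode_known_fields(payload, old_schema))
```

The old reader recovers greeting and name and silently passes over the field it does not know about.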

gRPC - Setup

Protoc Setup

Note that this setup is required only for Python. For Java, all of this is handled by the Maven file. Let us install the "protoc" binary, which we will use to auto-generate code from our ".proto" files. The binaries can be found at https://github.com/protocolbuffers/protobuf/releases/. Choose the correct binary based on the OS. We will install the protoc binary on Windows, but the steps are not very different for Linux.

Once installed, ensure that you are able to access it via the command line −


protoc --version
libprotoc 3.15.6

It means that Protobuf is correctly installed. Now, let us move to the Project Structure.

We also need to set up the plugin required for gRPC code generation.

For Python, we need to execute the following commands −


python -m pip install grpcio
python -m pip install grpcio-tools

It will install all the required binaries and add them to the path.

Project Structure

Here is the overall project structure that we would have −

[Figure: Project Structure]

The code related to individual languages goes into their respective directories. We will have a separate directory to store our proto files. And, here is the project structure that we would be having for Java −

[Figure: Project Structure for Java]

Project Dependency

Now that we have installed protoc, we can auto-generate the code from the proto files using protoc. Let us first create a Java project.

Following is the Maven configuration that we will use for our Java project. Note that it contains the required library for Protobuf as well.

Example


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
   http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.grpc.point</groupId>
   <artifactId>grpc-point</artifactId>
   <version>1.0</version>
   <packaging>jar</packaging>

   <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
   </properties>

   <dependencies>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-netty-shaded</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-protobuf</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-stub</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency> <!-- necessary for Java 9+ -->
         <groupId>org.apache.tomcat</groupId>
         <artifactId>annotations-api</artifactId>
         <version>6.0.53</version>
         <scope>provided</scope>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.30</version>
      </dependency>
   </dependencies>
      
   <build>
      <extensions>
         <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.2</version>
         </extension>
      </extensions>
      <plugins>
         <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.1</version>
            <executions>
               <execution>
                  <id>test</id>
                  <phase>generate-sources</phase>
                  <goals>
                     <goal>add-source</goal>
                  </goals>
                  <configuration>
                     <sources>
                        <source>${basedir}/target/generated-sources</source>
                     </sources>
                  </configuration>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
               <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
               <protoSourceRoot>../common_proto_files</protoSourceRoot>
            </configuration>
            <executions>
               <execution>
                  <goals>
                     <goal>compile</goal>
                     <goal>compile-custom</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <configuration>
            </configuration>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals>
                     <goal>shade</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>
</project>

gRPC - Hello World App with Java

Let us now create a basic "Hello World"-like app that will use gRPC along with Java.

.proto file

First, let us define the "greeting.proto" file in common_proto_files −


syntax = "proto3";
option java_package = "com.tp.greeting";
service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

Let us now take a closer look at this code block and see the role of each line −


syntax = "proto3";

The "syntax" here represents the version of Protobuf that we are using. We are using the latest version, 3, so the schema can use all the syntax that is valid for version 3.


package tutorial;

A package declaration such as this one is optional (the file above omits it). It is used for conflict resolution if, say, we have multiple classes/members with the same name.


option java_package = "com.tp.greeting";

This option is specific to Java: it names the package into which the code is auto-generated from the ".proto" file.


service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

This block represents the name of the service "Greeter" and the function name "greet" which can be called. The "greet" function takes an input of type "ClientInput" and returns an output of type "ServerOutput". Now let us look at these types.


message ClientInput {
   string greeting = 1;
   string name = 2;
}

In the above block, we have defined the ClientInput, which contains two attributes, "greeting" and "name", both of them strings. The client is supposed to send an object of type "ClientInput" to the server.


message ServerOutput {
   string message = 1;
}

In the above block, we have defined that, given a "ClientInput", the server would return the "ServerOutput", which contains a single attribute "message". The server is supposed to send an object of type "ServerOutput" to the client.

Note that we already had the Maven setup done to auto-generate our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com/tp/greeting
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com/tp/greeting

Setting up gRPC server

Now that we have defined the proto file which contains the function definition, let us set up a server that serves these functions.

Let us write our server code to serve the above function and save it in com.tp.grpc.GreetServer.java −

Example


package com.tp.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;

public class GreetServer {
   private static final Logger logger = Logger.getLogger(GreetServer.class.getName());
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      // Register the service implementation, then bind the port and start
      server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();

      logger.info("Server started, listening on " + port);

      // Shut the server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
      @Override
      public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) {
         logger.info("Got request from client: " + req);
         ServerOutput reply = ServerOutput.newBuilder().setMessage(
            "Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\""
         ).build();
         // Send the single response, then mark the call as complete
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
      }
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final GreetServer greetServer = new GreetServer();
      greetServer.start();
      // Block the main thread until the server is shut down
      greetServer.server.awaitTermination();
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the Greeter service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the GreeterImpl.

    The service instance needs to provide an implementation of the method/function which is present in the ".proto" file, i.e., in our case, the greet method.

    The method expects an object of the type defined in the ".proto" file, i.e., in our case, the ClientInput.

    The method works on the above input, does the computation, and then returns the output declared in the ".proto" file, i.e., in our case, the ServerOutput.

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.

Setting up gRPC client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.grpc.GreetClient.java −

Example


package com.tp.grpc;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class GreetClient {
   private static final Logger logger = Logger.getLogger(GreetClient.class.getName());
   private final GreeterGrpc.GreeterBlockingStub blockingStub;

   public GreetClient(Channel channel) {
      blockingStub = GreeterGrpc.newBlockingStub(channel);
   }
   public void makeGreeting(String greeting, String username) {
      logger.info("Sending greeting to server: " + greeting + " for name: " + username);
      ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build();
      logger.info("Sending to server: " + request);
      ServerOutput response;
      try {
         // Blocks until the server replies or the call fails
         response = blockingStub.greet(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following from the server: " + response.getMessage());
   }

   public static void main(String[] args) throws Exception {
      String greeting = args[0];
      String username = args[1];
      String serverAddress = "localhost:50051";
      // Plaintext channel: no TLS, suitable for local testing only
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         GreetClient client = new GreetClient(channel);
         client.makeGreeting(greeting, username);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code creates a gRPC client that connects to the server and calls the function which we had defined in our proto file. Let us walk through the above code −

    Starting from the main method, we accept two arguments, i.e., the greeting and the name.

    We set up a Channel for gRPC communication to our server.

    Next, we create a blocking stub using the channel we created. This is where we have the service "Greeter" whose functions we plan to call. A stub is nothing but a wrapper that hides the remote call complexity from the caller.

    Then, we simply create the expected input defined in the ".proto" file, i.e., in our case, the ClientInput.

    We ultimately make the call and await the result from the server.

    Finally, we close the channel to avoid any resource leak.

So, that is our client code.

Client Server Call

Now that we have defined our proto file and written our server and client code, let us execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer

We would see the following output −

Output


Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051

The above output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient Hello Jane

We would see the following output −

Output


Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending greeting to server: Hello for name: Jane
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending to server: greeting: "Hello"
name: "Jane"

Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet
INFO: Got following from the server: Server says "Hello Jane"

And now, if we open the server logs, we will get to see the following −


Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051
Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl greet
INFO: Got request from client: greeting: "Hello"
name: "Jane"

So, the client was able to call the server as expected, and the server responded by greeting the client back.

gRPC - Hello World App with Python

Let us now create a basic "Hello World"-like app that will use gRPC along with Python.

.proto file

First, let us define the greeting.proto file in common_proto_files −


syntax = "proto3";

service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

Let us now take a closer look at each of the lines in the above block −


syntax = "proto3";

The "syntax" here represents the version of Protobuf we are using. We are using the latest version, 3, so the schema can use all the syntax that is valid for version 3.


package tutorial;

A package declaration such as this one is optional (the file above omits it). It is used for conflict resolution if, say, we have multiple classes/members with the same name.


service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

This block represents the name of the service "Greeter" and the function name "greet" which can be called. The "greet" function takes an input of type "ClientInput" and returns an output of type "ServerOutput". Now let us look at these types.


message ClientInput {
   string greeting = 1;
   string name = 2;
}

In the above block, we have defined the ClientInput, which contains two attributes, "greeting" and "name", both of them strings. The client is supposed to send an object of type "ClientInput" to the server.


message ServerOutput {
   string message = 1;
}

Here, we have also defined that, given a "ClientInput", the server would return the "ServerOutput" with a single attribute "message". The server is supposed to send an object of type "ServerOutput" to the client.

Now, let us generate the underlying code for the Protobuf classes and the gRPC classes. For doing that, we need to execute the following command −


python -m grpc_tools.protoc -I ../common_proto_files --python_out=../python --grpc_python_out=. greeting.proto

However, note that to execute the command, we need to install the correct dependency as mentioned in the setup section of the tutorial.

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: python/greeting_pb2.py
Protobuf gRPC code: python/greeting_pb2_grpc.py

Setting up gRPC server

Now that we have defined the proto file which contains the function definition, let us set up a server that serves these functions.

Let us write our server code to serve the above function and save it in server.py −

Example


from concurrent import futures

import grpc
import greeting_pb2
import greeting_pb2_grpc

class Greeter(greeting_pb2_grpc.GreeterServicer):
   def greet(self, request, context):
      print("Got request " + str(request))
      # Build the reply from the two request fields, e.g. "Yo John!"
      return greeting_pb2.ServerOutput(message="{0} {1}!".format(request.greeting, request.name))

def serve():
   # The thread pool runs the RPC handlers
   server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
   greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
   server.add_insecure_port("[::]:50051")
   print("gRPC starting")
   server.start()
   server.wait_for_termination()

serve()

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the entry point, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the Greeter service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the Greeter.

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the greet method.

    The method expects an object of the type defined in the .proto file, i.e., in our case, a ClientInput passed as the request parameter.

    The method works on the above input, does the computation, and then returns the output declared in the .proto file, i.e., in our case, the ServerOutput.
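The server above hands its handlers to a thread pool with max_workers=2. As a rough illustration of what that bound means, the following standalone sketch (standard library only, no gRPC involved) runs four simulated requests through a pool of two workers and records how many run at once. The handle function and the sleep standing in for work are assumptions made for the example.

```python
import threading
import time
from concurrent import futures

lock = threading.Lock()
active = 0  # handlers currently running
peak = 0    # highest number seen running at the same time

def handle(request_id):
    """Simulated request handler that tracks how many run concurrently."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # stand-in for real work
    with lock:
        active -= 1
    return "handled " + str(request_id)

with futures.ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(handle, range(4)))

print(results)
print("peak concurrent handlers:", peak)
```

With two workers, the peak never exceeds 2; the remaining requests wait in the pool's queue until a worker is free.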

Setting up gRPC client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in client.py −

Example


import grpc

import greeting_pb2
import greeting_pb2_grpc

def run():
   # The channel is closed automatically when the with block exits
   with grpc.insecure_channel("localhost:50051") as channel:
      stub = greeting_pb2_grpc.GreeterStub(channel)
      response = stub.greet(greeting_pb2.ClientInput(name="John", greeting="Yo"))
   print("Greeter client received following from server: " + response.message)
run()

The above code creates a gRPC client that connects to the server and calls the function which we had defined in our proto file. Let us walk through the above code −

    Starting from the run() function, we set up a Channel for gRPC communication with our server.

    And then, we create a stub using the channel. This is where we use the service "Greeter" whose functions we plan to call. A stub is nothing but a wrapper which hides the complexity of the remote call from the caller.

    Then, we simply create the expected input defined in the proto file, i.e., in our case, the ClientInput. We have hard-coded two arguments, i.e., the name and the greeting.

    We ultimately make the call and await the result from the server.

So, that is our client code.

Client Server Call

Now that we have defined our proto file and written our server and client code, let us execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


python .\server.py

Output

We would get the following output −


gRPC starting

The above output means that the server has started.

Now, let us start the client.


python .\client.py

We would see the following output −

Output


Greeter client received following from server: Yo John!

And now, if we open the server logs, we will get to see the following data −


gRPC starting
Got request greeting: "Yo"
name: "John"

So, as we see, the client was able to call the server as expected, and the server responded by greeting the client back.

gRPC - Unary gRPC

We will now look at the various types of communication that the gRPC framework supports. We will use the example of a bookstore where the client can search for books and place an order for book delivery.

Let us start with unary gRPC communication, where we let the client search for a title and return one of the books matching the title queried for.

.proto file

First let us define the bookstore.proto file in common_proto_files −


syntax = "proto3";
option java_package = "com.tp.bookstore";

service BookStore {
   rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

Let us now take a closer look at each of the lines in the above block.


syntax = "proto3";

The "syntax" here represents the version of Protobuf we are using. We are using the latest version, 3, so the schema can use all the syntax that is valid for version 3.


package tutorial;

A package declaration such as this one is optional (the file above omits it). It is used for conflict resolution if, say, we have multiple classes/members with the same name.


option java_package = "com.tp.bookstore";

This option is specific to Java: it names the package into which the code is auto-generated from the .proto file.


service BookStore {
   rpc first (BookSearch) returns (Book) {}
}

This represents the name of the service "BookStore" and the function name "first" which can be called. The "first" function takes an input of type "BookSearch" and returns an output of type "Book". So, effectively, we let the client search for a title and return one of the books matching the title queried for.

Now let us look at these types.


message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

In the above block, we have defined the BookSearch, which contains attributes like name, author and genre. The client is supposed to send an object of type "BookSearch" to the server.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

Here, we have also defined that, given a "BookSearch", the server would return the "Book", which contains book attributes along with the price of the book. The server is supposed to send an object of type "Book" to the client.

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com/tp/bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com/tp/bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server that serves these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerUnary.java −

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         // Collect every title that starts with the (trimmed) query string
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
            title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());

         if (matchingBookTitles.isEmpty()) {
            // onNext() must not be given null, so report NOT_FOUND instead
            responseObserver.onError(io.grpc.Status.NOT_FOUND
               .withDescription("No book matches: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         Book foundBook = bookMap.get(matchingBookTitles.get(0));
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the first method.

    The method expects an object of the type defined in the .proto file, i.e., in our case, the BookSearch.

    The method searches for the book in the available bookMap and then returns the Book by calling the onNext() method. Once done, the server announces that it is done with the output by calling onCompleted().

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
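The search step in the walkthrough above is a prefix match on book titles. A minimal Python sketch of that lookup, using the same sample data, could look as follows. The BOOKS dictionary layout, the sorted() call (added here so that "first" is deterministic), and returning None when nothing matches are assumptions of this sketch rather than behaviour of the Java server.

```python
# Same sample catalogue as the server's bookMap, keyed by title.
BOOKS = {
    "Great Gatsby": {"author": "Scott Fitzgerald", "price": 300},
    "To Kill MockingBird": {"author": "Harper Lee", "price": 400},
    "Passage to India": {"author": "E.M.Forster", "price": 500},
    "The Side of Paradise": {"author": "Scott Fitzgerald", "price": 600},
    "Go Set a Watchman": {"author": "Harper Lee", "price": 700},
}

def first(query: str):
    """Return the first book whose title starts with the query, or None."""
    prefix = query.strip()
    for title in sorted(BOOKS):  # sorted so the result does not depend on dict order
        if title.startswith(prefix):
            return {"name": title, **BOOKS[title]}
    return None

print(first("Great"))
print(first("Missing"))
```

A query such as "Great" returns the Great Gatsby entry, while a query with no matching title returns None, which the caller has to handle explicitly.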

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientUnaryBlocking.java

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;

   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         // Blocks until the server replies or the call fails.
         response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";

      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();

      try {
         BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code creates a gRPC client which connects to the server and calls the function which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we accept one argument, i.e., the title of the book we want to search for.

    We set up a Channel for gRPC communication with our server.

    And then, we create a blocking stub using the channel. This is where we choose the service "BookStore" whose functions we plan to call. A "stub" is nothing but a wrapper which hides the complexity of the remote call from the caller.

    Then, we simply create the expected input defined in the .proto file, i.e., in our case, BookSearch, and we add the title we want the server to search for.

    We ultimately make the call and await the result from the server.

    Finally, we close the channel to avoid any resource leak.
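To make the idea of a stub more concrete, here is a minimal sketch in plain Java. It is only an analogy and not how gRPC is implemented: the class StubSketch is hypothetical, and the "transport" is an in-process function standing in for the network. The caller invokes first() like a local method, while the encoding and the hand-off to the transport stay hidden.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical illustration of what a stub does; this is not gRPC's implementation.
final class StubSketch {
   // The "transport" stands in for the network: request bytes in, response bytes out.
   private final Function<byte[], byte[]> transport;

   StubSketch(Function<byte[], byte[]> transport) {
      this.transport = transport;
   }

   // Looks like a local call; encoding and transport are hidden from the caller.
   String first(String bookName) {
      byte[] request = bookName.getBytes(StandardCharsets.UTF_8);
      byte[] response = transport.apply(request);
      return new String(response, StandardCharsets.UTF_8);
   }

   public static void main(String[] args) {
      // A fake in-process "server" that answers by upper-casing the request.
      StubSketch stub = new StubSketch(bytes ->
         new String(bytes, StandardCharsets.UTF_8).toUpperCase().getBytes(StandardCharsets.UTF_8));
      System.out.println(stub.first("To Kill"));
   }
}
```

The generated BookStoreBlockingStub plays the same role, with Protocol Buffers doing the encoding and the Channel doing the transport.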

So, that is our client code.

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The Client queries the Server for a book with a given name/title.

    The Server searches the book in its store.

    The Server then responds with the book and its other attributes.

Now that we have defined our proto file, written our server and the client code, let us proceed to execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary

We would see the following output −

Output


Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start
INFO: Server started, listening on 50051

The above output means the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"

We would see the following output −

Output


Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Querying for book with title: To Kill

Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Got following book from server: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

So, as we see, the client was able to get the book details by querying the server with the name of the book.

gRPC - Server Streaming RPC

Let us now discuss how server streaming works while using gRPC communication. In this case, the client will search for books by a given author. Assume the server requires some time to go through all the books. Instead of waiting until it has gone through all the books, the server provides the matching books in a streaming fashion, i.e., as soon as it finds one.

.proto file

First let us define the bookstore.proto file in common_proto_files −


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The following block represents the name of the service "BookStore" and the function name "searchByAuthor" which can be called. The "searchByAuthor" function takes in the input of type "BookSearch" and returns a stream of type "Book". So, effectively, we let the client search by author and the server returns, one at a time, every book matching the author queried for.


service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

Now let us look at these types.


message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

Here, we have defined BookSearch which contains a few attributes like name, author and genre. The client is supposed to send the object of type "BookSearch" to the server.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

We have also defined that, given a "BookSearch", the server would return a stream of "Book" which contains the book attributes along with the price of the book. The server is supposed to send a stream of "Book".

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerStreaming.java

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerStreaming greetServer = new BookeStoreServerStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
               logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");

               responseObserver.onNext(bookEntry.getValue());
            } 
         }
         responseObserver.onCompleted();
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the searchByAuthor method.

    The method expects an object of the type defined in the .proto file, i.e., for us, the BookSearch.

    Note that we have added a sleep to mimic the operation of searching through all the books. In case of streaming, the server does not wait for all the searched books to be available. It returns each book as soon as it is available by using the onNext() call.

    When the server is done with the request, it closes the response stream by calling onCompleted().

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
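The streaming behaviour described above can be sketched without gRPC. In the following plain-Java example, the class StreamingSearchSketch and its callback parameters are hypothetical stand-ins for the service method and the StreamObserver: every match is handed to the onNext callback as soon as it is found, and the onCompleted callback runs exactly once at the end.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the streaming service method; no gRPC types are used.
final class StreamingSearchSketch {
   // Calls onNext for every match as soon as it is found, then onCompleted exactly once.
   static void searchByAuthor(Map<String, String> authorByTitle, String authorPrefix,
         Consumer<String> onNext, Runnable onCompleted) {
      for (Map.Entry<String, String> entry : authorByTitle.entrySet()) {
         if (entry.getValue().startsWith(authorPrefix)) {
            onNext.accept(entry.getKey());
         }
      }
      onCompleted.run();
   }

   public static void main(String[] args) {
      // LinkedHashMap keeps insertion order, so results arrive in a predictable order.
      Map<String, String> authorByTitle = new LinkedHashMap<>();
      authorByTitle.put("Great Gatsby", "Scott Fitzgerald");
      authorByTitle.put("To Kill MockingBird", "Harper Lee");
      authorByTitle.put("Go Set a Watchman", "Harper Lee");

      searchByAuthor(authorByTitle, "Har",
         title -> System.out.println("Found book: " + title),
         () -> System.out.println("Stream completed"));
   }
}
```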

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientServerStreamingBlocking.java

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;

   public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      Iterator<Book> response;
      try {
         // The blocking stub returns an iterator; hasNext() waits for the next Book from the server.
         response = blockingStub.searchByAuthor(request);
         while(response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";

      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();

      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientServerStreamingBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code creates a gRPC client which connects to the server and calls the function which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we accept one argument, i.e., the name of the author whose books we want to search for.

    We set up a Channel for gRPC communication with our server.

    And then, we create a blocking stub using the channel. This is where we choose the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the .proto file, i.e., in our case, BookSearch, and we add the author that we want the server to search for.

    We ultimately make the call and get an iterator on valid Books. When we iterate, we get the corresponding Books made available by the Server.

    Finally, we close the channel to avoid any resource leak.
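The way the client iterates over results while the server is still producing them can be sketched in plain Java. The class BlockingIteratorSketch below is a hypothetical stand-in for the iterator returned by a blocking stub, not gRPC's own implementation: hasNext() waits on a queue until the producer either publishes another item or signals completion.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the iterator a blocking stub returns; not gRPC's implementation.
final class BlockingIteratorSketch implements Iterator<String> {
   // Unique marker object that signals the end of the stream (compared by identity).
   private static final String END = new String("END");
   private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
   private String pending;

   // Called by the producer (the "server") for each result.
   void publish(String item) {
      queue.add(item);
   }

   // Called by the producer once no more results will follow.
   void complete() {
      queue.add(END);
   }

   @Override
   public boolean hasNext() {
      if (pending == null) {
         try {
            pending = queue.take(); // waits until the producer publishes or completes
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pending = END; // treat interruption as end of stream
         }
      }
      return pending != END;
   }

   @Override
   public String next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      String item = pending;
      pending = null;
      return item;
   }

   public static void main(String[] args) {
      BlockingIteratorSketch books = new BlockingIteratorSketch();
      // The producer thread plays the role of the server sending results over time.
      new Thread(() -> {
         books.publish("Go Set a Watchman");
         books.publish("To Kill MockingBird");
         books.complete();
      }).start();
      while (books.hasNext()) {
         System.out.println("Found book: " + books.next());
      }
   }
}
```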

So, that is our client code.

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The Client queries the Server for a book with a given author.

    The Server searches the book in its store which is a time-consuming process.

    The Server responds whenever it finds a book with the given criteria. The Server does not wait for all the valid books to be available. It sends the output as soon as it finds one. And then repeats the process.

Now that we have defined our proto file, written our server and the client code, let us proceed to execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

The above output means the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

We would see the following output −

Output


Jul 03, 2021 10:40:31 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

So, as we see, the client was able to get the book details by querying the server with the name of the author. But more importantly, the client got the 1st book and the 2nd book at different timestamps, i.e., a gap of almost 5 seconds.

gRPC - Client Streaming RPC

Let us now see how client streaming works while using gRPC communication. In this case, the client will search and add books to the cart. Once the client is done adding all the books, the server would provide the checkout cart value to the client.

.proto file

First let us define the bookstore.proto file in common_proto_files −


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

Here, the following block represents the name of the service "BookStore" and the function name "totalCartValue" which can be called. The "totalCartValue" function takes in the input of type "Book" which is a stream. And the function returns an object of type "Cart". So, effectively, we let the client add books in a streaming fashion and once the client is done, the server provides the total cart value to the client.


service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}

Now let us look at these types.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The client would send in the "Book" it wants to buy. It may not be the complete book info; it can simply be the title of the book.


message Cart {
   int32 books = 1;
   int32 price = 2;
}

The server, on getting the list of books, would return the "Cart" object which is nothing but the total number of books the client has purchased and the total price.

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerClientStreaming.java

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         // One observer (and therefore one cart) is created per client call.
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               // The client has finished sending books; reply with the cart summary.
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the totalCartValue method.

    Now, given that this is the case of client streaming, the server will get a list of Book (defined in the proto file) as the client adds them. The server thus returns a custom stream observer. This stream observer implements what happens when a new Book is found and what happens when the stream is closed.

    The onNext() method would be called by the gRPC framework when the client adds a Book. At this point, the server adds that to the cart. In case of streaming, the server does not wait for all the books to be available; it processes each book as it arrives.

    When the client is done with the addition of Books, the stream observer's onCompleted() method is called. This method implements what the server wants to send when the client is done adding Books, i.e., it returns the Cart object to the client.

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
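The accumulate-then-reply pattern described above can be sketched in plain Java. The class CartAccumulatorSketch is hypothetical and works on prices only; its two methods mirror the roles of onNext() and onCompleted() in the stream observer, without using any gRPC types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the server-side stream observer; it only tracks prices.
final class CartAccumulatorSketch {
   private final List<Integer> prices = new ArrayList<>();

   // Called once per item the client streams in.
   void onNext(int price) {
      prices.add(price);
   }

   // Called when the client closes the stream; returns {number of books, total price}.
   int[] onCompleted() {
      int total = 0;
      for (int price : prices) {
         total += price;
      }
      return new int[] {prices.size(), total};
   }

   public static void main(String[] args) {
      CartAccumulatorSketch cart = new CartAccumulatorSketch();
      cart.onNext(300);
      cart.onNext(500);
      int[] summary = cart.onCompleted();
      System.out.println("Books: " + summary[0] + ", total: " + summary[1]);
   }
}
```

Nothing is sent back while items are still arriving; the single reply is produced only after the stream has been closed.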

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientStreamingClient.java

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClient {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClient.class.getName());
   private final BookStoreStub stub;
   // Written by the gRPC callback thread and read by main, hence volatile.
   private volatile boolean serverResponseCompleted = false;
   StreamObserver<Book> streamClientSender;

   public BookStoreClientStreamingClient(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + "\nTotal number of Books: " + cart.getBooks() +
               "\nTotal Order Value: " + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            // StreamObserver requires onError(); stop waiting if the call fails.
            logger.log(Level.WARNING, "RPC failed: {0}", t.getMessage());
            serverResponseCompleted = true;
         }
         @Override
         public void onCompleted() {
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      // The call is opened lazily, when the first book is added.
      if(streamClientSender == null) {
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      } else {
         // No book was ever sent, so there is no response to wait for.
         serverResponseCompleted = true;
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
         String bookName = "";

         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            client.addBook(bookName);
         }

         // Poll until the server has responded (or the call has failed).
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }

      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code creates a gRPC client which connects to the server and calls the function which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we read the titles of the books to be added from the console, one at a time, until the user types EXIT.

    We set up a Channel for gRPC communication with our server.

    Next, we create a non-blocking stub using the channel we created. This is where we are choosing the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the .proto file, i.e., in our case, Book, and we add the title that we want the server to add.

    But given this is the case of client streaming, we first create a stream observer for the server. This server stream observer lists the behavior on what needs to be done when the server responds, i.e., onNext() and onCompleted().

    And using the stub, we also get the client stream observer. We use this stream observer for sending the data, i.e., Book, to be added to the cart.

    And once our order is complete, we ensure that the client stream observer is closed. It tells the server to calculate the Cart Value and provide that as an output.

    Finally, we close the channel to avoid any resource leak.
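The client above waits for the server's reply by polling a flag every two seconds. A common alternative, shown here as a sketch under stated assumptions and not as the tutorial's method, is a java.util.concurrent.CountDownLatch that the response observer counts down when the call finishes. The class CompletionLatchSketch and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing how to wait for an asynchronous completion signal.
final class CompletionLatchSketch {
   private final CountDownLatch done = new CountDownLatch(1);

   // Would be called from the response observer's onCompleted() or onError().
   void markCompleted() {
      done.countDown();
   }

   // Returns true if completion was signalled within the timeout, false otherwise.
   boolean awaitCompletion(long timeout, TimeUnit unit) {
      try {
         return done.await(timeout, unit);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args) {
      CompletionLatchSketch sketch = new CompletionLatchSketch();
      // Another thread plays the role of the gRPC callback thread.
      new Thread(sketch::markCompleted).start();
      System.out.println("Completed in time: " + sketch.awaitCompletion(5, TimeUnit.SECONDS));
   }
}
```

Compared with polling, the waiting thread wakes up as soon as the signal arrives, and the timeout keeps it from waiting forever if the server never answers.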

So, that is our client code.

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The Client adds books by streaming them to the server.

    The Server searches for each book in its store and adds it to the cart.

    When the client is done ordering, the Server responds with the total cart value of the client.

Now that we have defined our proto file, written our server and the client code, let us proceed to execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

The above output means the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClient

Let us add a few books to our client.


Type book name to be added to the cart....
Great
Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
Passage
Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Passage

Type book name to be added to the cart....

Once we have added the books and we input "EXIT", the server then calculates the cart value and here is the output we get −

Output


EXIT
Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

So, as we can see, the client was able to add books. And once all the books were added, the server responded with the total number of books and the total price.

gRPC - Bidirectional RPC

Let us now see how client-server streaming works while using gRPC communication. In this case, the client will search and add books to the cart. The server would respond with the live cart value every time a book is added.

.proto file

First let us define the bookstore.proto file in common_proto_files


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

The following block represents the name of the service "BookStore" and the function name "liveCartValue" which can be called. The "liveCartValue" function takes in the input of type "Book" which is a stream. And the function returns a stream of objects of type "Cart". So, effectively, we let the client add books in a streaming fashion and whenever a new book is added, the server responds with the current cart value to the client.


service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}

Now let us look at these types.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The client sends the "Book" it wants to buy. It does not have to be the complete book info; it can simply be the title of the book.


message Cart {
   int32 books = 1;
   int32 price = 2;
}

The server, on getting each book, returns the "Cart" object, which is simply the total number of books the client has added so far and their total price.

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project:


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerBothStreaming.java −

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   // In-memory catalogue, keyed by book title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      // Shut the server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerBothStreaming greetServer = new BookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> liveCartValue(StreamObserver<Cart> responseObserver) {
         // One observer (and therefore one cart) is created per client stream
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if (bookEntry.getValue().getName().startsWith(book.getName())) {
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue += bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               // Send the current cart back right away, once per incoming book
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we create a service instance, i.e., in our case, BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the proto file, i.e., in our case, the liveCartValue method.

    Now, given that this is the case of both server and client streaming, the server gets the Books (defined in the proto file) one at a time as the client adds them. The server therefore returns a custom stream observer. This stream observer implements what happens when a new Book arrives and what happens when the stream is closed.

    The onNext() method is called by the gRPC framework each time the client adds a Book. At this point, the server adds every matching book to the cart and uses the response observer to return the current Cart value. Because the response is also a stream, the server does not wait for all the books to arrive before responding.

    When the client is done adding Books, the stream observer's onCompleted() method is called. This method implements what the server wants to do when the client is done adding Books, i.e., complete the response stream to signal that it has finished taking the client's order.

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
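The per-message behaviour of onNext() described in the walkthrough can be tried out without a gRPC runtime. The following is a minimal sketch in plain Java, not the generated gRPC code: CartObserver is a hypothetical stand-in for StreamObserver<Cart>, the class and method names are made up for illustration, and the catalogue is reduced to a title-to-price map with the same values as bookMap −

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the server's onNext() logic; no gRPC classes are used.
class LiveCartSketch {
   // Hypothetical stand-in for StreamObserver<Cart>: receives (books, price).
   interface CartObserver {
      void onNext(int books, int price);
   }

   // Same titles and prices as bookMap in the server example.
   static final Map<String, Integer> PRICES = new LinkedHashMap<>();
   static {
      PRICES.put("Great Gatsby", 300);
      PRICES.put("To Kill MockingBird", 400);
      PRICES.put("Passage to India", 500);
      PRICES.put("The Side of Paradise", 600);
      PRICES.put("Go Set a Watchman", 700);
   }

   private final CartObserver responseObserver;
   private int bookCount = 0;
   private int cartValue = 0;

   LiveCartSketch(CartObserver responseObserver) {
      this.responseObserver = responseObserver;
   }

   // Called once per incoming title prefix, like onNext(Book) on the server.
   void onNext(String titlePrefix) {
      for (Map.Entry<String, Integer> entry : PRICES.entrySet()) {
         if (entry.getKey().startsWith(titlePrefix)) {
            bookCount++;
            cartValue += entry.getValue();
         }
      }
      // Respond immediately with the running totals, one response per request.
      responseObserver.onNext(bookCount, cartValue);
   }

   // Feeds the prefixes one by one and records the cart value after each.
   static List<Integer> runningTotals(List<String> prefixes) {
      List<Integer> totals = new ArrayList<>();
      LiveCartSketch cart = new LiveCartSketch((books, price) -> totals.add(price));
      for (String prefix : prefixes) {
         cart.onNext(prefix);
      }
      return totals;
   }

   public static void main(String[] args) {
      // Prints [300, 800]: one cart value per book added.
      System.out.println(runningTotals(Arrays.asList("Great", "Passage")));
   }
}
```

Note that a prefix which matches several titles adds all of them at once, because the loop, like the one in the server example, does not stop at the first match.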

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientBothStreaming.java −

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   // Both flags are written on a gRPC callback thread and read on the main
   // thread, so they are volatile to make the updates visible.
   private volatile boolean serverIntermediateResponseCompleted = true;
   private volatile boolean serverResponseCompleted = false;

   // Request stream towards the server; created lazily on the first addBook()
   StreamObserver<Book> streamClientSender;

   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver() {
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" +
               "\nTotal number of Books: " + cart.getBooks() +
               "\nTotal Order Value: " + cart.getPrice());

            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if (streamClientSender == null) {
         streamClientSender = stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if (streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new BookStoreClientBothStreaming(channel);
         String bookName = "";

         while (true) {
            // Prompt again only after the previous cart update has arrived
            if (client.serverIntermediateResponseCompleted) {
               System.out.println("Type book name to be added to the cart....");
               bookName = System.console().readLine();
               if (bookName.equals("EXIT")) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         // Wait until the server has closed its side of the stream
         while (!client.serverResponseCompleted) {
            Thread.sleep(2000);
         }

      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code starts a gRPC client, connects to a server at a specified port, and calls the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we accept the names of the books to be added to the cart. Once all the books have been added, the user is expected to type "EXIT".

    We set up a Channel for gRPC communication with our server.

    Next, we create a non-blocking stub using the channel. This is where we choose the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the proto file, i.e., in our case, Book, and we set the title we want the server to add.

    But given that this is the case of both server and client streaming, we first create a stream observer for the server responses. This response observer defines what needs to be done when the server responds, i.e., onNext() and onCompleted().

    And using the stub, we also get the client stream observer. We use this stream observer for sending the data, i.e., the Book to be added to the cart.

    And once our order is complete, we ensure that the client stream observer is closed. This tells the server to close the stream and perform the cleanup.

    Finally, we close the channel to avoid any resource leak.

So, that is our client code.
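The client above waits for each cart update by polling two flags. As a point of comparison, here is a minimal sketch in plain Java of the same send-then-wait handshake built on java.util.concurrent primitives instead of polling. It does not use gRPC at all: the single-threaded executor merely plays the role of the thread on which gRPC would deliver callbacks, and every class, field and method name in it is hypothetical −

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch of the send/wait handshake; no gRPC classes are used.
class ResponseHandshakeSketch {
   // One permit per simulated cart update (role of serverIntermediateResponseCompleted).
   private final Semaphore responseArrived = new Semaphore(0);
   // Opened when the simulated server completes (role of serverResponseCompleted).
   private final CountDownLatch streamCompleted = new CountDownLatch(1);
   // Simulated callback thread.
   private final ExecutorService callbackThread = Executors.newSingleThreadExecutor();
   private final List<String> events = Collections.synchronizedList(new ArrayList<>());

   void addBook(String title) {
      events.add("sent " + title);
      callbackThread.execute(() -> {
         events.add("cart updated for " + title);
         responseArrived.release();
      });
      try {
         // Block (with an upper bound) until the matching response has arrived.
         if (!responseArrived.tryAcquire(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("No response for " + title);
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
   }

   void completeOrder() {
      events.add("order completed by client");
      callbackThread.execute(() -> {
         events.add("stream completed by server");
         streamCompleted.countDown();
      });
      try {
         if (!streamCompleted.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Server did not complete the stream");
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
   }

   // Sends every title, completes the order and returns the recorded events.
   static List<String> run(List<String> titles) {
      ResponseHandshakeSketch client = new ResponseHandshakeSketch();
      try {
         for (String title : titles) {
            client.addBook(title);
         }
         client.completeOrder();
      } finally {
         client.callbackThread.shutdown();
      }
      return new ArrayList<>(client.events);
   }

   public static void main(String[] args) {
      run(Arrays.asList("Great", "Passage")).forEach(System.out::println);
   }
}
```

In this sketch, each response releases one semaphore permit and the end of the stream opens the latch, so the main thread sleeps until it is woken rather than re-checking a flag. Whether this is preferable in a real client depends on how the surrounding application is structured.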

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The Client sends a stream of books to the server.

    The Server searches for each book in its store and adds it to the cart.

    With each book addition, the server tells the client the current cart value.

    When the client is done ordering, both the server and the client close the stream.

Now that we have defined our proto file and written our server and client code, let us execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerBothStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerBothStreaming start
INFO: Server started, listening on 50051

This output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming

Let us add a book to our cart.


Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Great

Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 1
Total Order Value: 300

So, as we can see, we get the current cart value of the order. Let us now add one more book to our cart.


Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Passage

Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

Once we have added the books and we input "EXIT", the client shuts down.


Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

So, as we can see, the client was able to add books. And as the books were being added, the server responded with the current cart value.

gRPC - Client Calls

A gRPC client supports two types of client calls, i.e., two ways in which the client can call the server −

    Blocking client call

    Async client call

In this chapter, we will look at both of them one by one.

Blocking Client Calls

gRPC supports blocking client calls. This means that once the client makes the call to the service, it does not proceed with the rest of the code execution until it gets the response back from the server. Note that a blocking client call is possible for unary calls and server streaming calls.

Here is an example of a unary blocking client call.

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         // Blocks the calling thread until the server replies or the call fails
         response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

In the above example, we have,


public BookStoreClientUnaryBlocking(Channel channel) {
   blockingStub = BookStoreGrpc.newBlockingStub(channel);
}

which means we will be using a blocking RPC call.

And then, we have,


BookSearch request = BookSearch.newBuilder().setName(bookName).build();

Book response;
response = blockingStub.first(request);

This is where we use the blockingStub to call the RPC method first() to get the book details.

Similarly, for server streaming, we can use the blocking stub −


logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();

Iterator<Book> response;
try {
   response = blockingStub.searchByAuthor(request);
   while (response.hasNext()) {
      logger.info("Found book: " + response.next());
   }
} catch (StatusRuntimeException e) {
   logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}

Here we call the RPC method searchByAuthor and iterate over the response until the server stream ends.

Non-Blocking Client Calls

gRPC supports non-blocking client calls. This means that when the client makes a call to the service, it does not need to wait for the server response. To handle the server response, the client simply passes in an observer which dictates what to do when the response is received. Note that a non-blocking client call is possible for unary calls as well as streaming calls. However, we will specifically look at the case of a server streaming call to compare it against a blocking call.

Here is an example of a server streaming non-blocking client call −

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientServerStreamingNonBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
   private final BookStoreGrpc.BookStoreStub nonBlockingStub;
   // Opened when the server finishes (or fails) the stream, so that main()
   // does not shut the channel down before the responses have arrived.
   private final CountDownLatch done = new CountDownLatch(1);
   public BookStoreClientServerStreamingNonBlocking(Channel channel) {
      nonBlockingStub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Book> getServerResponseObserver() {
      StreamObserver<Book> observer = new StreamObserver<Book>() {
         @Override
         public void onNext(Book book) {
            logger.info("Server returned following book: " + book);
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
            done.countDown();
         }
         @Override
         public void onCompleted() {
            logger.info("Server is done sending books");
            done.countDown();
         }
      };
      return observer;
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      try {
         // Returns immediately; results are delivered to the observer
         nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientServerStreamingNonBlocking client = new BookStoreClientServerStreamingNonBlocking(channel);
         client.getBook(authorName);
         // Keep the channel open until the stream ends (30 s is an arbitrary upper bound)
         client.done.await(30, TimeUnit.SECONDS);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

As we see in the above example,


public BookStoreClientServerStreamingNonBlocking(Channel channel) {
   nonBlockingStub = BookStoreGrpc.newStub(channel);
}

This creates a non-blocking stub. Similarly, the following code is used to handle the response that we get from the server. Each time the server sends a response, we log the output.


public StreamObserver<Book> getServerResponseObserver() {
   StreamObserver<Book> observer = new StreamObserver<Book>() {
      ....
      ....
   };
   return observer;
}

The following gRPC call is a non-blocking call.


logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
   nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
} catch (StatusRuntimeException e) {
   logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}

This is how we ensure that our client does not need to wait until the server completes the execution of searchByAuthor. The responses are handled directly by the stream observer object as and when the server returns the Book objects.
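To make the difference between the two call styles concrete without needing a running server, here is a minimal sketch in plain Java. It is not gRPC code: TitleObserver is a hypothetical stand-in for StreamObserver<Book>, searchBlocking and searchAsync are made-up helpers that mimic the shape of the blocking and non-blocking stub methods, and the "server" is just an in-memory array with the same books as the examples −

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch contrasting pull (blocking) and push (callback) styles.
class CallStylesSketch {
   // Hypothetical stand-in for StreamObserver<Book>, reduced to titles.
   interface TitleObserver {
      void onNext(String title);
      void onCompleted();
   }

   // {title, author} pairs, same data as bookMap in the examples.
   static final String[][] BOOKS = {
      {"Great Gatsby", "Scott Fitzgerald"},
      {"To Kill MockingBird", "Harper Lee"},
      {"Passage to India", "E.M.Forster"},
      {"The Side of Paradise", "Scott Fitzgerald"},
      {"Go Set a Watchman", "Harper Lee"}
   };

   static List<String> titlesBy(String author) {
      List<String> titles = new ArrayList<>();
      for (String[] book : BOOKS) {
         if (book[1].equals(author)) {
            titles.add(book[0]);
         }
      }
      return titles;
   }

   // Blocking style: the caller pulls results from an iterator.
   static Iterator<String> searchBlocking(String author) {
      return titlesBy(author).iterator();
   }

   // Non-blocking style: returns at once, another thread pushes the results.
   static void searchAsync(String author, TitleObserver observer) {
      new Thread(() -> {
         for (String title : titlesBy(author)) {
            observer.onNext(title);
         }
         observer.onCompleted();
      }).start();
   }

   static List<String> collectBlocking(String author) {
      List<String> found = new ArrayList<>();
      Iterator<String> response = searchBlocking(author);
      while (response.hasNext()) {
         found.add(response.next());
      }
      return found;
   }

   static List<String> collectAsync(String author) {
      List<String> found = Collections.synchronizedList(new ArrayList<>());
      CountDownLatch done = new CountDownLatch(1);
      searchAsync(author, new TitleObserver() {
         @Override public void onNext(String title) { found.add(title); }
         @Override public void onCompleted() { done.countDown(); }
      });
      try {
         // The caller must wait explicitly if it needs the complete result.
         if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for results");
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
      return new ArrayList<>(found);
   }

   public static void main(String[] args) {
      System.out.println(collectBlocking("Harper Lee"));
      System.out.println(collectAsync("Harper Lee"));
   }
}
```

Both styles yield the same titles. The difference lies in which thread does the work and in the fact that the callback style needs an explicit signal, here a latch, before the caller can treat the result as complete.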

gRPC - Timeouts & Cancellation

gRPC supports assigning timeouts to requests. A timeout is one way to cancel a request: it avoids spending resources on both the client and the server for a request whose result would no longer be useful to the client.

Request Timeout

gRPC supports specifying a timeout for both the client as well as the server.

    The Client can specify at runtime the amount of time it wants to wait before cancelling the request.

    The Server can also check on its end whether a request still needs to be served or whether the Client has already given up on it.

Let us take an example where the client expects a response in 2 seconds, but the server takes longer. So, here is our server code.

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   // In-memory catalogue, keyed by book title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         logger.info("This may take more time...");
         try {
            // Dummy delay, longer than the client's 2-second deadline
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         if (matchingBookTitles.isEmpty()) {
            // A null message cannot be sent, so report a missing book as an error
            responseObserver.onError(Status.NOT_FOUND
               .withDescription("No book with title starting with: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         responseObserver.onNext(bookMap.get(matchingBookTitles.get(0)));
         responseObserver.onCompleted();
      }
   }
}

In the above code, the server searches for the book whose title the client has provided. We added a dummy sleep so that we can see the request getting timed out.

And here is our client code −

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         // Give up on this call if no response arrives within 2 seconds
         response = blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code calls the server with a title to search for. But more importantly, it provides a timeout of 2 seconds to the gRPC call.

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout

We would see the following output −

Output


Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

The above output shows that the server has started. Once the client (started in the next step) sends its request, the server also logs the following −


Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great

Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

We would get the following output −

Output


Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}

So, as we can see, the client did not get the response in 2 seconds, hence it cancelled the request and reported it as a timeout, i.e., DEADLINE_EXCEEDED.
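The same give-up-after-a-deadline idea can be illustrated in plain Java, independent of gRPC. The sketch below is only an analogy and does not use the gRPC deadline API: a worker thread stands in for the slow server, Future.get with a timeout stands in for withDeadlineAfter, and the class name, method name and returned strings are all hypothetical −

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java analogy for a client-side deadline; no gRPC classes are used.
class DeadlineSketch {
   // Runs a simulated lookup taking workMillis and waits at most deadlineMillis.
   static String callWithDeadline(long workMillis, long deadlineMillis) {
      ExecutorService server = Executors.newSingleThreadExecutor();
      try {
         Future<String> reply = server.submit(() -> {
            Thread.sleep(workMillis); // simulated slow search
            return "Great Gatsby";
         });
         try {
            return reply.get(deadlineMillis, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
            // Stop waiting and ask the worker to stop as well.
            reply.cancel(true);
            return "DEADLINE_EXCEEDED";
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
         } catch (ExecutionException e) {
            return "FAILED: " + e.getCause();
         }
      } finally {
         server.shutdownNow();
      }
   }

   public static void main(String[] args) {
      // Fast lookup, generous deadline: the result is returned.
      System.out.println(callWithDeadline(50, 1000));
      // Slow lookup, short deadline: the caller gives up.
      System.out.println(callWithDeadline(1000, 100));
   }
}
```

As in the gRPC example, the caller is released once the deadline passes, regardless of how long the work would have taken. The cancel(true) call loosely mirrors the fact that the other side is told the result is no longer wanted, so that it can stop early.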

Request Cancellation

gRPC supports cancelling requests from both the client side and the server side. The Client can specify at runtime the amount of time it wants to wait before cancelling the request. The Server can also check on its end whether a request still needs to be served or whether the client has already given up on it.

Let us look at an example of client streaming, where the client invokes cancellation. So, here is our server code −

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

This server code is a simple example of client-side streaming. The server simply tracks the books that the client wants and, at the end, provides the total cart value of the order.
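The aggregation the server performs can be sketched without any gRPC machinery. The following is a minimal plain-Java sketch, assuming a hypothetical `CartTotalSketch` class and a price table that mirrors part of `bookMap`. It only illustrates the prefix matching done for each streamed message and the summing done once the stream completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CartTotalSketch {
   // Hypothetical price table standing in for the server's bookMap.
   static final Map<String, Integer> PRICES = new LinkedHashMap<>();
   static {
      PRICES.put("Great Gatsby", 300);
      PRICES.put("To Kill MockingBird", 400);
      PRICES.put("Passage to India", 500);
   }

   // Mirrors onNext: for every streamed prefix, add the price of each matching title.
   public static List<Integer> collect(List<String> prefixes) {
      List<Integer> cart = new ArrayList<>();
      for (String prefix : prefixes) {
         for (Map.Entry<String, Integer> entry : PRICES.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
               cart.add(entry.getValue());
            }
         }
      }
      return cart;
   }

   // Mirrors onCompleted: sum the prices collected so far.
   public static int total(List<Integer> cart) {
      int sum = 0;
      for (int price : cart) {
         sum += price;
      }
      return sum;
   }

   public static void main(String[] args) {
      List<Integer> cart = collect(Arrays.asList("Great", "Passage"));
      System.out.println("Books: " + cart.size() + ", total: " + total(cart));
   }
}
```

With the prefixes Great and Passage, the sketch collects two books with a total value of 800.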

But there is nothing special here with respect to cancellation of the request, as that is something the client would invoke. So, let us look at the client code.


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:"
               + "\nTotal number of Books: "
               + cart.getBooks()
               + "\nTotal Order Value: "
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         // Start the call while the cancellable context is current, so that
         // cancelling the context also cancels the RPC.
         withCancellation.run(() -> {
            streamClientSender = stub.totalCartValue(getServerResponseObserver());
         });
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      // Complete the stream only if a call was actually started.
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public void cancelOrder() {
      // Nothing to cancel if no book was ever added.
      if(withCancellation != null) {
         withCancellation.cancel(null);
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

In the code above, the following line creates a context that can be cancelled. The call has to be started while this context is current for the cancellation to reach the RPC.


withCancellation = Context.current().withCancellation();

Here is the method that is called when the user types CANCEL. It cancels the order and lets the server know about it.


public void cancelOrder() {
   // Nothing to cancel if no book was ever added.
   if(withCancellation != null) {
      withCancellation.cancel(null);
   }
}
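Conceptually, a cancellable context keeps a list of listeners and notifies them once when it is cancelled; a call started under the context roughly behaves like one of these listeners. The sketch below models only that idea in plain Java. `MiniContext` and `CancellationSketch` are hypothetical names, and this is an illustration rather than the way `io.grpc.Context` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellationSketch {
   // Hypothetical stand-in for a cancellable context; not the io.grpc.Context class.
   public static class MiniContext {
      private final List<Runnable> listeners = new ArrayList<>();
      private boolean cancelled;

      // Registers a listener; runs it immediately if the context is already cancelled.
      public synchronized void addListener(Runnable listener) {
         if (cancelled) {
            listener.run();
         } else {
            listeners.add(listener);
         }
      }

      // Marks the context cancelled and notifies every listener exactly once.
      public void cancel() {
         List<Runnable> toNotify;
         synchronized (this) {
            if (cancelled) {
               return;
            }
            cancelled = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
         }
         for (Runnable listener : toNotify) {
            listener.run();
         }
      }
   }

   // Simulates a call bound to a context, one sent message, and a cancellation.
   public static List<String> demo() {
      List<String> events = new ArrayList<>();
      MiniContext context = new MiniContext();
      context.addListener(() -> events.add("call cancelled"));
      events.add("sent: Great");
      context.cancel();
      context.cancel(); // a second cancel is a no-op
      return events;
   }

   public static void main(String[] args) {
      System.out.println(demo());
   }
}
```

The listener fires once, after the message that was already sent, which is the order of events seen in the logs further down.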

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We would see the following output −

Output


Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

The above output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation

We would get the following output −

Output


Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked

And we would get the following data in the server logs −


INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled

So, as we can see, the client initiated a cancellation of the request it made to the server, and the server was notified about the cancellation.

gRPC - Send/Receive Metadata

gRPC supports sending metadata along with a call. Metadata is a set of key-value pairs that accompanies a gRPC call but is not part of the business logic.

Let us look at the following two cases −

    The Client sends Metadata and the Server reads it.

    The Server sends Metadata and the Client reads it.

We will go through both these cases one by one.

Client Sends Metadata

As mentioned, gRPC lets the client send metadata that the server can read. Client and server interceptors can be used to write and read this metadata, respectively. Let us take an example to understand it better. Here is our client code, which sends the hostname as metadata −

Example


package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new
         ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
      Book response;
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
      try {
         BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The interesting bit here is the interceptor.


static class BookClientInterceptor implements ClientInterceptor {
   @Override
   public <ReqT, RespT> ClientCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new
      ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         public void start(Listener<RespT> responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

We intercept every call made by the client and add the hostname metadata to it before the call is passed further down the chain.
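The wrapping pattern behind the interceptor can be modelled in plain Java: a wrapper receives the headers, adds an entry, and delegates to the next call. The sketch below assumes a hypothetical `Call` interface and `InterceptorSketch` class introduced purely for illustration; it does not use the gRPC types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InterceptorSketch {
   // Hypothetical stand-in for a call: takes headers and returns the headers that were sent.
   public interface Call {
      Map<String, String> start(Map<String, String> headers);
   }

   // Terminal call: "sends" whatever headers it receives.
   public static Call transport() {
      return headers -> new LinkedHashMap<>(headers);
   }

   // Wraps the next call and adds one header before delegating to it.
   public static Call withHeader(Call next, String key, String value) {
      return headers -> {
         Map<String, String> copy = new LinkedHashMap<>(headers);
         copy.put(key, value);
         return next.start(copy);
      };
   }

   public static void main(String[] args) {
      Call call = withHeader(transport(), "hostname", "MY_HOST");
      System.out.println(call.start(new LinkedHashMap<>()));
   }
}
```

Because the wrapper has the same shape as the call it wraps, several such wrappers can be stacked, and the application code that starts the call does not need to know about the extra header.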

Server Reads Metadata

Now, let us look at the server code which reads this metadata −


package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      public <ReqT, RespT> Listener<ReqT>
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

Again, the interesting bit here is the interceptor.


class BookServerInterceptor implements ServerInterceptor{
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

We intercept any call which is incoming to the server and then log the metadata before the call can be handled by the actual method.

Client-Server Call

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata

We would see the following output −

Output


Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051

The above output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great

We would see the following output −

Output


Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookClientInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

And we will have the following data in the server logs −


Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerInterceptor interceptCall
INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

As we can see, the server is able to read the metadata hostname=MY_HOST, which was added by the client.
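Note that the client registered the key as `HOSTNAME`, while the server log shows `hostname`: gRPC metadata keys are case-insensitive and are carried in lowercase. A minimal plain-Java sketch of this kind of normalization follows. `HeaderBag` is a hypothetical class used only for illustration and is not the `io.grpc.Metadata` API.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class HeaderBag {
   private final Map<String, String> values = new LinkedHashMap<>();

   // Stores the value under the lowercased key.
   public void put(String key, String value) {
      values.put(key.toLowerCase(Locale.ROOT), value);
   }

   // Looks the value up regardless of the case used by the caller.
   public String get(String key) {
      return values.get(key.toLowerCase(Locale.ROOT));
   }

   @Override
   public String toString() {
      return values.toString();
   }

   public static void main(String[] args) {
      HeaderBag headers = new HeaderBag();
      headers.put("HOSTNAME", "MY_HOST");
      System.out.println(headers);
      System.out.println(headers.get("HostName"));
   }
}
```

A reader on the server side can therefore look the value up with any casing of the key name.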
