gRPC - Bidirectional RPC
  • Date: 2024-09-17


Let us now see how client-server streaming works when using gRPC communication. In this case, the client searches for books and adds them to the cart. The server responds with the live cart value every time a book is added.

.proto file

First let us define the bookstore.proto file in common_proto_files


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

The following block declares the service "BookStore" and the function "liveCartValue" that can be called on it. The "liveCartValue" function takes a stream of "Book" objects as input and returns a stream of "Cart" objects. So, effectively, we let the client add books in a streaming fashion, and whenever a new book is added, the server responds to the client with the current cart value.


service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
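
To make the shape of a stream-to-stream call concrete, here is a minimal plain-Java model that does not use gRPC at all. The Observer interface and the BidiShapeSketch class are hypothetical stand-ins, not the generated gRPC types. The sketch only illustrates the idea: the handler receives an observer for the responses, returns an observer for the requests, and may emit a response as soon as each request arrives.

```java
import java.util.ArrayList;
import java.util.List;

public class BidiShapeSketch {
    /** Hypothetical stand-in for a stream observer; not the gRPC interface. */
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Mirrors the rpc shape: takes the response stream, returns the request stream. */
    static Observer<Integer> liveTotal(Observer<Integer> responses) {
        return new Observer<Integer>() {
            private int total = 0; // state kept for the lifetime of one call

            @Override
            public void onNext(Integer price) {
                total += price;
                responses.onNext(total); // respond immediately, one reply per request
            }

            @Override
            public void onCompleted() {
                responses.onCompleted(); // caller is done, so close the response stream
            }
        };
    }

    /** Sends the prices one by one and records every response in arrival order. */
    public static List<String> run(int... prices) {
        List<String> log = new ArrayList<>();
        Observer<Integer> requests = liveTotal(new Observer<Integer>() {
            @Override public void onNext(Integer total) { log.add("total=" + total); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        for (int p : prices) {
            requests.onNext(p);
        }
        requests.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(300, 500)); // [total=300, total=800, completed]
    }
}
```

In real gRPC the two observers are driven over the network and on different threads; this model runs everything synchronously on one thread.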

Now let us look at these types.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The client sends the "Book" it wants to buy. It does not have to be the complete book info; it can simply be the title of the book.


message Cart {
   int32 books = 1;
   int32 price = 2;
}

The server, on receiving each book, returns a "Cart" object, which is nothing but the total number of books the client has added so far and the total price.

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project:


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerBothStreaming.java −

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               // Give in-flight calls up to 30 seconds to finish.
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerBothStreaming greetServer = new BookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> liveCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            // Per-call state: each client stream gets its own cart.
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue += bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               // Send the running totals back right away, one Cart per received Book.
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we create a service instance, i.e., in our case, the BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the proto file, i.e., in our case, the liveCartValue method.

    Now, given that this is the case of both server and client streaming, the server receives the Books (defined in the proto file) one at a time as the client adds them. The server thus returns a custom stream observer. This stream observer implements what happens when a new Book arrives and what happens when the stream is closed.

    The onNext() method is called by the gRPC framework each time the client adds a Book. At this point, the server adds every matching book to the cart and uses the response observer to return the current cart value. Because this is streaming, the server does not wait for the client to finish sending all its books before responding.

    When the client is done adding Books, the stream observer's onCompleted() method is called. This method implements what the server does when the client is done adding Books, i.e., it completes the response stream to signal that it has finished taking the client's order.

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
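
The cart update performed in onNext() can be worked through without gRPC. The following is a minimal plain-Java sketch, assuming the same titles and prices as the server's bookMap. The class CartTotalsSketch and the method cartAfter are hypothetical names used only for illustration. It also shows a consequence of the startsWith() match: a prefix that matches several titles adds all of them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CartTotalsSketch {
    // Titles and prices copied from the bookMap in the server code above.
    static final Map<String, Integer> PRICES = new LinkedHashMap<>();
    static {
        PRICES.put("Great Gatsby", 300);
        PRICES.put("To Kill MockingBird", 400);
        PRICES.put("Passage to India", 500);
        PRICES.put("The Side of Paradise", 600);
        PRICES.put("Go Set a Watchman", 700);
    }

    /** Returns {books, price} after adding every title that starts with each prefix, in order. */
    public static int[] cartAfter(String... prefixes) {
        int books = 0;
        int price = 0;
        for (String prefix : prefixes) {
            for (Map.Entry<String, Integer> e : PRICES.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    books++;
                    price += e.getValue();
                }
            }
        }
        return new int[] {books, price};
    }

    public static void main(String[] args) {
        int[] cart = cartAfter("Great", "Passage");
        System.out.println(cart[0] + " books, total " + cart[1]); // 2 books, total 800

        // "G" matches both "Great Gatsby" and "Go Set a Watchman".
        int[] broad = cartAfter("G");
        System.out.println(broad[0] + " books, total " + broad[1]); // 2 books, total 1000
    }
}
```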

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientBothStreaming.java −

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   // Written on gRPC callback threads and read on the main thread, hence volatile.
   private volatile boolean serverIntermediateResponseCompleted = true;
   private volatile boolean serverResponseCompleted = false;

   // Request stream to the server; created lazily on the first addBook() call.
   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + 
               "\nTotal number of Books:" + cart.getBooks() + 
               "\nTotal Order Value:" + cart.getPrice());

            // The server has answered the last book; main may prompt again.
            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if(streamClientSender == null) {
         // Opens the bidirectional call and registers the response observer.
         streamClientSender = stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new BookStoreClientBothStreaming(channel);
         String bookName = "";

         while(true) {
            // Prompt only after the server has answered the previous book.
            if(client.serverIntermediateResponseCompleted == true) {
               System.out.println("Type book name to be added to the cart....");
               bookName = System.console().readLine();
               if("EXIT".equals(bookName)) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         // Wait until the server closes its side of the stream.
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
            
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }   
   }
}

The above code starts a gRPC client, connects to a server at a specified port, and calls the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we accept the names of the books to be added to the cart. Once all the books have been added, the user is expected to type "EXIT".

    We set up a Channel for gRPC communication with our server.

    Next, we create a non-blocking stub using the channel. This is where we choose the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the proto file, i.e., in our case, Book, and we set the title we want the server to add.

    But given that this is the case of both server and client streaming, we first create a stream observer for the server's responses. This server stream observer lists the behavior for what needs to be done when the server responds, i.e., onNext() and onCompleted().

    And using the stub, we also get the client stream observer. We use this stream observer for sending the data, i.e., the Book to be added to the cart.

    And once our order is complete, we ensure that the client stream observer is closed. This tells the server that no more books are coming, so it can close its stream and perform the cleanup.

    Finally, we close the channel to avoid any resource leak.

So, that is our client code.
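
The client above waits for the server by polling boolean flags with Thread.sleep(). One possible alternative, sketched here in plain Java without gRPC, is to block on a java.util.concurrent.CountDownLatch that the response observer's onCompleted() would count down. The class and method names below are hypothetical, and the background thread only simulates the gRPC thread that would deliver the completion callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompletionLatchSketch {
    /**
     * Simulates waiting for a response stream to finish. Returns true if the
     * simulated onCompleted() callback arrived within the timeout.
     */
    public static boolean awaitCompletion(long callbackDelayMillis, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the gRPC thread that would invoke onCompleted().
        Thread callbackThread = new Thread(() -> {
            try {
                Thread.sleep(callbackDelayMillis);
                done.countDown(); // what onCompleted() (or onError()) would do
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        callbackThread.setDaemon(true);
        callbackThread.start();

        try {
            // Blocks without busy-waiting; false means the timeout elapsed first.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitCompletion(50, 2000)); // callback arrives in time
        System.out.println(awaitCompletion(2000, 50)); // timeout elapses first
    }
}
```

Counting the latch down from the error callback as well would keep the caller from waiting out the full timeout when the call fails.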

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The client sends a stream of books to the server.

    The server searches for each book in its store and adds the matches to the cart.

    With each book addition, the server tells the client about the cart value.

    When the client is done ordering, both the server and the client close the stream.

Now that we have defined our proto file and written our server and client code, let us execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerBothStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

This output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming

Let us add a book to our cart.


Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Gr

Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 1
Total Order Value: 300

So, as we can see, we get the current cart value of the order. Let us now add one more book to our cart.


Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Pa

Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

Once we have added the books and we input "EXIT", the client shuts down.


Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

So, as we can see, the client was able to add books. And as the books are added, the server responds with the current cart value.
