gRPC - Client Streaming RPC
  • Date: 2024-12-27

Let us now see how client streaming works while using gRPC communication. In this case, the client will search for books and add them to the cart. Once the client is done adding all the books, the server provides the checkout cart value to the client.

.proto file

First let us define the bookstore.proto file in common_proto_files


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

In the definition above, the following block declares the service "BookStore" and the function "totalCartValue" that can be called on it. The "totalCartValue" function takes a stream of "Book" messages as input and returns a single object of type "Cart". So, effectively, we let the client add books in a streaming fashion and, once the client is done, the server provides the total cart value to the client.


service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}

Now let us look at these types.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The client would send in the "Book" it wants to buy. It may not be the complete book info; it can simply be the title of the book.


message Cart {
   int32 books = 1;
   int32 price = 2;
}

The server, on getting the list of books, would return the "Cart" object, which holds the total number of books the client has purchased and the total price.
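
To make the contract concrete, here is a minimal plain-Java sketch of what the "totalCartValue" signature implies: many "Book" messages flow in, and exactly one "Cart" comes back when the stream ends. It does not use gRPC at all; BookMsg, CartMsg and CartAccumulator are hypothetical stand-ins for the generated classes, assumed only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the generated Book and Cart messages.
final class BookMsg {
    final String name;
    final int price;
    BookMsg(String name, int price) { this.name = name; this.price = price; }
}

final class CartMsg {
    final int books;
    final int price;
    CartMsg(int books, int price) { this.books = books; this.price = price; }
}

// Mirrors the shape of a client-streaming call: onNext() is invoked once per
// streamed message, onCompleted() produces the single response.
final class CartAccumulator {
    private final List<BookMsg> received = new ArrayList<>();

    void onNext(BookMsg book) {
        received.add(book); // nothing is sent back yet
    }

    CartMsg onCompleted() {
        int total = 0;
        for (BookMsg b : received) {
            total += b.price;
        }
        return new CartMsg(received.size(), total);
    }
}

class ClientStreamingContractDemo {
    public static void main(String[] args) {
        CartAccumulator call = new CartAccumulator();
        call.onNext(new BookMsg("Great Gatsby", 300));
        call.onNext(new BookMsg("Passage to India", 500));
        CartMsg cart = call.onCompleted();
        System.out.println(cart.books + " books, total " + cart.price);
    }
}
```

In the real service the same two steps are split across the network: the client drives onNext() and onCompleted(), and the server answers only after the stream is closed.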

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve calls to these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerClientStreaming.java

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());

   // In-memory catalogue of available books, keyed by title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      // Shut the server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         // One observer, and therefore one cart, per client call
         return new StreamObserver<Book>() {
            List<Book> bookCart = new ArrayList<>();

            // Called once for every Book the client streams in
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if (bookEntry.getValue().getName().startsWith(book.getName())) {
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            // Called when the client closes its stream; send the single Cart response
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the BookStoreImpl

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the totalCartValue method.

    Now, given that this is the case of client streaming, the server will get a list of Book (defined in the proto file) as the client adds them. The server thus returns a custom stream observer. This stream observer implements what happens when a new Book arrives and what happens when the stream is closed.

    The onNext() method would be called by the gRPC framework when the client adds a Book. At this point, the server searches for matching books and adds them to the cart. In case of streaming, the server does not wait for all the books to be available before it starts processing.

    When the client is done with the addition of Books, the stream observer's onCompleted() method is called. This method implements what the server wants to send when the client is done adding Books, i.e., it returns the Cart object to the client.

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
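
The prefix search in onNext() has two consequences worth seeing in isolation: one streamed Book can add several catalogue entries to the cart, and an empty name matches every title, because String.startsWith("") is always true. The sketch below reproduces only the matching loop, using a plain map of title to price with the same titles and prices as the server. PrefixSearchSketch and matchPrices are hypothetical names and not part of the server code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PrefixSearchSketch {
    // Same titles and prices as the server's bookMap, reduced to title -> price.
    static final Map<String, Integer> CATALOGUE = new LinkedHashMap<>();
    static {
        CATALOGUE.put("Great Gatsby", 300);
        CATALOGUE.put("To Kill MockingBird", 400);
        CATALOGUE.put("Passage to India", 500);
        CATALOGUE.put("The Side of Paradise", 600);
        CATALOGUE.put("Go Set a Watchman", 700);
    }

    // Returns the price of every catalogue entry whose title starts with the prefix.
    static List<Integer> matchPrices(String prefix) {
        List<Integer> prices = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : CATALOGUE.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                prices.add(entry.getValue());
            }
        }
        return prices;
    }

    public static void main(String[] args) {
        System.out.println(matchPrices("Great"));   // a single match
        System.out.println(matchPrices("G"));       // two titles share this prefix
        System.out.println(matchPrices("").size()); // the empty prefix matches every title
        System.out.println(matchPrices("great"));   // matching is case-sensitive
    }
}
```

If adding the whole catalogue for an empty name is not wanted, one option would be for the server to ignore Book messages with an empty name in onNext(); the tutorial code does not do this.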

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientStreamingClient.java

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClient {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClient.class.getName());
   private final BookStoreStub stub;
   // Written from a gRPC callback thread and read from main, hence volatile
   private volatile boolean serverResponseCompleted = false;
   StreamObserver<Book> streamClientSender;

   public BookStoreClientStreamingClient(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   // Observer that handles the single Cart response sent by the server
   public StreamObserver<Cart> getServerResponseObserver() {
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" +
               "\nTotal number of Books: " + cart.getBooks() +
               "\nTotal Order Value: " + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            // StreamObserver requires onError(); stop waiting if the call fails
            logger.log(Level.WARNING, "RPC failed", t);
            serverResponseCompleted = true;
         }
         @Override
         public void onCompleted() {
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      // Open the stream lazily, on the first book
      if (streamClientSender == null) {
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      // Closing the request stream tells the server to compute the cart
      if (streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
         String bookName = "";

         while (true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if (bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            client.addBook(bookName);
         }

         // Wait until the server's response has arrived
         while (!client.serverResponseCompleted) {
            Thread.sleep(2000);
         }

      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code connects to the gRPC server and calls the function which we had defined in our proto file. Let us walk through the above code −

    Starting from the main method, we repeatedly read the title of a book to be added to the cart from the console, until the user types "EXIT".

    We set up a Channel for gRPC communication with our server.

    Next, we create a non-blocking stub using the channel we created. This is where we are choosing the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the .proto file, i.e., in our case, Book, and we set the title that we want the server to add.

    But given this is the case of client streaming, we first create a stream observer for the server. This server stream observer lists the behavior on what needs to be done when the server responds, i.e., onNext() and onCompleted().

    And using the stub, we also get the client stream observer. We use this stream observer for sending the data, i.e., each Book to be added to the cart, by calling its onNext() method.

    And once our order is complete, we ensure that the client stream observer is closed. It tells the server to calculate the Cart Value and provide that as an output.

    Finally, we close the channel to avoid any resource leak.

So, that is our client code.
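
The client above waits for the response by polling a boolean flag every two seconds. A common alternative is a java.util.concurrent.CountDownLatch that the response callbacks release, so the main thread wakes up as soon as the response arrives. The following is a minimal sketch of that pattern without gRPC; ResponseWaiter and its method names are hypothetical, and a background thread stands in for the thread on which the response callbacks would run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: the response callbacks release the latch,
// and the main thread blocks on it instead of sleeping in a loop.
class ResponseWaiter {
    private final CountDownLatch done = new CountDownLatch(1);

    // Would be called from the response observer's onCompleted() or onError().
    void markFinished() {
        done.countDown();
    }

    // Blocks until markFinished() is called or the timeout elapses.
    // Returns true only if the latch was released in time.
    boolean awaitFinished(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

class LatchWaitDemo {
    public static void main(String[] args) {
        ResponseWaiter waiter = new ResponseWaiter();

        // Stand-in for the thread that delivers the server's response.
        Thread responder = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            waiter.markFinished();
        });
        responder.start();

        boolean finished = waiter.awaitFinished(5, TimeUnit.SECONDS);
        System.out.println("Response received in time: " + finished);
    }
}
```

Compared with the polling loop, this also gives the client a natural place to apply a timeout rather than waiting indefinitely.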

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The client adds books one by one by streaming them to the server.

    The server searches for each book in its store and adds the matches to the cart.

    When the client is done ordering, the server responds with the total cart value of the client.

Now that we have defined our proto file and written our server and client code, let us proceed to execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

The above output means the server has started.

Now, let us start the cpent.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClient

Let us add a few books using our client.


Type book name to be added to the cart....
Gr
Jul 24, 2021 5:53:07 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
Pa
Jul 24, 2021 5:53:20 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Passage

Type book name to be added to the cart....
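
A practical note on the prompt shown above: it is driven by System.console(), which returns null when the JVM has no interactive console, for example when input is piped in or when the program is launched from some IDEs. In that situation the input loop in main would fail with a NullPointerException. Below is a minimal sketch of an alternative that reads from any Reader and also stops at end of input; TitleInputSketch and readTitles are hypothetical names, not part of the client code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class TitleInputSketch {
    // Collects lines until "EXIT" or end of input; blank lines are skipped.
    static List<String> readTitles(Reader source) {
        List<String> titles = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String title = line.trim();
                if (title.equals("EXIT")) {
                    break;
                }
                if (!title.isEmpty()) {
                    titles.add(title);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return titles;
    }

    public static void main(String[] args) {
        // A StringReader stands in for new InputStreamReader(System.in).
        System.out.println(readTitles(new StringReader("Great\nPassage\nEXIT\nignored\n")));
    }
}
```

In the client, each collected title would be passed to addBook(), followed by a single call to completeOrder().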

Once we have added the books and we input "EXIT", the server then calculates the cart value and here is the output we get −

Output


EXIT
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

So, as we can see, the client was able to add books. And once all the books were added, the server responded with the total number of books and the total price.
