gRPC - Server Streaming RPC
  • Date: 2024-09-17

Let us now discuss how server streaming works in gRPC communication. In this case, the client searches for books by a given author. Assume the server needs some time to go through all the books. Instead of waiting until it has scanned every book and then returning all the matches at once, the server provides books in a streaming fashion, i.e., it sends each book as soon as it finds one.
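To make the difference concrete before bringing in gRPC, here is a minimal plain-Java sketch that contrasts the two styles. It does not use gRPC at all; the class name BatchVersusStreaming, the CATALOGUE list and both method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchVersusStreaming {
    // Hypothetical catalogue of "author: title" entries, scanned in order.
    static final List<String> CATALOGUE = List.of(
        "Scott Fitzgerald: Great Gatsby",
        "Harper Lee: To Kill MockingBird",
        "E.M.Forster: Passage to India",
        "Harper Lee: Go Set a Watchman");

    // Batch style: the caller sees nothing until the whole catalogue is scanned.
    static List<String> searchBatch(String authorPrefix) {
        List<String> matches = new ArrayList<>();
        for (String entry : CATALOGUE) {
            if (entry.startsWith(authorPrefix)) {
                matches.add(entry);
            }
        }
        return matches;
    }

    // Streaming style: each match is handed to the caller as soon as it is found.
    static void searchStreaming(String authorPrefix, Consumer<String> onMatch) {
        for (String entry : CATALOGUE) {
            if (entry.startsWith(authorPrefix)) {
                onMatch.accept(entry);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Batch result: " + searchBatch("Har"));
        searchStreaming("Har", match -> System.out.println("Streamed: " + match));
    }
}
```

Both styles produce the same matches in the same order. The difference is when the caller gets to see them: in the streaming style the callback runs in the middle of the scan, which is the behaviour a server streaming RPC gives us across the network.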

.proto file

First let us define the bookstore.proto file in common_proto_files −


syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

The following block declares the service "BookStore" and the function "searchByAuthor" which can be called. The "searchByAuthor" function takes an input of type "BookSearch" and returns a stream of type "Book". So, effectively, we let the client search by author, and the server returns every book matching the queried author, one at a time.


service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

Now let us look at these types.


message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

Here, we have defined BookSearch which contains a few attributes like name, author and genre. The client is supposed to send an object of type "BookSearch" to the server.


message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

We have also defined that, given a "BookSearch", the server returns a stream of "Book" messages, each of which contains the book attributes along with the price of the book.

Note that we already had the Maven setup done for auto-generating our class files as well as our RPC code. So, now we can simply compile our project −


mvn clean install

This should auto-generate the source code required for us to use gRPC. The source code would be placed under −


Protobuf class code: target/generated-sources/protobuf/java/com/tp/bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com/tp/bookstore

Setting up gRPC Server

Now that we have defined the proto file which contains the function definition, let us set up a server which can serve calls to these functions.

Let us write our server code to serve the above function and save it in com.tp.bookstore.BookeStoreServerStreaming.java

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerStreaming.class.getName());
 
   // In-memory book store, keyed by title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      // Register the BookStore service and start listening
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      // Shut the server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerStreaming bookStoreServer = new BookeStoreServerStreaming();
      bookStoreServer.start();
      // Block the main thread until the server terminates
      bookStoreServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               // Simulate a slow search: 5 seconds per book
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            // Match on the beginning of the author's name
            if (bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())) {
               logger.info("Found book with required author: " + bookEntry.getValue().getName() + ". Sending....");

               // Send this book to the client right away
               responseObserver.onNext(bookEntry.getValue());
            }
         }
         // Tell the client that no more books will follow
         responseObserver.onCompleted();
      }
   }
}

The above code starts a gRPC server at a specified port and serves the functions and services which we had written in our proto file. Let us walk through the above code −

    Starting from the main method, we create a gRPC server at a specified port.

    But before starting the server, we assign the server the service which we want to run, i.e., in our case, the BookStore service.

    For this purpose, we need to pass the service instance to the server, so we go ahead and create a service instance, i.e., in our case, the BookStoreImpl.

    The service instance needs to provide an implementation of the method/function which is present in the .proto file, i.e., in our case, the searchByAuthor method.

    The method expects an object of the type defined in the .proto file, i.e., for us, the BookSearch.

    Note that we have added a sleep to mimic the operation of searching through all the books. In case of streaming, the server does not wait for all the searched books to be available. It returns each book as soon as it is available by using the onNext() call.

    When the server is done with the request, it signals the end of the stream by calling onCompleted().

    Finally, we also have a shutdown hook to ensure clean shutting down of the server when we are done executing our code.
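The calling pattern described in the walkthrough (zero or more onNext() calls followed by a single onCompleted()) can be tried out without a running server. The following is a minimal sketch under stated assumptions: BookObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, books are reduced to plain title strings, and a LinkedHashMap is used so that the order of results is predictable (the server above uses a HashMap, whose iteration order is not guaranteed).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StreamObserverContractSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver, so the sketch runs without gRPC.
    interface BookObserver {
        void onNext(String bookName);
        void onCompleted();
    }

    // Title -> author, kept in insertion order for predictable output.
    static final Map<String, String> BOOKS = new LinkedHashMap<>();
    static {
        BOOKS.put("Great Gatsby", "Scott Fitzgerald");
        BOOKS.put("To Kill MockingBird", "Harper Lee");
        BOOKS.put("Passage to India", "E.M.Forster");
        BOOKS.put("The Side of Paradise", "Scott Fitzgerald");
        BOOKS.put("Go Set a Watchman", "Harper Lee");
    }

    // Mirrors the shape of the server method: one onNext per match, then onCompleted.
    static void searchByAuthor(String authorPrefix, BookObserver observer) {
        for (Map.Entry<String, String> entry : BOOKS.entrySet()) {
            if (entry.getValue().startsWith(authorPrefix)) {
                observer.onNext(entry.getKey());
            }
        }
        observer.onCompleted();
    }

    // Runs a search and records every observer call in the order it happened.
    static List<String> recordCalls(String authorPrefix) {
        List<String> calls = new ArrayList<>();
        searchByAuthor(authorPrefix, new BookObserver() {
            @Override public void onNext(String bookName) { calls.add("onNext:" + bookName); }
            @Override public void onCompleted() { calls.add("onCompleted"); }
        });
        return calls;
    }

    public static void main(String[] args) {
        recordCalls("Scott").forEach(System.out::println);
    }
}
```

Note that onCompleted() is called even when nothing matches, so a client waiting on the stream always learns that the search has ended.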

Setting up gRPC Client

Now that we have written the code for the server, let us set up a client which can call these functions.

Let us write our client code to call the above function and save it in com.tp.bookstore.BookStoreClientServerStreamingBlocking.java

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      Iterator<Book> response;
      try {
         // For a server streaming call, the blocking stub returns an iterator
         response = blockingStub.searchByAuthor(request);
         // hasNext() waits until the server sends the next book or completes the stream
         while (response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      // The author (or the beginning of the author's name) to search for
      String authorName = args[0];
      String serverAddress = "localhost:50051";

      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
        .usePlaintext()
        .build();
 
      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientServerStreamingBlocking(channel);
         client.getBook(authorName);
      } finally {
         // Release the channel's resources
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The above code connects to the gRPC server and calls the function which we had defined in our proto file. Let us walk through the above code −

    Starting from the main method, we accept one argument, i.e., the author (or the beginning of the author's name) whose books we want to search for.

    We set up a Channel for gRPC communication with our server.

    And then, we create a blocking stub using the channel. This is where we choose the service "BookStore" whose functions we plan to call.

    Then, we simply create the expected input defined in the .proto file, i.e., in our case, BookSearch, and we add the author that we want the server to search for.

    We ultimately make the call and get an iterator on valid Books. When we iterate, we get the corresponding Books as the Server makes them available.

    Finally, we close the channel to avoid any resource leak.

So, that is our client code.
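The blocking iterator used by the client can feel a little magical, so here is a minimal plain-Java sketch of the idea: an iterator whose hasNext() waits until a producer thread supplies the next item or marks the end of the stream. This is only a conceptual model and not gRPC's actual implementation; the class BlockingIteratorSketch, its search method and the delay parameter are hypothetical names chosen for this illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingIteratorSketch {
    // Iterator backed by a queue; an empty Optional marks the end of the stream.
    static final class QueueIterator implements Iterator<String> {
        private final BlockingQueue<Optional<String>> queue;
        private Optional<String> pending; // null means nothing has been fetched yet

        QueueIterator(BlockingQueue<Optional<String>> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (pending == null) {
                try {
                    pending = queue.take(); // blocks until the producer sends something
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    pending = Optional.empty(); // treat interruption as end of stream
                }
            }
            return pending.isPresent();
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String item = pending.get();
            pending = null;
            return item;
        }
    }

    // Starts a producer thread that emits one title per delay, then ends the stream.
    static Iterator<String> search(List<String> titles, long delayMillis) {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (String title : titles) {
                    Thread.sleep(delayMillis); // stands in for the slow search
                    queue.put(Optional.of(title));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                queue.add(Optional.empty()); // end-of-stream marker
            }
        });
        producer.setDaemon(true);
        producer.start();
        return new QueueIterator(queue);
    }

    public static void main(String[] args) {
        Iterator<String> books = search(List.of("Go Set a Watchman", "To Kill MockingBird"), 500);
        while (books.hasNext()) {
            System.out.println(System.currentTimeMillis() + " Found book: " + books.next());
        }
    }
}
```

Running main prints the two titles roughly half a second apart, because each hasNext() call returns only once the producer has delivered the next title.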

Client Server Call

To sum up, what we want to do is the following −

    Start the gRPC server.

    The Client queries the Server for books with a given author.

    The Server searches for the books in its store, which is a time-consuming process.

    The Server responds whenever it finds a book matching the given criteria. The Server does not wait for all the valid books to be available. It sends the output as soon as it finds one, and then repeats the process.

Now that we have defined our proto file and written our server and client code, let us proceed to execute this code and see things in action.

For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerStreaming

We would see the following output −

Output


Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

The above output means the server has started.

Now, let us start the client on the second shell −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

We would see the following output −

Output


Jul 03, 2021 10:40:31 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

So, as we see, the client was able to get the book details by querying the server with the beginning of the author's name. But more importantly, the client got the 1st book and the 2nd book at different timestamps, i.e., with a gap of about 5 seconds, which matches the sleep in the server's search loop.
