gRPC - Timeouts & Cancellation
  • Date: 2024-09-17

gRPC supports assigning timeouts to requests, which is one way of cancelling them. A timeout avoids tying up resources on both the client and the server for a request whose result would no longer be useful to the client.

Request Timeout

gRPC supports timeouts on both the client and the server side.

    The client can specify at runtime how long it is willing to wait before cancelling the request.

    The server can check on its end whether a request still needs to be served or whether the client has already given up on it.
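The idea behind a client-side timeout can be sketched with plain JDK concurrency, independent of gRPC. This is only a model under our own assumptions: the class `TimeoutSketch`, the method `callWithTimeout`, and the returned string are hypothetical names chosen for illustration, not gRPC API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-JDK model of a client-side timeout; none of this is gRPC API. */
final class TimeoutSketch {

    /** Runs the task on a worker thread and waits at most timeoutMillis for its result. */
    static String callWithTimeout(Callable<String> task, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<String> pending = worker.submit(task);
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller has given up: interrupt the worker so it stops using resources.
            pending.cancel(true);
            return "DEADLINE_EXCEEDED";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

A task that finishes in time returns its own result, while a task that outlives the timeout is interrupted and reported as timed out. This mirrors the two roles described above: the caller decides how long to wait, and the worker gets a signal that its result is no longer needed.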

Let us take an example where the client expects a response within 2 seconds, but the server takes longer. Here is our server code.

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
        .setAuthor("Scott Fitzgerald")
        .setPrice(300).build());
      bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
        .setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
        .setAuthor("E.M.Forster")
        .setPrice(500).build());
      bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
        .setAuthor("Scott Fitzgerald")
        .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
        .setAuthor("Harper Lee")
        .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " +searchQuery.getName());
         logger.info("This may take more time...");
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
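         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         if (matchingBookTitles.isEmpty()) {
            // onNext(null) would throw, so report the miss as a gRPC status instead
            responseObserver.onError(io.grpc.Status.NOT_FOUND
               .withDescription("No book title starts with: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         responseObserver.onNext(bookMap.get(matchingBookTitles.get(0)));
         responseObserver.onCompleted();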
      }
   }
}

In the above code, the server searches for the book whose title the client has provided. We added a dummy sleep of 5 seconds so that we can see the request timing out.
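The dummy sleep above runs for the full 5 seconds even if the client has already given up. A server that wants to stop early could instead wait in small steps and poll a cancellation flag between them. The sketch below shows that idea with plain JDK classes only. `CancellableWork` and `sleepUnlessCancelled` are hypothetical names, and in a real gRPC handler the flag would presumably be supplied by the call's context (for example `Context.current().isCancelled()`), which is an assumption not exercised here.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Plain-JDK sketch of a wait that gives up early once a cancellation flag is set. */
final class CancellableWork {

    /**
     * Waits up to totalMillis, checking isCancelled every pollMillis.
     * Returns true if the full wait elapsed, false if cancellation was observed.
     */
    static boolean sleepUnlessCancelled(long totalMillis, long pollMillis,
                                        BooleanSupplier isCancelled) {
        long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis);
        while (System.nanoTime() - end < 0) {
            if (isCancelled.getAsBoolean()) {
                return false; // the caller no longer wants the result
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }
}
```

The polling interval is a trade-off: a shorter interval reacts to cancellation sooner but wakes the thread more often.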

And here is our client code.

Example


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel){
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         // withDeadlineAfter returns a stub whose call must finish within 2 seconds
         response = blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5,
         TimeUnit.SECONDS);
      }
   }
}

The above code calls the server with a title to search for. But more importantly, it provides a timeout of 2 seconds to the gRPC call.
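It helps to think of the 2-second timeout as a fixed point in time rather than a duration. To our understanding, the gRPC Java stub computes that point when the timeout is set on the stub, which is why the client above sets it right before each call. The following sketch models only the arithmetic. `DeadlineSketch` and its methods are hypothetical names, not the gRPC classes.

```java
import java.util.concurrent.TimeUnit;

/** Plain-JDK model of a deadline: a timeout turned into a fixed point in time. */
final class DeadlineSketch {
    private final long deadlineNanos;

    private DeadlineSketch(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    /** Fixes the deadline relative to the moment this method is called. */
    static DeadlineSketch after(long duration, TimeUnit unit) {
        return new DeadlineSketch(System.nanoTime() + unit.toNanos(duration));
    }

    /** Time left until the deadline; zero or negative once it has passed. */
    long remaining(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    boolean isExpired() {
        return deadlineNanos - System.nanoTime() <= 0;
    }
}
```

Because the point in time is fixed at creation, the remaining time only shrinks. Under this model, reusing one deadline for several calls would leave the later calls with less than the full timeout.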

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout

We would see the following output −

Output


Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

The above output shows that the server has started. Once the client (started in the next step) sends its request, the server also logs the following.


Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

We would get the following output −

Output


Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}

So, as we can see, the client did not get a response within 2 seconds, so the call was cancelled and reported as a timeout, i.e., with the status DEADLINE_EXCEEDED.

Request Cancellation

gRPC supports cancelling requests from both the client and the server side. The client can decide at runtime to cancel a request it no longer needs, and the server can check on its end whether a request still needs to be served or whether the client has already given up on it.

Let us look at an example of client streaming in which the client invokes cancellation. Here is our server code.

Example


package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

This server code is a simple example of client-side streaming. The server tracks the books the client wants and, at the end, returns the total cart value of the order.

There is nothing special here with respect to cancellation, as that is something the client invokes. So, let us look at the client code.


package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:"
               + "\nTotal number of Books: "
               + cart.getBooks()
               + "\nTotal Order Value: "
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      // Only complete the stream if at least one book opened it
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public void cancelOrder() {
      withCancellation.cancel(null);
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

In the above code, the following line defines a context which has cancellation enabled.


withCancellation = Context.current().withCancellation();

And here is the method that is called when the user types CANCEL. It is meant to cancel the order and let the server know about it.


public void cancelOrder() {
   withCancellation.cancel(null);
}
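Conceptually, a cancellable context can be pictured as a flag plus a list of listeners: work that has registered with it is notified once when it is cancelled. The sketch below is a deliberately simplified plain-JDK model of that idea. `CancelToken` and its methods are hypothetical names and do not reproduce the `io.grpc.Context` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a cancellable context; not the io.grpc.Context API. */
final class CancelToken {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean cancelled;

    /** Registers a callback; runs it immediately if the token is already cancelled. */
    synchronized void onCancel(Runnable listener) {
        if (cancelled) {
            listener.run();
        } else {
            listeners.add(listener);
        }
    }

    /** Marks the token as cancelled and notifies each registered listener once. */
    synchronized void cancel() {
        if (cancelled) {
            return; // repeated cancellation is a no-op
        }
        cancelled = true;
        for (Runnable listener : listeners) {
            listener.run();
        }
        listeners.clear();
    }

    synchronized boolean isCancelled() {
        return cancelled;
    }
}
```

In this model, only work that registered a listener hears about the cancellation. Work that was never associated with the token is unaffected by `cancel()`.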

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We would get to see the following output −

Output


Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

The above output implies that the server has started.

Now, let us start the client.


java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation

We would get the following output −

Output


Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server:
io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked

And we would get the following data in the server logs −


INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream:
io.grpc.StatusRuntimeException: CANCELLED: client cancelled

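So, as we can see, the client's request was cancelled, and the server was notified about the cancellation. Note, however, that the client-side error reads UNAVAILABLE: Channel shutdownNow invoked. The listing creates the cancellable context but never makes it current when the call is started, so in this run it appears to be the channel.shutdownNow() in the finally block, not withCancellation.cancel(null), that terminates the stream. For the context to cancel the call, the call would have to be started while that context is current, for example inside withCancellation.run(...).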
