- gRPC - Discussion
- gRPC - Useful Resources
- gRPC - Quick Guide
- gRPC - Send/Receive Metadata
- gRPC - Timeouts & Cancellation
- gRPC - Client Calls
- gRPC - Bidirectional RPC
- gRPC - Client Streaming RPC
- gRPC - Server Streaming RPC
- gRPC - Unary
- gRPC - Helloworld App with Python
- gRPC - Helloworld App with Java
- gRPC - Setup
- gRPC - Introduction
- gRPC - Home
Selected Reading
- Who is Who
- Computer Glossary
- HR Interview Questions
- Effective Resume Writing
- Questions and Answers
- UPSC IAS Exams Notes
gRPC - Timeouts & Cancellation
gRPC supports assigning timeouts to the requests. It is a way to perform cancellation of requests. It helps to avoid using the resources for both the cpent and the server for a request whose result would not be useful to the cpent.
Request Timeout
gRPC supports specifying a timeout for both cpent as well as the server.
The Cpent can specify during runtime the amount of time it wants to wait before cancelpng the request.
The Server can also check on its end whether the requests need to be catered to or has the Cpent already given up on the request.
Let us take an example where the cpent expects a response in 2 seconds, but the server takes a longer time. So, here is our server code.
Example
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; pubpc class BookeStoreServerUnaryTimeout { private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, pstening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override pubpc void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } pubpc static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override pubpc void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with title: " +searchQuery.getName()); logger.info("This may take more time..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } List<String> matchingBookTitles = bookMap.keySet().stream() .filter(title -> title.startsWith(searchQuery.getName().trim())) .collect(Collectors.toList()); Book foundBook = null; if(matchingBookTitles.size() > 0) { foundBook = bookMap.get(matchingBookTitles.get(0)); } responseObserver.onNext(foundBook); responseObserver.onCompleted(); } } }
In the above code, the server searches for the book the title of which the cpent has provided. We added a dummy sleep so that we can see the request getting timed out.
And here is our cpent code
Example
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.CpentInput; pubpc class BookStoreCpentUnaryBlockingTimeout { private static final Logger logger = Logger.getLogger(BookStoreCpentUnaryBlockingTimeout.class.getName()); private final BookStoreGrpc.BookStoreBlockingStubblockingStub; pubpc BookStoreCpentUnaryBlockingTimeout(Channel channel){ blockingStub = BookStoreGrpc.newBlockingStub(channel); } pubpc void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request =BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.withDeadpneAfter(2,TimeUnit.SECONDS).first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus()); return; } logger.info("Got following book from server: " +response); } pubpc static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build(); try { BookStoreCpentUnaryBlockingTimeout cpent = newBookStoreCpentUnaryBlockingTimeout(channel); cpent.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
The above code calls the server with a title to search for. But more importantly, it provides a timeout of 2 seconds to the gRPC call.
Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command −
java -cp . argetgrpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
We would see the following output −
Output
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, pstening on 50051
The above output shows that the server has started.
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
Now, let us start the cpent.
java -cp . argetgrpc-point-1.0.jar com.tp.bookstore.BookStoreCpentUnaryBlockingTimeout Great
We would get the following output −
Output
Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreCpentUnaryBlockingTimeout getBook INFO: Querying for book with title: Great Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreCpentUnaryBlockingTimeout getBook WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadpne exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}
So, as we can see, the cpent did not get the response in 2 seconds, hence it cancelled the request and termed it as a timeout, i.e., DEADLINE_EXCEEDED
Request Cancellation
gRPC supports cancepng of requests both from the cpent as well as the server side. The Cpent can specify during runtime the amount of time it wants to wait before cancelpng the request. The Server can also check on its end whether the requests need to be catered to or has the cpent already given up on the request.
Let us look at an example of cpent streaming, where the cpent invokes cancellation. So, here is our server code
Example
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; pubpc class BookeStoreServerCpentStreaming { private static final Logger logger = Logger.getLogger(BookeStoreServerCpentStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start(); logger.info("Server started, pstening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override pubpc void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } pubpc static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerCpentStreaming greetServer = newBookeStoreServerCpentStreaming(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase { @Override pubpc StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) { return new StreamObserver<Book>() { ArrayList<Book> bookCart = new ArrayList<Book>(); @Override pubpc void onNext(Book book) { logger.info("Searching for book with titlestarting with: " + book.getName()); for (Entry<String, Book> bookEntry : bookMap.entrySet()) { if(bookEntry.getValue().getName().startsWith(book.getName())){ logger.info("Found book, adding to cart:...."); bookCart.add(bookEntry.getValue()); } } } @Override pubpc void onError(Throwable t) { logger.info("Error while reading book stream:" + t); } @Override pubpc void onCompleted() { int cartValue = 0; for (Book book : bookCart) { cartValue += book.getPrice(); } responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build()); responseObserver.onCompleted(); } }; } } }
This server code is a simple example of cpent side streaming. The server simply tracks the books which the cpent wants and at the end, it provides the total Cart Value of the order.
But there is nothing special here with respect to cancellation of request as that is something the cpent would invoke. So, let us look at the cpent code.
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.Context; import io.grpc.Context.CancellableContext; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreGrpc.BookStoreStub; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.Cart; pubpc class BookStoreCpentStreamingCpentCancelation { private static final Logger logger = Logger.getLogger(BookStoreCpentStreamingCpentCancelation.class.getName()); private final BookStoreStub stub; StreamObserver<Book> streamCpentSender; private CancellableContext withCancellation; pubpc BookStoreCpentStreamingCpentCancelation(Channel channel) { stub = BookStoreGrpc.newStub(channel); } pubpc StreamObserver<Cart> getServerResponseObserver(){ StreamObserver<Cart> observer = new StreamObserver>Cart<(){ @Override pubpc void onNext(Cart cart) { logger.info("Order summary:" + " Total number of Books: " + cart.getBooks() + " Total Order Value: " + cart.getPrice()); } @Override pubpc void onError(Throwable t) { logger.info("Error while reading response from Server: " + t); } @Override pubpc void onCompleted() { } }; return observer; } pubpc void addBook(String book) { logger.info("Adding book with title starting with: " + book); Book request = Book.newBuilder().setName(book).build(); if(streamCpentSender == null) { withCancellation = Context.current().withCancellation(); streamCpentSender = stub.totalCartValue(getServerResponseObserver()); } try { streamCpentSender.onNext(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); } } pubpc void completeOrder() { logger.info("Done, waiting for server to create order summary..."); if(streamCpentSender != null); streamCpentSender.onCompleted(); } pubpc void cancelOrder() { withCancellation.cancel(null); } pubpc static void main(String[] args) throws Exception { String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build(); try { BookStoreCpentStreamingCpentCancelation cpent = new BookStoreCpentStreamingCpentCancelation(channel); String bookName = ""; while(true) { System.out.println("Type book name to be added to the cart...."); bookName = System.console().readLine(); if(bookName.equals("EXIT")) { cpent.completeOrder(); break; } if(bookName.equals("CANCEL")) { cpent.cancelOrder(); break; } cpent.addBook(bookName); } } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
So, if we see the above code, the following pne defines a context which has cancellation enabled.
withCancellation = Context.current().withCancellation();
And here is the method which would be called when the user types CANCEL. This would cancel the order and also let the server know about it.
pubpc void cancelOrder() { withCancellation.cancel(null); }
Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command.
java -cp . argetgrpc-point-1.0.jar com.tp.bookstore.BookeStoreServerCpentStreaming
We would get to see the following output −
Output
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerCpentStreaming start INFO: Server started, pstening on 50051
The above output imppes that the server has started.
Now, let us start the cpent
java -cp . argetgrpc-point-1.0.jar com.tp.bookstore.BookStoreCpentStreamingCpentCancelation
We would get the following output −
Output
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreCpentStreamingCpentCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreCpentStreamingCpentCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
And we would get the following data in the server logs −
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerCpentStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerCpentStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: cpent cancelled
So, as we can see, the cpent initiated a cancellation of the request it made to the server. The server was also notified about the cancellation.
Advertisements