English 中文(简体)
gRPC - Send/Receive Metadata
  • 时间:2024-09-17

gRPC - Send/Receive Metadata


Previous Page Next Page  

gRPC supports sending metadata. The metadata is basically a set of data we want to send that is not part of the business logic, while making gRPC calls.

Let us look at the following two cases −

    The Cpent sends Metadata and the Server reads it.

    The Server sends Metadata and Cpent reads it.

We will go through both these cases one by one.

Cpent Sends Metadata

As mentioned, gRPC supports the cpent sending the metadata which the server can read. gRPC supports extending the cpent and server interceptor which can be used to write and read metadata, respectively. Let us take an example to understand it better. Here is our cpent code which sends the hostname as metadata −

Let us take an example to understand it better. Here is our cpent code which sends the hostname as metadata −

Example


package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.CpentCall;
import io.grpc.CpentInterceptor;
import io.grpc.ForwardingCpentCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

pubpc class BookStoreCpentUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreCpentUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   pubpc BookStoreCpentUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookCpentInterceptor implements CpentInterceptor {
      @Override
      pubpc <ReqT, RespT> CpentCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new 
         ForwardingCpentCall.SimpleForwardingCpentCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            pubpc void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   pubpc void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName); 
      BookSearch request = BookSearch.newBuilder().setName(bookName).build(); 
      Book response; 
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   } 
   pubpc static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =  ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookCpentInterceptor()).build();
      try {
         BookStoreCpentUnaryBlockingMetadata cpent = new BookStoreCpentUnaryBlockingMetadata(channel);
         cpent.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The interesting bit here is the interceptor.


static class BookCpentInterceptor implements CpentInterceptor{
   @Override
   pubpc <ReqT, RespT> CpentCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new 
      ForwardingCpentCall.SimpleForwardingCpentCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         pubpc void start(Listener<RespT>responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

We intercept any call which is being made by the cpent and then add hostname metadata to it before it is called further.

Server Reads Metadata

Now, let us look at the server code which reads this metadata −


package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

pubpc class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      pubpc <ReqT, RespT> Listener<ReqT> 
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
      logger.info("Server started, pstening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         pubpc void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   pubpc static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      pubpc void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

Again, the interesting bit here is the interceptor.


class BookServerInterceptor implements ServerInterceptor{
   @Override
   pubpc <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

We intercept any call which is incoming to the server and then log the metadata before the call can be handled by the actual method.

Cpent-Server Call

Let us now see this in action. For running the code, fire up two shells. Start the server on the first shell by executing the following command −


java -cp .	argetgrpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerMetadata

We would see the following output −

Output


Jul 31, 2021 5:29:14 PM 
com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, pstening on 50051

The above output imppes that the server has started.

Now, let us start the cpent.


java -cp .	argetgrpc-point-1.0.jar 
com.tp.bookstore.BookStoreCpentUnaryBlockingMetadata Great

We would see the following output −

Output


Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreCpentUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreCpentUnaryBlockingMetadata$BookCp
entInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookStoreCpentUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

And we will have the following data in the server logs −


Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept
or interceptCall
INFO: Recieved following metadata:
Metadata(content-type=apppcation/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

As we can see, the server is able to read the metadata: hostname=MY_HOST which was added by the cpent.

Advertisements