Bael 4466 error handling (#11269)

* Commit source code to branch

* BAEL-5065 improvement of groupBy with complex key

* BAEL-4466 Implementation of error handling
This commit is contained in:
mbarriola 2021-10-01 10:24:06 -04:00 committed by GitHub
parent be966d6d62
commit a97ae48958
10 changed files with 690 additions and 34 deletions

View File

@ -31,6 +31,12 @@
<artifactId>grpc-stub</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>${io.grpc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,127 @@
package com.baeldung.grpc.errorhandling;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.grpc.errorhandling.CommodityPriceProviderGrpc.CommodityPriceProviderStub;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import com.google.rpc.ErrorInfo;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
public class CommodityClient {
private static final Logger logger = LoggerFactory.getLogger(CommodityClient.class.getName());
private final CommodityPriceProviderStub nonBlockingStub;
public CommodityClient(Channel channel) {
nonBlockingStub = CommodityPriceProviderGrpc.newStub(channel);
}
public void getBidirectionalCommodityPriceLists() throws InterruptedException {
logger.info("#######START EXAMPLE#######: BidirectionalStreaming - getCommodityPriceLists from list of commodities");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<StreamingCommodityQuote> responseObserver = new StreamObserver<StreamingCommodityQuote>() {
@Override
public void onNext(StreamingCommodityQuote streamingCommodityQuote) {
switch (streamingCommodityQuote.getMessageCase()) {
case COMODITY_QUOTE:
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
break;
case STATUS:
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
logger.info("RESPONSE status error:");
logger.info("Status code:" + Code.forNumber(status.getCode()));
logger.info("Status message:" + status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo;
try {
errorInfo = any.unpack(ErrorInfo.class);
logger.info("Reason:" + errorInfo.getReason());
logger.info("Domain:" + errorInfo.getDomain());
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage());
}
}
}
break;
default:
logger.info("Unknow message case");
}
}
@Override
public void onCompleted() {
logger.info("Finished getBidirectionalCommodityPriceListss");
finishLatch.countDown();
}
@Override
public void onError(Throwable t) {
logger.error("getBidirectionalCommodityPriceLists Failed:" + Status.fromThrowable(t));
finishLatch.countDown();
}
};
StreamObserver<Commodity> requestObserver = nonBlockingStub.bidirectionalListOfPrices(responseObserver);
try {
for (int i = 1; i <= 2; i++) {
Commodity request = Commodity.newBuilder()
.setCommodityName("Commodity" + i)
.setAccessToken(i + "23validToken")
.build();
logger.info("REQUEST - commodity:" + request.getCommodityName());
requestObserver.onNext(request);
Thread.sleep(200);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.info("getBidirectionalCommodityPriceLists can not finish within 1 minute");
}
}
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
String target = "localhost:8980";
if (args.length > 0) {
target = args[0];
}
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();
try {
CommodityClient client = new CommodityClient(channel);
client.getBidirectionalCommodityPriceLists();
} finally {
channel.shutdownNow()
.awaitTermination(5, TimeUnit.SECONDS);
}
}
}

View File

@ -0,0 +1,182 @@
package com.baeldung.grpc.errorhandling;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.ErrorInfo;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
public class CommodityServer {
private static final Logger logger = LoggerFactory.getLogger(CommodityServer.class.getName());
private final int port;
private final Server server;
private static Map<String, Double> commodityLookupBasePrice;
static {
commodityLookupBasePrice = new ConcurrentHashMap<>();
commodityLookupBasePrice.put("Commodity1", 5.0);
commodityLookupBasePrice.put("Commodity2", 6.0);
}
public static void main(String[] args) throws Exception {
CommodityServer commodityServer = new CommodityServer(8980);
commodityServer.start();
if (commodityServer.server != null) {
commodityServer.server.awaitTermination();
}
}
public CommodityServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new CommodityService())
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on {}", port);
Runtime.getRuntime()
.addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("shutting down server");
try {
CommodityServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("server shutted down");
}
});
}
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown()
.awaitTermination(30, TimeUnit.SECONDS);
}
}
public static class CommodityService extends CommodityPriceProviderGrpc.CommodityPriceProviderImplBase {
@Override
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
ErrorResponse errorResponse = ErrorResponse.newBuilder()
.setCommodityName(request.getCommodityName())
.setAccessToken(request.getAccessToken())
.setExpectedValue("Only Commodity1, Commodity2 are supported")
.build();
Metadata metadata = new Metadata();
metadata.put(errorResponseKey, errorResponse);
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
.asRuntimeException(metadata));
} else if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.baeldung.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
} else {
CommodityQuote commodityQuote = CommodityQuote.newBuilder()
.setPrice(fetchBestPriceBid(request))
.setCommodityName(request.getCommodityName())
.setProducerName("Best Producer with best price")
.build();
responseObserver.onNext(commodityQuote);
responseObserver.onCompleted();
}
}
@Override
public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {
return new StreamObserver<Commodity>() {
@Override
public void onNext(Commodity request) {
logger.info("Access token:{}", request.getAccessToken());
if (request.getAccessToken()
.equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.baeldung.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(streamingCommodityQuote);
} else {
for (int i = 1; i <= 5; i++) {
CommodityQuote commodityQuote = CommodityQuote.newBuilder()
.setPrice(fetchProviderPriceBid(request, "producer:" + i))
.setCommodityName(request.getCommodityName())
.setProducerName("producer:" + i)
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setComodityQuote(commodityQuote)
.build();
responseObserver.onNext(streamingCommodityQuote);
}
}
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info("error:{}", t.getMessage());
}
};
}
}
private static double fetchBestPriceBid(Commodity commodity) {
return commodityLookupBasePrice.get(commodity.getCommodityName()) + ThreadLocalRandom.current()
.nextDouble(-0.2d, 0.2d);
}
private static double fetchProviderPriceBid(Commodity commodity, String providerName) {
return commodityLookupBasePrice.get(commodity.getCommodityName()) + providerName.length() + ThreadLocalRandom.current()
.nextDouble(-0.2d, 0.2d);
}
}

View File

@ -5,8 +5,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderBlockingStub;
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderStub;
@ -19,7 +20,7 @@ import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
public class StockClient {
private static final Logger logger = Logger.getLogger(StockClient.class.getName());
private static final Logger logger = LoggerFactory.getLogger(StockClient.class.getName());
private final StockQuoteProviderBlockingStub blockingStub;
private final StockQuoteProviderStub nonBlockingStub;
@ -34,7 +35,7 @@ public class StockClient {
public void serverSideStreamingListOfStockPrices() {
logInfo("######START EXAMPLE######: ServerSideStreaming - list of Stock prices from a given stock");
logger.info("######START EXAMPLE######: ServerSideStreaming - list of Stock prices from a given stock");
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
@ -42,36 +43,36 @@ public class StockClient {
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
logger.info("REQUEST - ticker symbol {}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
logger.info("RESPONSE - Price #" + i + ": {}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
logger.info("RPC failed: {}", e.getStatus());
}
}
public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
logInfo("######START EXAMPLE######: ClientSideStreaming - getStatisticsOfStocks from a list of stocks");
logger.info("######START EXAMPLE######: ClientSideStreaming - getStatisticsOfStocks from a list of stocks");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
logger.info("RESPONSE, got stock statistics - Average Price: {}, description: {}", summary.getPrice(), summary.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
logger.info("Finished clientSideStreamingGetStatisticsOfStocks");
finishLatch.countDown();
}
@Override
public void onError(Throwable t) {
logWarning("Stock Statistics Failed: {0}", Status.fromThrowable(t));
logger.warn("Stock Statistics Failed: {}", Status.fromThrowable(t));
finishLatch.countDown();
}
};
@ -80,7 +81,7 @@ public class StockClient {
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
logger.info("REQUEST: {}, {}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
if (finishLatch.getCount() == 0) {
return;
@ -92,36 +93,36 @@ public class StockClient {
}
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logWarning("clientSideStreamingGetStatisticsOfStocks can not finish within 1 minutes");
logger.warn("clientSideStreamingGetStatisticsOfStocks can not finish within 1 minutes");
}
}
public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
logInfo("#######START EXAMPLE#######: BidirectionalStreaming - getListsStockQuotes from list of stocks");
logger.info("#######START EXAMPLE#######: BidirectionalStreaming - getListsStockQuotes from list of stocks");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
logger.info("RESPONSE price#{} : {}, description:{}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
logger.info("Finished bidirectionalStreamingGetListsStockQuotes");
finishLatch.countDown();
}
@Override
public void onError(Throwable t) {
logWarning("bidirectionalStreamingGetListsStockQuotes Failed: {0}", Status.fromThrowable(t));
logger.warn("bidirectionalStreamingGetListsStockQuotes Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
};
StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
logger.info("REQUEST: {}, {}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
if (finishLatch.getCount() == 0) {
@ -135,7 +136,7 @@ public class StockClient {
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logWarning("bidirectionalStreamingGetListsStockQuotes can not finish within 1 minute");
logger.warn("bidirectionalStreamingGetListsStockQuotes can not finish within 1 minute");
}
}
@ -172,12 +173,4 @@ public class StockClient {
, Stock.newBuilder().setTickerSymbol("DIA").setCompanyName("Dialogic Corp").setDescription("Development Intel").build()
, Stock.newBuilder().setTickerSymbol("EUS").setCompanyName("Euskaltel Corp").setDescription("English Intel").build());
}
private void logInfo(String msg, Object... params) {
logger.log(Level.INFO, msg, params);
}
private void logWarning(String msg, Object... params) {
logger.log(Level.WARNING, msg, params);
}
}

View File

@ -3,8 +3,9 @@ package com.baeldung.grpc.streaming;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@ -12,7 +13,7 @@ import io.grpc.stub.StreamObserver;
public class StockServer {
private static final Logger logger = Logger.getLogger(StockServer.class.getName());
private static final Logger logger = LoggerFactory.getLogger(StockClient.class.getName());
private final int port;
private final Server server;
@ -102,7 +103,7 @@ public class StockServer {
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "error:{0}", t.getMessage());
logger.warn("error:{}", t.getMessage());
}
};
}
@ -131,7 +132,7 @@ public class StockServer {
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "error:{0}", t.getMessage());
logger.warn("error:{}", t.getMessage());
}
};
}

View File

@ -0,0 +1,42 @@
syntax = "proto3";
import "google/rpc/status.proto";
package commodityprice;
option java_multiple_files = true;
option java_package = "com.baeldung.grpc.errorhandling";
option java_outer_classname = "CommodityPriceProto";
option objc_class_prefix = "RTG";
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message CommodityQuote {
string commodity_name = 1;
string producer_name = 2;
double price = 3;
}
message ErrorResponse {
string commodity_name = 1;
string access_token = 2;
string expected_token = 3;
string expected_value = 4;
}
message StreamingCommodityQuote{
oneof message{
CommodityQuote comodity_quote = 1;
google.rpc.Status status = 2;
}
}

View File

@ -0,0 +1,158 @@
// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package google.protobuf;
option csharp_namespace = "Google.Protobuf.WellKnownTypes";
option go_package = "google.golang.org/protobuf/types/known/anypb";
option java_package = "com.google.protobuf";
option java_outer_classname = "AnyProto";
option java_multiple_files = true;
option objc_class_prefix = "GPB";
// `Any` contains an arbitrary serialized protocol buffer message along with a
// URL that describes the type of the serialized message.
//
// Protobuf library provides support to pack/unpack Any values in the form
// of utility functions or additional generated methods of the Any type.
//
// Example 1: Pack and unpack a message in C++.
//
// Foo foo = ...;
// Any any;
// any.PackFrom(foo);
// ...
// if (any.UnpackTo(&foo)) {
// ...
// }
//
// Example 2: Pack and unpack a message in Java.
//
// Foo foo = ...;
// Any any = Any.pack(foo);
// ...
// if (any.is(Foo.class)) {
// foo = any.unpack(Foo.class);
// }
//
// Example 3: Pack and unpack a message in Python.
//
// foo = Foo(...)
// any = Any()
// any.Pack(foo)
// ...
// if any.Is(Foo.DESCRIPTOR):
// any.Unpack(foo)
// ...
//
// Example 4: Pack and unpack a message in Go
//
// foo := &pb.Foo{...}
// any, err := anypb.New(foo)
// if err != nil {
// ...
// }
// ...
// foo := &pb.Foo{}
// if err := any.UnmarshalTo(foo); err != nil {
// ...
// }
//
// The pack methods provided by protobuf library will by default use
// 'type.googleapis.com/full.type.name' as the type URL and the unpack
// methods only use the fully qualified type name after the last '/'
// in the type URL, for example "foo.bar.com/x/y.z" will yield type
// name "y.z".
//
//
// JSON
// ====
// The JSON representation of an `Any` value uses the regular
// representation of the deserialized, embedded message, with an
// additional field `@type` which contains the type URL. Example:
//
// package google.profile;
// message Person {
// string first_name = 1;
// string last_name = 2;
// }
//
// {
// "@type": "type.googleapis.com/google.profile.Person",
// "firstName": <string>,
// "lastName": <string>
// }
//
// If the embedded message type is well-known and has a custom JSON
// representation, that representation will be embedded adding a field
// `value` which holds the custom JSON in addition to the `@type`
// field. Example (for message [google.protobuf.Duration][]):
//
// {
// "@type": "type.googleapis.com/google.protobuf.Duration",
// "value": "1.212s"
// }
//
message Any {
// A URL/resource name that uniquely identifies the type of the serialized
// protocol buffer message. This string must contain at least
// one "/" character. The last segment of the URL's path must represent
// the fully qualified name of the type (as in
// `path/google.protobuf.Duration`). The name should be in a canonical form
// (e.g., leading "." is not accepted).
//
// In practice, teams usually precompile into the binary all types that they
// expect it to use in the context of Any. However, for URLs which use the
// scheme `http`, `https`, or no scheme, one can optionally set up a type
// server that maps type URLs to message definitions as follows:
//
// * If no scheme is provided, `https` is assumed.
// * An HTTP GET on the URL must yield a [google.protobuf.Type][]
// value in binary format, or produce an error.
// * Applications are allowed to cache lookup results based on the
// URL, or have them precompiled into a binary to avoid any
// lookup. Therefore, binary compatibility needs to be preserved
// on changes to types. (Use versioned type names to manage
// breaking changes.)
//
// Note: this functionality is not currently available in the official
// protobuf release, and it is not used for type URLs beginning with
// type.googleapis.com.
//
// Schemes other than `http`, `https` (or the empty scheme) might be
// used with implementation specific semantics.
//
string type_url = 1;
// Must be a valid serialized protocol buffer of the above specified type.
bytes value = 2;
}

View File

@ -0,0 +1,47 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.rpc;
import "google/protobuf/any.proto";
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/rpc/status;status";
option java_multiple_files = true;
option java_outer_classname = "StatusProto";
option java_package = "com.google.rpc";
option objc_class_prefix = "RPC";
// The `Status` type defines a logical error model that is suitable for
// different programming environments, including REST APIs and RPC APIs. It is
// used by [gRPC](https://github.com/grpc). Each `Status` message contains
// three pieces of data: error code, error message, and error details.
//
// You can find out more about this error model and how to work with it in the
// [API Design Guide](https://cloud.google.com/apis/design/errors).
message Status {
// The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// A developer-facing error message, which should be in English. Any
// user-facing error message should be localized and sent in the
// [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
string message = 2;
// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}

View File

@ -7,8 +7,6 @@ option java_package = "com.baeldung.grpc.streaming";
option java_outer_classname = "StockQuoteProto";
option objc_class_prefix = "RTG";
//basic setup ...
service StockQuoteProvider {
rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}

View File

@ -0,0 +1,102 @@
package com.baeldung.grpc.errorhandling;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.junit.Rule;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.baeldung.grpc.errorhandling.CommodityServer.CommodityService;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.ErrorInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.protobuf.StatusProto;
import io.grpc.testing.GrpcCleanupRule;
public class CommodityServerUnitTest {
CommodityPriceProviderGrpc.CommodityPriceProviderBlockingStub blockingStub;
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
@BeforeEach
public void setup() throws Exception{
String serverName = InProcessServerBuilder.generateName();
grpcCleanup.register(InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(new CommodityService())
.build()
.start());
blockingStub = CommodityPriceProviderGrpc.newBlockingStub(grpcCleanup.register(InProcessChannelBuilder.forName(serverName)
.directExecutor()
.build()));
}
@Test
public void whenUsingValidRequest_thenReturnResponse() throws Exception {
CommodityQuote reply = blockingStub.getBestCommodityPrice(Commodity.newBuilder()
.setCommodityName("Commodity1")
.setAccessToken("123validToken")
.build());
assertEquals("Commodity1", reply.getCommodityName());
}
@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("invalidToken")
.setCommodityName("Commodity1")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));
com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
assertNotNull(status);
assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
assertEquals("The access token not found", status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
assertEquals("Invalid Token", errorInfo.getReason());
assertEquals("com.baeldung.grpc.errorhandling", errorInfo.getDomain());
assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
}
}
}
@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("123validToken")
.setCommodityName("Commodity5")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));
assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
Metadata metadata = Status.trailersFromThrowable(thrown);
ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
assertEquals("Commodity5",errorResponse.getCommodityName());
assertEquals("123validToken", errorResponse.getAccessToken());
assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}
}