diff --git a/grpc/pom.xml b/grpc/pom.xml index 915777f3bd..77ec9be464 100644 --- a/grpc/pom.xml +++ b/grpc/pom.xml @@ -1,81 +1,82 @@ - 4.0.0 - grpc - 0.0.1-SNAPSHOT - grpc - jar + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + grpc + 0.0.1-SNAPSHOT + grpc + jar - - com.baeldung - parent-modules - 1.0.0-SNAPSHOT - + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + - - - io.grpc - grpc-netty - ${io.grpc.version} - - - io.grpc - grpc-protobuf - ${io.grpc.version} - - - io.grpc - grpc-stub - ${io.grpc.version} - - - junit - junit - ${junit.version} - test - - - - - - - kr.motd.maven - os-maven-plugin - ${os-maven-plugin.version} - - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - ${protobuf-maven-plugin.version} - - - com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} - - grpc-java - - io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} - - - - - - compile - compile-custom - - - - - - - - - 1.16.1 - 1.6.1 - 0.6.1 - + + + io.grpc + grpc-netty-shaded + runtime + ${io.grpc.version} + + + io.grpc + grpc-protobuf + ${io.grpc.version} + + + io.grpc + grpc-stub + ${io.grpc.version} + + + junit + junit + ${junit.version} + test + + + javax.annotation + javax.annotation-api + 1.2 + + + + + + kr.motd.maven + os-maven-plugin + ${os-maven-plugin.version} + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${protobuf-maven-plugin.version} + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + + 1.40.1 + 3.17.2 + 1.6.2 + 0.6.1 + \ No newline at end of file diff --git a/grpc/src/main/java/com/baeldung/grpc/streaming/StockClient.java b/grpc/src/main/java/com/baeldung/grpc/streaming/StockClient.java new file mode 100644 index 0000000000..1850c975a2 --- /dev/null +++ b/grpc/src/main/java/com/baeldung/grpc/streaming/StockClient.java @@ -0,0 +1,183 @@ +package com.baeldung.grpc.streaming; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderBlockingStub; +import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderStub; + +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +public class StockClient { + private static final Logger logger = Logger.getLogger(StockClient.class.getName()); + + private final StockQuoteProviderBlockingStub blockingStub; + private final StockQuoteProviderStub nonBlockingStub; + private List stocks; + + public StockClient(Channel channel) { + + blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel); + nonBlockingStub = StockQuoteProviderGrpc.newStub(channel); + initializeStocks(); + } + + public void serverSideStreamingListOfStockPrices() { + + logInfo("######START EXAMPLE######: ServerSideStreaming - list of Stock prices from a given stock"); + Stock request = Stock.newBuilder() + .setTickerSymbol("AU") + .setCompanyName("Austich") + .setDescription("server streaming example") + .build(); + Iterator stockQuotes; + try { + logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol()); + stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request); + for (int i = 1; stockQuotes.hasNext(); i++) { + StockQuote stockQuote = stockQuotes.next(); + logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice()); + } + } catch (StatusRuntimeException e) { + logInfo("RPC failed: {0}", e.getStatus()); + } + } + + public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException { + + logInfo("######START EXAMPLE######: ClientSideStreaming - getStatisticsOfStocks from a list of stocks"); + final CountDownLatch finishLatch = new CountDownLatch(1); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(StockQuote summary) { + logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription()); + } + + @Override + public void onCompleted() { + logInfo("Finished clientSideStreamingGetStatisticsOfStocks"); + finishLatch.countDown(); + } + + @Override + public void onError(Throwable t) { + logWarning("Stock Statistics Failed: {0}", Status.fromThrowable(t)); + finishLatch.countDown(); + } + }; + + StreamObserver requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver); + try { + + for (Stock stock : stocks) { + logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName()); + requestObserver.onNext(stock); + if (finishLatch.getCount() == 0) { + return; + } + } + } catch (RuntimeException e) { + requestObserver.onError(e); + throw e; + } + requestObserver.onCompleted(); + if (!finishLatch.await(1, TimeUnit.MINUTES)) { + logWarning("clientSideStreamingGetStatisticsOfStocks can not finish within 1 minutes"); + } + } + + public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{ + + logInfo("#######START EXAMPLE#######: BidirectionalStreaming - getListsStockQuotes from list of stocks"); + final CountDownLatch finishLatch = new CountDownLatch(1); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(StockQuote stockQuote) { + logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription()); + } + + @Override + public void onCompleted() { + logInfo("Finished bidirectionalStreamingGetListsStockQuotes"); + finishLatch.countDown(); + } + + @Override + public void onError(Throwable t) { + logWarning("bidirectionalStreamingGetListsStockQuotes Failed: {0}", Status.fromThrowable(t)); + finishLatch.countDown(); + } + }; + StreamObserver requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver); + try { + for (Stock stock : stocks) { + logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName()); + requestObserver.onNext(stock); + Thread.sleep(200); + if (finishLatch.getCount() == 0) { + return; + } + } + } catch (RuntimeException e) { + requestObserver.onError(e); + throw e; + } + requestObserver.onCompleted(); + + if (!finishLatch.await(1, TimeUnit.MINUTES)) { + logWarning("bidirectionalStreamingGetListsStockQuotes can not finish within 1 minute"); + } + + } + + public static void main(String[] args) throws InterruptedException { + String target = "localhost:8980"; + if (args.length > 0) { + target = args[0]; + } + + ManagedChannel channel = ManagedChannelBuilder.forTarget(target) + .usePlaintext() + .build(); + try { + StockClient client = new StockClient(channel); + + client.serverSideStreamingListOfStockPrices(); + + client.clientSideStreamingGetStatisticsOfStocks(); + + client.bidirectionalStreamingGetListsStockQuotes(); + + } finally { + channel.shutdownNow() + .awaitTermination(5, TimeUnit.SECONDS); + } + } + + private void initializeStocks() { + + this.stocks = Arrays.asList(Stock.newBuilder().setTickerSymbol("AU").setCompanyName("Auburn Corp").setDescription("Aptitude Intel").build() + , Stock.newBuilder().setTickerSymbol("BAS").setCompanyName("Bassel Corp").setDescription("Business Intel").build() + , Stock.newBuilder().setTickerSymbol("COR").setCompanyName("Corvine Corp").setDescription("Corporate Intel").build() + , Stock.newBuilder().setTickerSymbol("DIA").setCompanyName("Dialogic Corp").setDescription("Development Intel").build() + , Stock.newBuilder().setTickerSymbol("EUS").setCompanyName("Euskaltel Corp").setDescription("English Intel").build()); + } + + private void logInfo(String msg, Object... params) { + logger.log(Level.INFO, msg, params); + } + + private void logWarning(String msg, Object... params) { + logger.log(Level.WARNING, msg, params); + } +} diff --git a/grpc/src/main/java/com/baeldung/grpc/streaming/StockServer.java b/grpc/src/main/java/com/baeldung/grpc/streaming/StockServer.java new file mode 100644 index 0000000000..f4dc6c39ac --- /dev/null +++ b/grpc/src/main/java/com/baeldung/grpc/streaming/StockServer.java @@ -0,0 +1,147 @@ +package com.baeldung.grpc.streaming; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +public class StockServer { + + private static final Logger logger = Logger.getLogger(StockServer.class.getName()); + private final int port; + private final Server server; + + public StockServer(int port) throws IOException { + this.port = port; + server = ServerBuilder.forPort(port) + .addService(new StockService()) + .build(); + } + + public void start() throws IOException { + server.start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime() + .addShutdownHook(new Thread() { + @Override + public void run() { + System.err.println("shutting down server"); + try { + StockServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("server shutted down"); + } + }); + } + + public void stop() throws InterruptedException { + if (server != null) { + server.shutdown() + .awaitTermination(30, TimeUnit.SECONDS); + } + } + + public static void main(String[] args) throws Exception { + StockServer stockServer = new StockServer(8980); + stockServer.start(); + if (stockServer.server != null) { + stockServer.server.awaitTermination(); + } + } + + private static class StockService extends StockQuoteProviderGrpc.StockQuoteProviderImplBase { + + StockService() { + } + + @Override + public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver responseObserver) { + + for (int i = 1; i <= 5; i++) { + + StockQuote stockQuote = StockQuote.newBuilder() + .setPrice(fetchStockPriceBid(request)) + .setOfferNumber(i) + .setDescription("Price for stock:" + request.getTickerSymbol()) + .build(); + responseObserver.onNext(stockQuote); + } + responseObserver.onCompleted(); + } + + @Override + public StreamObserver clientSideStreamingGetStatisticsOfStocks(final StreamObserver responseObserver) { + return new StreamObserver() { + int count; + double price = 0.0; + StringBuffer sb = new StringBuffer(); + + @Override + public void onNext(Stock stock) { + count++; + price = +fetchStockPriceBid(stock); + sb.append(":") + .append(stock.getTickerSymbol()); + } + + @Override + public void onCompleted() { + responseObserver.onNext(StockQuote.newBuilder() + .setPrice(price / count) + .setDescription("Statistics-" + sb.toString()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.WARNING, "error:{0}", t.getMessage()); + } + }; + } + + @Override + public StreamObserver bidirectionalStreamingGetListsStockQuotes(final StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(Stock request) { + + for (int i = 1; i <= 5; i++) { + + StockQuote stockQuote = StockQuote.newBuilder() + .setPrice(fetchStockPriceBid(request)) + .setOfferNumber(i) + .setDescription("Price for stock:" + request.getTickerSymbol()) + .build(); + responseObserver.onNext(stockQuote); + } + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + logger.log(Level.WARNING, "error:{0}", t.getMessage()); + } + }; + } + } + + private static double fetchStockPriceBid(Stock stock) { + + return stock.getTickerSymbol() + .length() + + ThreadLocalRandom.current() + .nextDouble(-0.1d, 0.1d); + } +} \ No newline at end of file diff --git a/grpc/src/main/proto/stock_quote.proto b/grpc/src/main/proto/stock_quote.proto new file mode 100644 index 0000000000..66891a5008 --- /dev/null +++ b/grpc/src/main/proto/stock_quote.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package stockquote; + +option java_multiple_files = true; +option java_package = "com.baeldung.grpc.streaming"; +option java_outer_classname = "StockQuoteProto"; +option objc_class_prefix = "RTG"; + +//basic setup ... + +service StockQuoteProvider { + + rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {} + + rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {} + + rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {} +} + +message Stock { + string ticker_symbol = 1; + string company_name = 2; + string description = 3; +} + +message StockQuote { + double price = 1; + int32 offer_number = 2; + string description = 3; +}