Bael 5119 streaming in g rpc (#11215)

* Commit source code to branch

* BAEL-5065 improvement of groupBy with complex key

* Streaming in gRPC
This commit is contained in:
mbarriola 2021-09-18 04:43:16 -04:00 committed by GitHub
parent 3260466f5b
commit 2c837e030f
4 changed files with 437 additions and 75 deletions

View File

@ -17,7 +17,8 @@
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>runtime</scope>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
@ -36,6 +37,11 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
<build>
@ -52,13 +58,9 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
@ -71,11 +73,10 @@
</plugin>
</plugins>
</build>
<properties>
<io.grpc.version>1.16.1</io.grpc.version>
<os-maven-plugin.version>1.6.1</os-maven-plugin.version>
<io.grpc.version>1.40.1</io.grpc.version>
<protoc.version>3.17.2</protoc.version>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
</properties>
</project>

View File

@ -0,0 +1,183 @@
package com.baeldung.grpc.streaming;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderBlockingStub;
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderStub;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
public class StockClient {
private static final Logger logger = Logger.getLogger(StockClient.class.getName());
private final StockQuoteProviderBlockingStub blockingStub;
private final StockQuoteProviderStub nonBlockingStub;
private List<Stock> stocks;
public StockClient(Channel channel) {
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
initializeStocks();
}
public void serverSideStreamingListOfStockPrices() {
logInfo("######START EXAMPLE######: ServerSideStreaming - list of Stock prices from a given stock");
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
.setDescription("server streaming example")
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
}
}
public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
logInfo("######START EXAMPLE######: ClientSideStreaming - getStatisticsOfStocks from a list of stocks");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
finishLatch.countDown();
}
@Override
public void onError(Throwable t) {
logWarning("Stock Statistics Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
};
StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logWarning("clientSideStreamingGetStatisticsOfStocks can not finish within 1 minutes");
}
}
public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
logInfo("#######START EXAMPLE#######: BidirectionalStreaming - getListsStockQuotes from list of stocks");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
finishLatch.countDown();
}
@Override
public void onError(Throwable t) {
logWarning("bidirectionalStreamingGetListsStockQuotes Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
};
StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logWarning("bidirectionalStreamingGetListsStockQuotes can not finish within 1 minute");
}
}
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
target = args[0];
}
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();
try {
StockClient client = new StockClient(channel);
client.serverSideStreamingListOfStockPrices();
client.clientSideStreamingGetStatisticsOfStocks();
client.bidirectionalStreamingGetListsStockQuotes();
} finally {
channel.shutdownNow()
.awaitTermination(5, TimeUnit.SECONDS);
}
}
private void initializeStocks() {
this.stocks = Arrays.asList(Stock.newBuilder().setTickerSymbol("AU").setCompanyName("Auburn Corp").setDescription("Aptitude Intel").build()
, Stock.newBuilder().setTickerSymbol("BAS").setCompanyName("Bassel Corp").setDescription("Business Intel").build()
, Stock.newBuilder().setTickerSymbol("COR").setCompanyName("Corvine Corp").setDescription("Corporate Intel").build()
, Stock.newBuilder().setTickerSymbol("DIA").setCompanyName("Dialogic Corp").setDescription("Development Intel").build()
, Stock.newBuilder().setTickerSymbol("EUS").setCompanyName("Euskaltel Corp").setDescription("English Intel").build());
}
private void logInfo(String msg, Object... params) {
logger.log(Level.INFO, msg, params);
}
private void logWarning(String msg, Object... params) {
logger.log(Level.WARNING, msg, params);
}
}

View File

@ -0,0 +1,147 @@
package com.baeldung.grpc.streaming;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
public class StockServer {
private static final Logger logger = Logger.getLogger(StockServer.class.getName());
private final int port;
private final Server server;
public StockServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new StockService())
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime()
.addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("shutting down server");
try {
StockServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("server shutted down");
}
});
}
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown()
.awaitTermination(30, TimeUnit.SECONDS);
}
}
public static void main(String[] args) throws Exception {
StockServer stockServer = new StockServer(8980);
stockServer.start();
if (stockServer.server != null) {
stockServer.server.awaitTermination();
}
}
private static class StockService extends StockQuoteProviderGrpc.StockQuoteProviderImplBase {
StockService() {
}
@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
responseObserver.onCompleted();
}
@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(final StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
int count;
double price = 0.0;
StringBuffer sb = new StringBuffer();
@Override
public void onNext(Stock stock) {
count++;
price = +fetchStockPriceBid(stock);
sb.append(":")
.append(stock.getTickerSymbol());
}
@Override
public void onCompleted() {
responseObserver.onNext(StockQuote.newBuilder()
.setPrice(price / count)
.setDescription("Statistics-" + sb.toString())
.build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "error:{0}", t.getMessage());
}
};
}
@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(final StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
@Override
public void onNext(Stock request) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "error:{0}", t.getMessage());
}
};
}
}
private static double fetchStockPriceBid(Stock stock) {
return stock.getTickerSymbol()
.length()
+ ThreadLocalRandom.current()
.nextDouble(-0.1d, 0.1d);
}
}

View File

@ -0,0 +1,31 @@
syntax = "proto3";
package stockquote;
option java_multiple_files = true;
option java_package = "com.baeldung.grpc.streaming";
option java_outer_classname = "StockQuoteProto";
option objc_class_prefix = "RTG";
//basic setup ...
service StockQuoteProvider {
rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}
rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
string ticker_symbol = 1;
string company_name = 2;
string description = 3;
}
message StockQuote {
double price = 1;
int32 offer_number = 2;
string description = 3;
}