diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/client/StockClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/client/StockClient.java index 6c37c6b763..8a65b77b37 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/client/StockClient.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/client/StockClient.java @@ -6,19 +6,17 @@ import org.springframework.web.reactive.function.client.WebClient.RequestHeaders import com.baeldung.reactive.model.Stock; +import reactor.core.publisher.Flux; + public class StockClient { - public void getStockUpdates(String stockCode) { - WebClient client = WebClient.create("localhost:9111"); - RequestHeadersSpec request = client.get().uri("/rtes/stocks/"+stockCode).accept(MediaType.TEXT_EVENT_STREAM); - request.retrieve().bodyToFlux(Stock.class).toStream().forEach(System.out::println); - } - - public static void main(String[] args) throws InterruptedException { - new StockClient().getStockUpdates("GOOGL"); - - while(true) { - Thread.sleep(1000L); - } - } + public Flux getStockUpdates(String stockCode) { + WebClient client = WebClient.create("localhost:8080"); + RequestHeadersSpec request = client.get() + .uri("/rtes/stocks/" + stockCode) + .accept(MediaType.TEXT_EVENT_STREAM); + return request.retrieve() + .bodyToFlux(Stock.class) + .log(); + } } diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockReactiveController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockReactiveController.java index 4015d660b9..8f8de79561 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockReactiveController.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockReactiveController.java @@ -23,16 +23,14 @@ public class StockReactiveController { @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/stocks/{code}") public Flux getStocks(@PathVariable String code) { BigDecimal startingPrice = new BigDecimal("100"); - Flux stockFlux = Flux.fromStream(Stream.generate(() -> new Stock( - code, - new Random().nextBoolean() ? - startingPrice.add(BigDecimal.valueOf(new Random().nextDouble())): - startingPrice.subtract(BigDecimal.valueOf(new Random().nextDouble()))))); + Flux stockFlux = Flux.fromStream(Stream.generate(() -> new Stock(code, getLatestPrice(startingPrice)))); Flux emmitFlux = Flux.interval(Duration.ofSeconds(1)); - return Flux.zip(stockFlux, emmitFlux).map(Tuple2::getT1); + return Flux.zip(stockFlux, emmitFlux) + .map(Tuple2::getT1); } - public static void main(String [] args) { - + private BigDecimal getLatestPrice(BigDecimal startingPrice) { + BigDecimal priceChange = BigDecimal.valueOf(new Random().nextDouble()); + return new Random().nextBoolean() ? startingPrice.add(priceChange) : startingPrice.subtract(priceChange); } }