From d1b8f9b9b35ee84409571844317474295e7a6b4f Mon Sep 17 00:00:00 2001 From: tritty Date: Wed, 9 May 2018 03:58:04 +0530 Subject: [PATCH] Bean Object, server side and client side example for event streaming example --- .../controller/StockBrokerController.java | 26 ++++++++++++ .../com/baeldung/reactive/model/Stock.java | 18 ++++++++ .../reactive/webclient/StockClient.java | 42 +++++++++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockBrokerController.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/model/Stock.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webclient/StockClient.java diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockBrokerController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockBrokerController.java new file mode 100644 index 0000000000..f904943a02 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/StockBrokerController.java @@ -0,0 +1,26 @@ +package com.baeldung.reactive.controller; + +import java.time.Duration; +import java.util.Date; +import java.util.Random; +import java.util.stream.Stream; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.baeldung.reactive.model.Stock; + +import reactor.core.publisher.Flux; + +@RestController +public class StockBrokerController { + + @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/getStockPrice") + public Flux getStockUpdates() { + final Flux stockFlux = Flux.fromStream(Stream.generate(() -> new Stock(new Random().nextFloat(), "WebFluxStock", new Date()))); + final Flux intervalFlux = Flux.interval(Duration.ofSeconds(1)); + return Flux.zip(stockFlux, intervalFlux) + .map(t1 -> t1.getT1()); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/model/Stock.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/model/Stock.java new file mode 100644 index 0000000000..959c8de685 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/model/Stock.java @@ -0,0 +1,18 @@ +package com.baeldung.reactive.model; + +import java.util.Date; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@AllArgsConstructor +@Data +@NoArgsConstructor +public class Stock { + + private float price; + private String name; + private Date date; + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webclient/StockClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webclient/StockClient.java new file mode 100644 index 0000000000..d86660dd90 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webclient/StockClient.java @@ -0,0 +1,42 @@ +package com.baeldung.reactive.webclient; + +import java.util.Collections; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.reactive.model.Stock; + +@SpringBootApplication +public class StockClient { + + @Bean + WebClient webClient() { + return WebClient.builder() + .baseUrl("http://localhost:8080/getStockPrice") + .build(); + } + + @Bean + CommandLineRunner runner(WebClient webClient) { + return args -> { + webClient.get() + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(Stock.class) + .log() + .subscribe(System.out::println); + }; + } + + public static void main(String args[]) throws InterruptedException { + new SpringApplicationBuilder(StockClient.class).properties(Collections.singletonMap("server.port", "9090")) + .run(args); + + } + +}