From ffce600b8dfdf5df7943cc3d2957d1cd232b6273 Mon Sep 17 00:00:00 2001 From: rozagerardo Date: Mon, 3 Sep 2018 12:32:50 -0300 Subject: [PATCH] [BAEL-2114] spring-5-reactive & spring-5-mvc | Server Sent Events in Spring (#5146) * * added spring 5 reactive examples * * added MVC example * added spring 5 webflux test --- .../baeldung/web/SseEmitterController.java | 29 ++++++- spring-5-reactive/pom.xml | 7 ++ .../consumer/ConsumerSSEApplication.java | 19 +++++ .../consumer/controller/ClientController.java | 83 +++++++++++++++++++ .../server/ServerSSEApplication.java | 17 ++++ .../server/controllers/ServerController.java | 35 ++++++++ .../ServiceSentEventLiveTest.java | 49 +++++++++++ 7 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java diff --git a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java index b11c93fb08..00113c5ff7 100644 --- a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java +++ b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java @@ -1,12 +1,16 @@ package com.baeldung.web; -import com.baeldung.Constants; +import java.time.LocalTime; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder; + +import com.baeldung.Constants; @Controller public class SseEmitterController { @@ -29,4 +33,27 @@ public class SseEmitterController { return emitter; } + @GetMapping("/stream-sse-mvc") + public SseEmitter streamSseMvc() { + SseEmitter emitter = new SseEmitter(); + ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor(); + + sseMvcExecutor.execute(() -> { + try { + for (int i = 0; true; i++) { + SseEventBuilder event = SseEmitter.event() + .data("SSE MVC - " + LocalTime.now() + .toString()) + .id(String.valueOf(i)) + .name("sse event - mvc"); + emitter.send(event); + Thread.sleep(1000); + } + } catch (Exception ex) { + emitter.completeWithError(ex); + } + }); + return emitter; + } + } diff --git a/spring-5-reactive/pom.xml b/spring-5-reactive/pom.xml index e81d3d8b79..5f455c3906 100644 --- a/spring-5-reactive/pom.xml +++ b/spring-5-reactive/pom.xml @@ -94,6 +94,12 @@ ${project-reactor-test} test + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + @@ -117,6 +123,7 @@ 1.0 4.1 3.1.6.RELEASE + 1.2.0 diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java new file mode 100644 index 0000000000..3997607ef0 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java @@ -0,0 +1,19 @@ +package com.baeldung.reactive.serversentevents.consumer; + +import java.util.Collections; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +@SpringBootApplication +@EnableAsync +public class ConsumerSSEApplication { + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(ConsumerSSEApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8082")); + app.run(args); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java new file mode 100644 index 0000000000..69a6bc396c --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java @@ -0,0 +1,83 @@ +package com.baeldung.reactive.serversentevents.consumer.controller; + +import java.time.LocalTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.scheduling.annotation.Async; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/sse-consumer") +public class ClientController { + + private static Logger logger = LoggerFactory.getLogger(ClientController.class); + private WebClient client = WebClient.create("http://localhost:8081/sse-server"); + + @GetMapping("/launch-sse-client") + public String launchSSEFromSSEWebClient() { + consumeSSE(); + return "LAUNCHED EVENT CLIENT!!! Check the logs..."; + } + + @GetMapping("/launch-flux-client") + public String launchcFluxFromSSEWebClient() { + consumeFlux(); + return "LAUNCHED EVENT CLIENT!!! Check the logs..."; + } + + @GetMapping("/launch-sse-from-flux-endpoint-client") + public String launchFluxFromFluxWebClient() { + consumeSSEFromFluxEndpoint(); + return "LAUNCHED EVENT CLIENT!!! Check the logs..."; + } + + @Async + public void consumeSSE() { + ParameterizedTypeReference> type = new ParameterizedTypeReference>() { + }; + + Flux> eventStream = client.get() + .uri("/stream-sse") + .retrieve() + .bodyToFlux(type); + + eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error), + () -> logger.info("Completed!!!")); + } + + @Async + public void consumeFlux() { + Flux stringStream = client.get() + .uri("/stream-flux") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(String.class); + + stringStream.subscribe(content -> logger.info("Current time: {} - Received content: {} ", LocalTime.now(), content), error -> logger.error("Error retrieving content: {}", error), () -> logger.info("Completed!!!")); + } + + @Async + public void consumeSSEFromFluxEndpoint() { + ParameterizedTypeReference> type = new ParameterizedTypeReference>() { + }; + + Flux> eventStream = client.get() + .uri("/stream-flux") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(type); + + eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error), + () -> logger.info("Completed!!!")); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java new file mode 100644 index 0000000000..2750e6616d --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.serversentevents.server; + +import java.util.Collections; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ServerSSEApplication { + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(ServerSSEApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8081")); + app.run(args); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java new file mode 100644 index 0000000000..1ad8e848cf --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java @@ -0,0 +1,35 @@ +package com.baeldung.reactive.serversentevents.server.controllers; + +import java.time.Duration; +import java.time.LocalTime; + +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Flux; + +@RestController +@RequestMapping("/sse-server") +public class ServerController { + + @GetMapping("/stream-sse") + public Flux> streamEvents() { + return Flux.interval(Duration.ofSeconds(1)) + .map(sequence -> ServerSentEvent. builder() + .id(String.valueOf(sequence)) + .event("periodic-event") + .data("SSE - " + LocalTime.now() + .toString()) + .build()); + } + + @GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux streamFlux() { + return Flux.interval(Duration.ofSeconds(1)) + .map(sequence -> "Flux - " + LocalTime.now() + .toString()); + } +} diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java new file mode 100644 index 0000000000..53f4a3b1bb --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java @@ -0,0 +1,49 @@ +package com.baeldung.reactive.serversentsevents; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.web.reactive.server.WebTestClient; + +@RunWith(JUnitPlatform.class) +@SpringBootTest +public class ServiceSentEventLiveTest { + + private WebTestClient client = WebTestClient.bindToServer() + .baseUrl("http://localhost:8081/sse-server") + .build(); + + @Test + public void whenSSEEndpointIsCalled_thenEventStreamingBegins() { + + Executable sseStreamingCall = () -> client.get() + .uri("/stream-sse") + .exchange() + .expectStatus() + .isOk() + .expectHeader() + .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM) + .expectBody(String.class); + + Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't"); + } + + @Test + public void whenFluxEndpointIsCalled_thenEventStreamingBegins() { + + Executable sseStreamingCall = () -> client.get() + .uri("/stream-flux") + .exchange() + .expectStatus() + .isOk() + .expectHeader() + .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM) + .expectBody(String.class); + + Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't"); + } +}