[BAEL-2114] spring-5-reactive & spring-5-mvc | Server Sent Events in Spring (#5146)

* * added spring 5 reactive examples

* * added MVC example
* added spring 5 webflux test
This commit is contained in:
rozagerardo 2018-09-03 12:32:50 -03:00 committed by maibin
parent 23a971073f
commit ffce600b8d
7 changed files with 238 additions and 1 deletions

View File

@ -1,12 +1,16 @@
package com.baeldung.web;
import com.baeldung.Constants;
import java.time.LocalTime;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
import com.baeldung.Constants;
@Controller
public class SseEmitterController {
@ -29,4 +33,27 @@ public class SseEmitterController {
return emitter;
}
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now()
.toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}

View File

@ -94,6 +94,12 @@
<version>${project-reactor-test}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -117,6 +123,7 @@
<geronimo-json_1.1_spec.version>1.0</geronimo-json_1.1_spec.version>
<commons-collections4.version>4.1</commons-collections4.version>
<project-reactor-test>3.1.6.RELEASE</project-reactor-test>
<junit.platform.version>1.2.0</junit.platform.version>
</properties>
</project>

View File

@ -0,0 +1,19 @@
package com.baeldung.reactive.serversentevents.consumer;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class ConsumerSSEApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ConsumerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
app.run(args);
}
}

View File

@ -0,0 +1,83 @@
package com.baeldung.reactive.serversentevents.consumer.controller;
import java.time.LocalTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/sse-consumer")
public class ClientController {
private static Logger logger = LoggerFactory.getLogger(ClientController.class);
private WebClient client = WebClient.create("http://localhost:8081/sse-server");
@GetMapping("/launch-sse-client")
public String launchSSEFromSSEWebClient() {
consumeSSE();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@GetMapping("/launch-flux-client")
public String launchcFluxFromSSEWebClient() {
consumeFlux();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@GetMapping("/launch-sse-from-flux-endpoint-client")
public String launchFluxFromFluxWebClient() {
consumeSSEFromFluxEndpoint();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@Async
public void consumeSSE() {
ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {
};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
@Async
public void consumeFlux() {
Flux<String> stringStream = client.get()
.uri("/stream-flux")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
stringStream.subscribe(content -> logger.info("Current time: {} - Received content: {} ", LocalTime.now(), content), error -> logger.error("Error retrieving content: {}", error), () -> logger.info("Completed!!!"));
}
@Async
public void consumeSSEFromFluxEndpoint() {
ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {
};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-flux")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
}

View File

@ -0,0 +1,17 @@
package com.baeldung.reactive.serversentevents.server;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerSSEApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ServerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
app.run(args);
}
}

View File

@ -0,0 +1,35 @@
package com.baeldung.reactive.serversentevents.server.controllers;
import java.time.Duration;
import java.time.LocalTime;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/sse-server")
public class ServerController {
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now()
.toString())
.build());
}
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now()
.toString());
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.reactive.serversentsevents;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
@RunWith(JUnitPlatform.class)
@SpringBootTest
public class ServiceSentEventLiveTest {
private WebTestClient client = WebTestClient.bindToServer()
.baseUrl("http://localhost:8081/sse-server")
.build();
@Test
public void whenSSEEndpointIsCalled_thenEventStreamingBegins() {
Executable sseStreamingCall = () -> client.get()
.uri("/stream-sse")
.exchange()
.expectStatus()
.isOk()
.expectHeader()
.contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.expectBody(String.class);
Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
}
@Test
public void whenFluxEndpointIsCalled_thenEventStreamingBegins() {
Executable sseStreamingCall = () -> client.get()
.uri("/stream-flux")
.exchange()
.expectStatus()
.isOk()
.expectHeader()
.contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.expectBody(String.class);
Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
}
}