diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/controller/EventController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/controller/EventController.java new file mode 100644 index 0000000000..e692aa3a74 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/controller/EventController.java @@ -0,0 +1,26 @@ +package com.baeldung.reactive.sse.controller; + + +import com.baeldung.reactive.sse.service.EventSubscriptionsService; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; + +@RestController +public class EventController { + + private EventSubscriptionsService eventSubscriptionsService; + + public EventController(EventSubscriptionsService eventSubscriptionsService) { + this.eventSubscriptionsService = eventSubscriptionsService; + } + + @GetMapping( + produces = MediaType.TEXT_EVENT_STREAM_VALUE, + value = "/sse/events") + public Flux events() { + return eventSubscriptionsService.subscribe(); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/model/EventSubscription.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/model/EventSubscription.java new file mode 100644 index 0000000000..4d3ce27156 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/model/EventSubscription.java @@ -0,0 +1,22 @@ +package com.baeldung.reactive.sse.model; + +import org.springframework.http.codec.ServerSentEvent; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; + +public class EventSubscription { + + private DirectProcessor directProcessor; + + public EventSubscription() { + this.directProcessor = DirectProcessor.create(); + } + + public void emit(ServerSentEvent e) { + directProcessor.onNext(e); + } + + public Flux subscribe() { + return directProcessor; + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/EventSubscriptionsService.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/EventSubscriptionsService.java new file mode 100644 index 0000000000..f245ce6184 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/EventSubscriptionsService.java @@ -0,0 +1,41 @@ +package com.baeldung.reactive.sse.service; + +import com.baeldung.reactive.sse.model.EventSubscription; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +@Service +public class EventSubscriptionsService { + + private List listeners; + + public EventSubscriptionsService() { + this.listeners = new ArrayList<>(); + } + + public Flux subscribe() { + EventSubscription e = new EventSubscription(); + listeners.add(e); + + return e.subscribe(); + } + + public void sendDateEvent(Date date) { + for (EventSubscription e : listeners) { + try { + e.emit(ServerSentEvent.builder(date) + .event("date") + .id(UUID.randomUUID().toString()) + .build()); + } catch (Exception ex) { + listeners.remove(e); + } + } + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/emitter/DateEmitterService.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/emitter/DateEmitterService.java new file mode 100644 index 0000000000..29b70865dc --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/sse/service/emitter/DateEmitterService.java @@ -0,0 +1,27 @@ +package com.baeldung.reactive.sse.service.emitter; + +import com.baeldung.reactive.sse.service.EventSubscriptionsService; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Date; +import java.util.stream.Stream; + +@Service +public class DateEmitterService { + + private EventSubscriptionsService eventSubscriptionsService; + + public DateEmitterService(EventSubscriptionsService eventSubscriptionsService) { + this.eventSubscriptionsService = eventSubscriptionsService; + } + + @PostConstruct + public void init() { + Flux.fromStream(Stream.generate(Date::new)) + .delayElements(Duration.ofSeconds(1)) + .subscribe(data -> eventSubscriptionsService.sendDateEvent(new Date())); + } +} diff --git a/spring-5-reactive/src/main/resources/static/app.js b/spring-5-reactive/src/main/resources/static/app.js new file mode 100644 index 0000000000..2af2ae5844 --- /dev/null +++ b/spring-5-reactive/src/main/resources/static/app.js @@ -0,0 +1,36 @@ +let Clock = React.createClass({ + + getInitialState: function() { + const self = this; + const ev = new EventSource("http://localhost:8080/sse/events"); + + self.setState({currentDate : moment(new Date()).format("MMMM Do YYYY, h:mm:ss a")}); + + ev.addEventListener("date", function(e) { + self.setState({currentDate : moment(JSON.parse(e.data).date).format("MMMM Do YYYY, h:mm:ss a")}); + }, false); + + return {currentDate: moment(new Date()).format("MMMM Do YYYY, h:mm:ss a") }; + }, + + render() { + return ( +

+ Current time: {this.state.currentDate} +

+ ); + } + +}); + +let App = React.createClass({ + render() { + return ( +
+ +
+ ); + } +}); + +ReactDOM.render(, document.getElementById('root') ); \ No newline at end of file diff --git a/spring-5-reactive/src/main/resources/static/sse-index.html b/spring-5-reactive/src/main/resources/static/sse-index.html new file mode 100644 index 0000000000..875c8176af --- /dev/null +++ b/spring-5-reactive/src/main/resources/static/sse-index.html @@ -0,0 +1,19 @@ + + + + + React + Spring + + + +
+ + + + + + + + + + \ No newline at end of file diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/sse/ServerSentEventsTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/sse/ServerSentEventsTest.java new file mode 100644 index 0000000000..48e8c23c37 --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/sse/ServerSentEventsTest.java @@ -0,0 +1,39 @@ +package com.baeldung.reactive.sse; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class ServerSentEventsTest { + + @Autowired + private WebTestClient webClient; + + @Test + public void contextLoads() { + } + + @Test + public void givenValidRequest_shouldReceiveOk() throws Exception { + + webClient.get().uri("/events").accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .expectStatus().isOk(); + } + + @Test + public void givenInvalidHttpVerb_shouldReceiveMethodNotAllowedError() throws Exception { + + webClient.post().uri("/events").accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .expectStatus().isEqualTo(HttpStatus.METHOD_NOT_ALLOWED); + } + +}