Server Sent Events example using Spring Webflux and React
This commit is contained in:
parent
3cab703646
commit
429bbbbaf1
|
@ -0,0 +1,26 @@
|
|||
package com.baeldung.reactive.sse.controller;
|
||||
|
||||
|
||||
import com.baeldung.reactive.sse.service.EventSubscriptionsService;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.ServerSentEvent;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@RestController
|
||||
public class EventController {
|
||||
|
||||
private EventSubscriptionsService eventSubscriptionsService;
|
||||
|
||||
public EventController(EventSubscriptionsService eventSubscriptionsService) {
|
||||
this.eventSubscriptionsService = eventSubscriptionsService;
|
||||
}
|
||||
|
||||
@GetMapping(
|
||||
produces = MediaType.TEXT_EVENT_STREAM_VALUE,
|
||||
value = "/sse/events")
|
||||
public Flux<ServerSentEvent> events() {
|
||||
return eventSubscriptionsService.subscribe();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.baeldung.reactive.sse.model;
|
||||
|
||||
import org.springframework.http.codec.ServerSentEvent;
|
||||
import reactor.core.publisher.DirectProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class EventSubscription {
|
||||
|
||||
private DirectProcessor<ServerSentEvent> directProcessor;
|
||||
|
||||
public EventSubscription() {
|
||||
this.directProcessor = DirectProcessor.create();
|
||||
}
|
||||
|
||||
public void emit(ServerSentEvent e) {
|
||||
directProcessor.onNext(e);
|
||||
}
|
||||
|
||||
public Flux<ServerSentEvent> subscribe() {
|
||||
return directProcessor;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package com.baeldung.reactive.sse.service;
|
||||
|
||||
import com.baeldung.reactive.sse.model.EventSubscription;
|
||||
import org.springframework.http.codec.ServerSentEvent;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
public class EventSubscriptionsService {
|
||||
|
||||
private List<EventSubscription> listeners;
|
||||
|
||||
public EventSubscriptionsService() {
|
||||
this.listeners = new ArrayList<>();
|
||||
}
|
||||
|
||||
public Flux<ServerSentEvent> subscribe() {
|
||||
EventSubscription e = new EventSubscription();
|
||||
listeners.add(e);
|
||||
|
||||
return e.subscribe();
|
||||
}
|
||||
|
||||
public void sendDateEvent(Date date) {
|
||||
for (EventSubscription e : listeners) {
|
||||
try {
|
||||
e.emit(ServerSentEvent.builder(date)
|
||||
.event("date")
|
||||
.id(UUID.randomUUID().toString())
|
||||
.build());
|
||||
} catch (Exception ex) {
|
||||
listeners.remove(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.baeldung.reactive.sse.service.emitter;
|
||||
|
||||
import com.baeldung.reactive.sse.service.EventSubscriptionsService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Service
|
||||
public class DateEmitterService {
|
||||
|
||||
private EventSubscriptionsService eventSubscriptionsService;
|
||||
|
||||
public DateEmitterService(EventSubscriptionsService eventSubscriptionsService) {
|
||||
this.eventSubscriptionsService = eventSubscriptionsService;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Flux.fromStream(Stream.generate(Date::new))
|
||||
.delayElements(Duration.ofSeconds(1))
|
||||
.subscribe(data -> eventSubscriptionsService.sendDateEvent(new Date()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
let Clock = React.createClass({
|
||||
|
||||
getInitialState: function() {
|
||||
const self = this;
|
||||
const ev = new EventSource("http://localhost:8080/sse/events");
|
||||
|
||||
self.setState({currentDate : moment(new Date()).format("MMMM Do YYYY, h:mm:ss a")});
|
||||
|
||||
ev.addEventListener("date", function(e) {
|
||||
self.setState({currentDate : moment(JSON.parse(e.data).date).format("MMMM Do YYYY, h:mm:ss a")});
|
||||
}, false);
|
||||
|
||||
return {currentDate: moment(new Date()).format("MMMM Do YYYY, h:mm:ss a") };
|
||||
},
|
||||
|
||||
render() {
|
||||
return (
|
||||
<p>
|
||||
Current time: {this.state.currentDate}
|
||||
</p>
|
||||
);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
let App = React.createClass({
|
||||
render() {
|
||||
return (
|
||||
<div>
|
||||
<Clock/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
ReactDOM.render(<App />, document.getElementById('root') );
|
|
@ -0,0 +1,19 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
|
||||
<head>
|
||||
<title>React + Spring</title>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<div id="root"></div>
|
||||
|
||||
<script src="https://unpkg.com/react@15/dist/react.min.js"></script>
|
||||
<script src="https://unpkg.com/react-dom@15/dist/react-dom.min.js"></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/babel-core/5.8.24/browser.min.js"></script>
|
||||
<script src="https://unpkg.com/moment@2.22.2/moment.js"></script>
|
||||
|
||||
<script type="text/babel" src="app.js"></script>
|
||||
</body>
|
||||
|
||||
</html>
|
|
@ -0,0 +1,39 @@
|
|||
package com.baeldung.reactive.sse;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.test.web.reactive.server.WebTestClient;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
public class ServerSentEventsTest {
|
||||
|
||||
@Autowired
|
||||
private WebTestClient webClient;
|
||||
|
||||
@Test
|
||||
public void contextLoads() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenValidRequest_shouldReceiveOk() throws Exception {
|
||||
|
||||
webClient.get().uri("/events").accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.exchange()
|
||||
.expectStatus().isOk();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenInvalidHttpVerb_shouldReceiveMethodNotAllowedError() throws Exception {
|
||||
|
||||
webClient.post().uri("/events").accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.exchange()
|
||||
.expectStatus().isEqualTo(HttpStatus.METHOD_NOT_ALLOWED);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue