Merge pull request #3522 from eugenp/reactive-websocket-refactor

Reactive exception handling
This commit is contained in:
Carsten Gräf 2018-01-29 07:15:42 +01:00 committed by GitHub
commit cd64578fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 26 deletions

View File

@ -1,48 +1,41 @@
package com.baeldung.reactive.websocket;
import org.springframework.web.reactive.socket.WebSocketSession;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
import static java.time.LocalDateTime.now;
import static java.util.UUID.randomUUID;
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
private Flux<Event> eventFlux = Flux.generate(e -> {
Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString());
e.next(event);
private static final ObjectMapper json = new ObjectMapper();
private Flux<String> eventFlux = Flux.generate(sink -> {
Event event = new Event(randomUUID().toString(), now().toString());
try {
sink.next(json.writeValueAsString(event));
} catch (JsonProcessingException e) {
sink.error(e);
}
});
private Flux<Event> intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event);
private ObjectMapper json = new ObjectMapper();
private Flux<String> intervalFlux = Flux.interval(Duration.ofMillis(1000L))
.zipWith(eventFlux, (time, event) -> event);
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(intervalFlux.map(event -> {
try {
String jsonEvent = json.writeValueAsString(event);
System.out.println(jsonEvent);
return jsonEvent;
} catch (JsonProcessingException e) {
e.printStackTrace();
return "";
}
}).map(webSocketSession::textMessage))
.and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log());
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText).log());
}
}