diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java index 669c212fd3..2e93c0c0dc 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java @@ -1,48 +1,41 @@ package com.baeldung.reactive.websocket; -import org.springframework.web.reactive.socket.WebSocketSession; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - import org.springframework.stereotype.Component; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; - +import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; -import java.time.LocalDateTime; -import java.util.UUID; + +import static java.time.LocalDateTime.now; +import static java.util.UUID.randomUUID; @Component public class ReactiveWebSocketHandler implements WebSocketHandler { - private Flux eventFlux = Flux.generate(e -> { - Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString()); - e.next(event); + private static final ObjectMapper json = new ObjectMapper(); + + private Flux eventFlux = Flux.generate(sink -> { + Event event = new Event(randomUUID().toString(), now().toString()); + try { + sink.next(json.writeValueAsString(event)); + } catch (JsonProcessingException e) { + sink.error(e); + } }); - private Flux intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event); - - private ObjectMapper json = new ObjectMapper(); + private Flux intervalFlux = Flux.interval(Duration.ofMillis(1000L)) + .zipWith(eventFlux, (time, event) -> event); @Override public Mono handle(WebSocketSession webSocketSession) { - - return webSocketSession.send(intervalFlux.map(event -> { - try { - String jsonEvent = json.writeValueAsString(event); - System.out.println(jsonEvent); - return jsonEvent; - } catch (JsonProcessingException e) { - e.printStackTrace(); - return ""; - } - }).map(webSocketSession::textMessage)) - - .and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log()); + return webSocketSession.send(intervalFlux + .map(webSocketSession::textMessage)) + .and(webSocketSession.receive() + .map(WebSocketMessage::getPayloadAsText).log()); } - }