diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java index 74e2d7daca..c9a333c044 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java @@ -2,22 +2,25 @@ package com.baeldung.reactive.websocket; import java.net.URI; import java.time.Duration; -import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient; import reactor.core.publisher.Mono; -@SpringBootApplication public class ReactiveJavaClientWebSocket { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException { + WebSocketClient client = new ReactorNettyWebSocketClient(); - client.execute(URI.create("ws://localhost:8080/event-emitter"), session -> session.send(Mono.just(session.textMessage("event-me-from-spring-reactive-client"))) + client.execute( + URI.create("ws://localhost:8080/event-emitter"), + session -> session.send( + Mono.just(session.textMessage("event-spring-reactive-client-websocket"))) .thenMany(session.receive() - .map(WebSocketMessage::getPayloadAsText) - .log()) + .map(WebSocketMessage::getPayloadAsText) + .log()) .then()) .block(Duration.ofSeconds(10L)); } + } diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java index 7f74e714f6..669c212fd3 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java @@ -12,7 +12,6 @@ import org.springframework.web.reactive.socket.WebSocketMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.annotation.PostConstruct; import java.time.Duration; import java.time.LocalDateTime; import java.util.UUID; @@ -20,37 +19,18 @@ import java.util.UUID; @Component public class ReactiveWebSocketHandler implements WebSocketHandler { - private Flux eventFlux; - private Flux intervalFlux; + private Flux eventFlux = Flux.generate(e -> { + Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString()); + e.next(event); + }); - /** - * Here we prepare a Flux that will emit a message every second - */ - @PostConstruct - private void init() throws InterruptedException { + private Flux intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event); - eventFlux = Flux.generate(e -> { - Event event = new Event(UUID.randomUUID() - .toString(), - LocalDateTime.now() - .toString()); - e.next(event); - }); + private ObjectMapper json = new ObjectMapper(); - intervalFlux = Flux.interval(Duration.ofMillis(1000L)) - .zipWith(eventFlux, (time, event) -> event); - - } - - /** - * On each new client session, send the message flux to the client. - * Spring subscribes to the flux and send every new flux event to the WebSocketSession object - * @param session - * @return Mono - */ @Override public Mono handle(WebSocketSession webSocketSession) { - ObjectMapper json = new ObjectMapper(); + return webSocketSession.send(intervalFlux.map(event -> { try { String jsonEvent = json.writeValueAsString(event); @@ -60,12 +40,9 @@ public class ReactiveWebSocketHandler implements WebSocketHandler { e.printStackTrace(); return ""; } - }) - .map(webSocketSession::textMessage)) + }).map(webSocketSession::textMessage)) - .and(webSocketSession.receive() - .map(WebSocketMessage::getPayloadAsText) - .log()); + .and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log()); } }