From 3e5ac1de94aa9c139ecc433898265bc29dd69abe Mon Sep 17 00:00:00 2001 From: felipeazv Date: Mon, 22 Jan 2018 22:09:57 +0100 Subject: [PATCH 1/2] BAEL-1475: code formatting --- .../ReactiveJavaClientWebSocket.java | 15 ++++---- .../websocket/ReactiveWebSocketHandler.java | 35 +++++-------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java index 74e2d7daca..c9a333c044 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveJavaClientWebSocket.java @@ -2,22 +2,25 @@ package com.baeldung.reactive.websocket; import java.net.URI; import java.time.Duration; -import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient; import reactor.core.publisher.Mono; -@SpringBootApplication public class ReactiveJavaClientWebSocket { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException { + WebSocketClient client = new ReactorNettyWebSocketClient(); - client.execute(URI.create("ws://localhost:8080/event-emitter"), session -> session.send(Mono.just(session.textMessage("event-me-from-spring-reactive-client"))) + client.execute( + URI.create("ws://localhost:8080/event-emitter"), + session -> session.send( + Mono.just(session.textMessage("event-spring-reactive-client-websocket"))) .thenMany(session.receive() - .map(WebSocketMessage::getPayloadAsText) - .log()) + .map(WebSocketMessage::getPayloadAsText) + .log()) .then()) .block(Duration.ofSeconds(10L)); } + } diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java index 7f74e714f6..eea0fb9962 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java @@ -12,7 +12,6 @@ import org.springframework.web.reactive.socket.WebSocketMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.annotation.PostConstruct; import java.time.Duration; import java.time.LocalDateTime; import java.util.UUID; @@ -20,27 +19,14 @@ import java.util.UUID; @Component public class ReactiveWebSocketHandler implements WebSocketHandler { - private Flux eventFlux; - private Flux intervalFlux; + private Flux eventFlux = Flux.generate(e -> { + Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString()); + e.next(event); + }); - /** - * Here we prepare a Flux that will emit a message every second - */ - @PostConstruct - private void init() throws InterruptedException { + private Flux intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event); - eventFlux = Flux.generate(e -> { - Event event = new Event(UUID.randomUUID() - .toString(), - LocalDateTime.now() - .toString()); - e.next(event); - }); - - intervalFlux = Flux.interval(Duration.ofMillis(1000L)) - .zipWith(eventFlux, (time, event) -> event); - - } + private ObjectMapper json = new ObjectMapper(); /** * On each new client session, send the message flux to the client. @@ -50,7 +36,7 @@ public class ReactiveWebSocketHandler implements WebSocketHandler { */ @Override public Mono handle(WebSocketSession webSocketSession) { - ObjectMapper json = new ObjectMapper(); + return webSocketSession.send(intervalFlux.map(event -> { try { String jsonEvent = json.writeValueAsString(event); @@ -60,12 +46,9 @@ public class ReactiveWebSocketHandler implements WebSocketHandler { e.printStackTrace(); return ""; } - }) - .map(webSocketSession::textMessage)) + }).map(webSocketSession::textMessage)) - .and(webSocketSession.receive() - .map(WebSocketMessage::getPayloadAsText) - .log()); + .and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log()); } } From c38de161c68e0b327b5f27e26ed01d328882a8f3 Mon Sep 17 00:00:00 2001 From: felipeazv Date: Mon, 22 Jan 2018 22:28:02 +0100 Subject: [PATCH 2/2] removing verbose comment --- .../reactive/websocket/ReactiveWebSocketHandler.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java index eea0fb9962..669c212fd3 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket/ReactiveWebSocketHandler.java @@ -28,12 +28,6 @@ public class ReactiveWebSocketHandler implements WebSocketHandler { private ObjectMapper json = new ObjectMapper(); - /** - * On each new client session, send the message flux to the client. - * Spring subscribes to the flux and send every new flux event to the WebSocketSession object - * @param session - * @return Mono - */ @Override public Mono handle(WebSocketSession webSocketSession) {