diff --git a/spring-reactive-websocket/pom.xml b/spring-reactive-websocket/pom.xml new file mode 100644 index 0000000000..846cece177 --- /dev/null +++ b/spring-reactive-websocket/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + spring-reactive-websocket + 0.0.1-SNAPSHOT + jar + + spring-reactive-websocket + Reactive WebSockets with Spring 5 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-integration + 2.0.0.M7 + + + org.springframework.boot + spring-boot-starter-webflux + 2.0.0.M7 + + + org.projectlombok + lombok + compile + RELEASE + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + + + diff --git a/spring-reactive-websocket/src/main/java/com/baeldung/Event.java b/spring-reactive-websocket/src/main/java/com/baeldung/Event.java new file mode 100644 index 0000000000..20d678c214 --- /dev/null +++ b/spring-reactive-websocket/src/main/java/com/baeldung/Event.java @@ -0,0 +1,11 @@ +package com.baeldung; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Event { + private String eventId; + private String eventDt; +} diff --git a/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketApplication.java b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketApplication.java new file mode 100644 index 0000000000..f8952d750d --- /dev/null +++ b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketApplication.java @@ -0,0 +1,38 @@ +package com.baeldung; + +import java.net.URI; +import java.time.Duration; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +import reactor.core.publisher.Mono; + +@SpringBootApplication +public class ReactiveWebSocketApplication { + public static void main(String[] args) { + SpringApplication.run(ReactiveWebSocketApplication.class, args); + } + + /** + * Spring Reactive WebSocket Client + * **/ + @Bean + CommandLineRunner runner() { + return run -> { + WebSocketClient client = new ReactorNettyWebSocketClient(); + client.execute(URI.create("ws://localhost:8080/event-emitter"), session -> session.send(Mono.just(session.textMessage("event-me-from-spring-reactive-client"))) + .thenMany(session.receive() + .map(WebSocketMessage::getPayloadAsText) + .log()) + .then()) + .block(); +// .block(Duration.ofSeconds(10L));//force timeout after given duration + }; + } +} diff --git a/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketConfiguration.java b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketConfiguration.java new file mode 100644 index 0000000000..6729e09273 --- /dev/null +++ b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketConfiguration.java @@ -0,0 +1,34 @@ +package com.baeldung; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class ReactiveWebSocketConfiguration { + + @Autowired + private WebSocketHandler webSocketHandler; + + @Bean + public HandlerMapping webSocketHandlerMapping() { + Map map = new HashMap<>(); + map.put("/event-emitter", webSocketHandler); + + SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); + handlerMapping.setOrder(1); + handlerMapping.setUrlMap(map); + return handlerMapping; + } + + @Bean + public WebSocketHandlerAdapter handlerAdapter() { + return new WebSocketHandlerAdapter(); + } +} \ No newline at end of file diff --git a/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketHandler.java b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketHandler.java new file mode 100644 index 0000000000..4a548322b3 --- /dev/null +++ b/spring-reactive-websocket/src/main/java/com/baeldung/ReactiveWebSocketHandler.java @@ -0,0 +1,71 @@ +package com.baeldung; + +import org.springframework.web.reactive.socket.WebSocketSession; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.UUID; + +@Component +public class ReactiveWebSocketHandler implements WebSocketHandler { + + private Flux eventFlux; + private Flux intervalFlux; + + /** + * Here we prepare a Flux that will emit a message every second + */ + @PostConstruct + private void init() throws InterruptedException { + + eventFlux = Flux.generate(e -> { + Event event = new Event(UUID.randomUUID() + .toString(), + LocalDateTime.now() + .toString()); + e.next(event); + }); + + intervalFlux = Flux.interval(Duration.ofMillis(1000L)) + .zipWith(eventFlux, (time, event) -> event); + + } + + /** + * On each new client session, send the message flux to the client. + * Spring subscribes to the flux and send every new flux event to the WebSocketSession object + * @param session + * @return Mono + */ + @Override + public Mono handle(WebSocketSession webSocketSession) { + ObjectMapper json = new ObjectMapper(); + return webSocketSession.send(intervalFlux.map(event -> { + try { + String jsonEvent = json.writeValueAsString(event); + System.out.println(jsonEvent); + return jsonEvent; + } catch (JsonProcessingException e) { + e.printStackTrace(); + return ""; + } + }) + .map(webSocketSession::textMessage)) + + .and(webSocketSession.receive() + .map(WebSocketMessage::getPayloadAsText) + .log()); + } + +} diff --git a/spring-reactive-websocket/src/main/resources/static/client-websocket.html b/spring-reactive-websocket/src/main/resources/static/client-websocket.html new file mode 100644 index 0000000000..3f840e8bd4 --- /dev/null +++ b/spring-reactive-websocket/src/main/resources/static/client-websocket.html @@ -0,0 +1,34 @@ + + + + +Baeldung: Spring 5 Reactive Client WebSocket (Browser) + + + +
+ + + \ No newline at end of file