diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java new file mode 100644 index 0000000000..5deff35fa7 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java @@ -0,0 +1,38 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +@SpringBootApplication +public class EventStreamingApplication { + + public static void main(String[] args) { + SpringApplication.run(EventStreamingApplication.class, args); + } + + + @Bean + public HandlerMapping webSocketMapping() { + Map map = new HashMap<>(); + map.put("/events", new EventWebSocketHandler()); + + SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); + mapping.setUrlMap(map); + mapping.setOrder(1); + return mapping; + } + + @Bean + public WebSocketHandlerAdapter handlerAdapter() { + return new WebSocketHandlerAdapter(); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java new file mode 100644 index 0000000000..3fa4027449 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; + +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +public class EventWebSocketClient { + + public static void main(String[] args) throws URISyntaxException { + + WebSocketClient client = new ReactorNettyWebSocketClient(); + + client.execute( + URI.create("ws://localhost:8080/events"), + session -> session.receive() + .map(WebSocketMessage::getPayloadAsText) + .log().then()) + .block(Duration.ofMinutes(10L)); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java new file mode 100644 index 0000000000..37deaf3d1b --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java @@ -0,0 +1,27 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.text.MessageFormat; +import java.time.Duration; +import java.time.LocalTime; +import java.util.UUID; + +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class EventWebSocketHandler implements WebSocketHandler { + + MessageFormat mf = new MessageFormat("EventID: {0} , Event Time: {1}") ; + + @Override + public Mono handle(WebSocketSession session) { + return session.send( + Flux. generate(sink -> sink.next( + mf.format(new Object[] {UUID.randomUUID(),LocalTime.now()}))) + .delayElements(Duration.ofSeconds(1)) + .map(session::textMessage)); + } + +}