Real Time Event Streaming with Spring Webflux

This commit is contained in:
Chirag Dewan 2018-07-22 17:56:20 +05:30
parent 3205de68c1
commit 945932fa1d
3 changed files with 89 additions and 0 deletions

View File

@ -0,0 +1,38 @@
package com.baeldung.reactive.webflux.eventstreaming;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
@SpringBootApplication
public class EventStreamingApplication {
public static void main(String[] args) {
SpringApplication.run(EventStreamingApplication.class, args);
}
@Bean
public HandlerMapping webSocketMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/events", new EventWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.reactive.webflux.eventstreaming;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
public class EventWebSocketClient {
public static void main(String[] args) throws URISyntaxException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create("ws://localhost:8080/events"),
session -> session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log().then())
.block(Duration.ofMinutes(10L));
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.reactive.webflux.eventstreaming;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalTime;
import java.util.UUID;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class EventWebSocketHandler implements WebSocketHandler {
MessageFormat mf = new MessageFormat("EventID: {0} , Event Time: {1}") ;
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
Flux.<String> generate(sink -> sink.next(
mf.format(new Object[] {UUID.randomUUID(),LocalTime.now()})))
.delayElements(Duration.ofSeconds(1))
.map(session::textMessage));
}
}