Merge pull request #3488 from felipeazv/master

BAEL-1475: code formatting
This commit is contained in:
Carsten Gräf 2018-01-24 19:45:18 +01:00 committed by GitHub
commit 026ff3be45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 38 deletions

View File

@ -2,22 +2,25 @@ package com.baeldung.reactive.websocket;
import java.net.URI; import java.net.URI;
import java.time.Duration; import java.time.Duration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@SpringBootApplication
public class ReactiveJavaClientWebSocket { public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient(); WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("ws://localhost:8080/event-emitter"), session -> session.send(Mono.just(session.textMessage("event-me-from-spring-reactive-client"))) client.execute(
URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive() .thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText) .map(WebSocketMessage::getPayloadAsText)
.log()) .log())
.then()) .then())
.block(Duration.ofSeconds(10L)); .block(Duration.ofSeconds(10L));
} }
} }

View File

@ -12,7 +12,6 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.UUID; import java.util.UUID;
@ -20,37 +19,18 @@ import java.util.UUID;
@Component @Component
public class ReactiveWebSocketHandler implements WebSocketHandler { public class ReactiveWebSocketHandler implements WebSocketHandler {
private Flux<Event> eventFlux; private Flux<Event> eventFlux = Flux.generate(e -> {
private Flux<Event> intervalFlux; Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString());
e.next(event);
});
/** private Flux<Event> intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event);
* Here we prepare a Flux that will emit a message every second
*/
@PostConstruct
private void init() throws InterruptedException {
eventFlux = Flux.generate(e -> { private ObjectMapper json = new ObjectMapper();
Event event = new Event(UUID.randomUUID()
.toString(),
LocalDateTime.now()
.toString());
e.next(event);
});
intervalFlux = Flux.interval(Duration.ofMillis(1000L))
.zipWith(eventFlux, (time, event) -> event);
}
/**
* On each new client session, send the message flux to the client.
* Spring subscribes to the flux and send every new flux event to the WebSocketSession object
* @param session
* @return Mono<Void>
*/
@Override @Override
public Mono<Void> handle(WebSocketSession webSocketSession) { public Mono<Void> handle(WebSocketSession webSocketSession) {
ObjectMapper json = new ObjectMapper();
return webSocketSession.send(intervalFlux.map(event -> { return webSocketSession.send(intervalFlux.map(event -> {
try { try {
String jsonEvent = json.writeValueAsString(event); String jsonEvent = json.writeValueAsString(event);
@ -60,12 +40,9 @@ public class ReactiveWebSocketHandler implements WebSocketHandler {
e.printStackTrace(); e.printStackTrace();
return ""; return "";
} }
}) }).map(webSocketSession::textMessage))
.map(webSocketSession::textMessage))
.and(webSocketSession.receive() .and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log());
.map(WebSocketMessage::getPayloadAsText)
.log());
} }
} }