diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java index 9f26384c95..f35d337427 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java @@ -14,17 +14,17 @@ public class ChannelClient { public ChannelClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) - .doOnNext(gameController::processPayload) - .blockLast(); + .doOnNext(gameController::processPayload) + .blockLast(); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java index 61d6173b23..6c7362a008 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -25,9 +25,9 @@ public class FireNForgetClient { public FireNForgetClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); this.data = Collections.unmodifiableList(generateData()); } @@ -36,11 +36,11 @@ public class FireNForgetClient { */ public void sendData() { Flux.interval(Duration.ofMillis(50)) - .take(data.size()) - .map(this::createFloatPayload) - .map(socket::fireAndForget) - .flatMap(Function.identity()) - .blockLast(); + .take(data.size()) + .map(this::createFloatPayload) + .map(socket::fireAndForget) + .flatMap(Function.identity()) + .blockLast(); } /** diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java index 8865acd995..fff196a580 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java @@ -13,17 +13,17 @@ public class ReqResClient { public ReqResClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); } public String callBlocking(String string) { return socket - .requestResponse(DefaultPayload.create(string)) - .map(Payload::getDataUtf8) - .onErrorReturn(ERROR_MSG) - .block(); + .requestResponse(DefaultPayload.create(string)) + .map(Payload::getDataUtf8) + .onErrorReturn(ERROR_MSG) + .block(); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java index eaab777c15..e97192bdf0 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -14,17 +14,17 @@ public class ReqStreamClient { public ReqStreamClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); } public Flux getDataStream() { return socket - .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) - .map(Payload::getData) - .map(buf -> buf.getFloat()) - .onErrorReturn(null); + .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .map(Payload::getData) + .map(buf -> buf.getFloat()) + .onErrorReturn(null); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java index 9cec3c69c8..b5718ab36d 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/Server.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -24,11 +24,11 @@ public class Server { public Server() { this.server = RSocketFactory.receive() - .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) - .transport(TcpServerTransport.create("localhost", TCP_PORT)) - .start() - .doOnNext(x -> LOG.info("Server started")) - .subscribe(); + .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) + .transport(TcpServerTransport.create("localhost", TCP_PORT)) + .start() + .doOnNext(x -> LOG.info("Server started")) + .subscribe(); this.gameController = new GameController("Server Player"); } @@ -98,7 +98,7 @@ public class Server { @Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) - .subscribe(gameController::processPayload); + .subscribe(gameController::processPayload); Flux channel = Flux.from(gameController); return channel; } diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java index 35b6fe4b95..bc1bc0f861 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java @@ -31,9 +31,9 @@ public class GameController implements Publisher { */ private List generateShotList() { return Flux.range(1, SHOT_COUNT) - .map(x -> (long) Math.ceil(Math.random() * 1000)) - .collectList() - .block(); + .map(x -> (long) Math.ceil(Math.random() * 1000)) + .collectList() + .block(); } @Override