diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java index 496b3fc5c9..a67078db06 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -61,9 +61,9 @@ public class FireNForgetClient { * @return List of random floats */ private List generateData() { - List dataList = new ArrayList<>(WIND_DATA_LENGTH); + List dataList = new ArrayList<>(DATA_LENGTH); float velocity = 0; - for (int i = 0; i < WIND_DATA_LENGTH; i++) { + for (int i = 0; i < DATA_LENGTH; i++) { velocity += Math.random(); dataList.add(velocity); } diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java index e97192bdf0..085f9874fa 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -21,7 +21,7 @@ public class ReqStreamClient { public Flux getDataStream() { return socket - .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null); diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java index b5718ab36d..42243da39f 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/Server.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -1,6 +1,6 @@ package com.baeldung.rsocket; -import com.baeldung.rsocket.support.WindDataPublisher; +import com.baeldung.rsocket.support.DataPublisher; import static com.baeldung.rsocket.support.Constants.*; import com.baeldung.rsocket.support.GameController; import io.rsocket.AbstractRSocket; @@ -19,7 +19,7 @@ public class Server { private static final Logger LOG = LoggerFactory.getLogger(Server.class); private final Disposable server; - private final WindDataPublisher windDataPublisher = new WindDataPublisher(); + private final DataPublisher dataPublisher = new DataPublisher(); private final GameController gameController; public Server() { @@ -34,7 +34,7 @@ public class Server { } public void dispose() { - windDataPublisher.complete(); + dataPublisher.complete(); this.server.dispose(); } @@ -67,7 +67,7 @@ public class Server { @Override public Mono fireAndForget(Payload payload) { try { - windDataPublisher.publish(payload); // forward the payload + dataPublisher.publish(payload); // forward the payload return Mono.empty(); } catch (Exception x) { return Mono.error(x); @@ -78,13 +78,13 @@ public class Server { * Handle Request/Stream messages. Each request returns a new stream. * * @param payload Payload that can be used to determine which stream to return - * @return Flux stream containing simulated wind speed data + * @return Flux stream containing simulated measurement data */ @Override public Flux requestStream(Payload payload) { String streamName = payload.getDataUtf8(); - if (WIND_DATA_STREAM_NAME.equals(streamName)) { - return Flux.from(windDataPublisher); + if (DATA_STREAM_NAME.equals(streamName)) { + return Flux.from(dataPublisher); } return Flux.error(new IllegalArgumentException(streamName)); } diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java index 01bb374b4e..4ffc4f6483 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java @@ -4,7 +4,7 @@ public interface Constants { int TCP_PORT = 7101; String ERROR_MSG = "error"; - int WIND_DATA_LENGTH = 30; - String WIND_DATA_STREAM_NAME = "wind-data"; + int DATA_LENGTH = 30; + String DATA_STREAM_NAME = "data"; int SHOT_COUNT = 10; } diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java b/rsocket/src/main/java/com/baeldung/rsocket/support/DataPublisher.java similarity index 91% rename from rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java rename to rsocket/src/main/java/com/baeldung/rsocket/support/DataPublisher.java index 2ad5b5144b..3e74da8317 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/DataPublisher.java @@ -7,7 +7,7 @@ import org.reactivestreams.Subscriber; /** * Simple PUblisher to provide async data to Flux stream */ -public class WindDataPublisher implements Publisher { +public class DataPublisher implements Publisher { private Subscriber subscriber;