From dec5eabe81a7063ac5cdc5deeb64e46eb8673d0f Mon Sep 17 00:00:00 2001 From: clininger Date: Thu, 6 Dec 2018 23:09:43 +0700 Subject: [PATCH 1/6] BAELDUNG-2283 --- rsocket/pom.xml | 54 +++++++++ .../com/baeldung/rsocket/ChannelClient.java | 33 ++++++ .../baeldung/rsocket/FireNForgetClient.java | 86 ++++++++++++++ .../com/baeldung/rsocket/ReqResClient.java | 32 ++++++ .../com/baeldung/rsocket/ReqStreamClient.java | 33 ++++++ .../java/com/baeldung/rsocket/Server.java | 107 ++++++++++++++++++ .../baeldung/rsocket/support/Constants.java | 10 ++ .../rsocket/support/GameController.java | 84 ++++++++++++++ .../rsocket/support/WindDataPublisher.java | 31 +++++ rsocket/src/main/resources/logback.xml | 13 +++ .../rsocket/RSocketIntegrationTest.java | 71 ++++++++++++ 11 files changed, 554 insertions(+) create mode 100644 rsocket/pom.xml create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/Server.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java create mode 100644 rsocket/src/main/resources/logback.xml create mode 100644 rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java diff --git a/rsocket/pom.xml b/rsocket/pom.xml new file mode 100644 index 0000000000..8b04a31583 --- /dev/null +++ b/rsocket/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + rsocket + 0.0.1-SNAPSHOT + rsocket + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + jar + + + + io.rsocket + rsocket-core + 0.11.13 + + + io.rsocket + rsocket-transport-netty + 0.11.13 + + + junit + junit + 4.12 + test + + + org.hamcrest + hamcrest-core + 1.3 + test + + + ch.qos.logback + logback-classic + 1.2.3 + + + ch.qos.logback + logback-core + 1.2.3 + + + org.slf4j + slf4j-api + 1.7.25 + + + \ No newline at end of file diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java new file mode 100644 index 0000000000..9f26384c95 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import reactor.core.publisher.Flux; + +public class ChannelClient { + + private final RSocket socket; + private final GameController gameController; + + public ChannelClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + + this.gameController = new GameController("Client Player"); + } + + public void playGame() { + socket.requestChannel(Flux.from(gameController)) + .doOnNext(gameController::processPayload) + .blockLast(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java new file mode 100644 index 0000000000..61d6173b23 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -0,0 +1,86 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class FireNForgetClient { + + private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class); + + private final RSocket socket; + private final List data; + + public FireNForgetClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + this.data = Collections.unmodifiableList(generateData()); + } + + /** + * Send binary velocity (float) every 50ms + */ + public void sendData() { + Flux.interval(Duration.ofMillis(50)) + .take(data.size()) + .map(this::createFloatPayload) + .map(socket::fireAndForget) + .flatMap(Function.identity()) + .blockLast(); + } + + /** + * Create a binary payload containing a single float value + * + * @param index Index into the data list + * @return Payload ready to send to the server + */ + private Payload createFloatPayload(Long index) { + float velocity = data.get(index.intValue()); + ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity); + buffer.rewind(); + return DefaultPayload.create(buffer); + } + + /** + * Generate sample data + * + * @return List of random floats + */ + private List generateData() { + List dataList = new ArrayList<>(WIND_DATA_LENGTH); + float velocity = 0; + for (int i = 0; i < WIND_DATA_LENGTH; i++) { + velocity += Math.random(); + dataList.add(velocity); + } + return dataList; + } + + /** + * Get the data used for this client. + * + * @return list of data values + */ + public List getData() { + return data; + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java new file mode 100644 index 0000000000..8865acd995 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java @@ -0,0 +1,32 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; + +public class ReqResClient { + + private final RSocket socket; + + public ReqResClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public String callBlocking(String string) { + return socket + .requestResponse(DefaultPayload.create(string)) + .map(Payload::getDataUtf8) + .onErrorReturn(ERROR_MSG) + .block(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java new file mode 100644 index 0000000000..eaab777c15 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Flux; + +public class ReqStreamClient { + + private final RSocket socket; + + public ReqStreamClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public Flux getDataStream() { + return socket + .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .map(Payload::getData) + .map(buf -> buf.getFloat()) + .onErrorReturn(null); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java new file mode 100644 index 0000000000..9cec3c69c8 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -0,0 +1,107 @@ +package com.baeldung.rsocket; + +import com.baeldung.rsocket.support.WindDataPublisher; +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.server.TcpServerTransport; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class Server { + + private static final Logger LOG = LoggerFactory.getLogger(Server.class); + + private final Disposable server; + private final WindDataPublisher windDataPublisher = new WindDataPublisher(); + private final GameController gameController; + + public Server() { + this.server = RSocketFactory.receive() + .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) + .transport(TcpServerTransport.create("localhost", TCP_PORT)) + .start() + .doOnNext(x -> LOG.info("Server started")) + .subscribe(); + + this.gameController = new GameController("Server Player"); + } + + public void dispose() { + windDataPublisher.complete(); + this.server.dispose(); + } + + /** + * RSocket implementation + */ + private class RSocketImpl extends AbstractRSocket { + + /** + * Handle Request/Response messages + * + * @param payload Message payload + * @return payload response + */ + @Override + public Mono requestResponse(Payload payload) { + try { + return Mono.just(payload); // reflect the payload back to the sender + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Fire-and-Forget messages + * + * @param payload Message payload + * @return nothing + */ + @Override + public Mono fireAndForget(Payload payload) { + try { + windDataPublisher.publish(payload); // forward the payload + return Mono.empty(); + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Request/Stream messages. Each request returns a new stream. + * + * @param payload Payload that can be used to determine which stream to return + * @return Flux stream containing simulated wind speed data + */ + @Override + public Flux requestStream(Payload payload) { + String streamName = payload.getDataUtf8(); + if (WIND_DATA_STREAM_NAME.equals(streamName)) { + return Flux.from(windDataPublisher); + } + return Flux.error(new IllegalArgumentException(streamName)); + } + + /** + * Handle request for bidirectional channel + * + * @param payloads Stream of payloads delivered from the client + * @return + */ + @Override + public Flux requestChannel(Publisher payloads) { + Flux.from(payloads) + .subscribe(gameController::processPayload); + Flux channel = Flux.from(gameController); + return channel; + } + } + +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java new file mode 100644 index 0000000000..01bb374b4e --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java @@ -0,0 +1,10 @@ +package com.baeldung.rsocket.support; + +public interface Constants { + + int TCP_PORT = 7101; + String ERROR_MSG = "error"; + int WIND_DATA_LENGTH = 30; + String WIND_DATA_STREAM_NAME = "wind-data"; + int SHOT_COUNT = 10; +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java new file mode 100644 index 0000000000..35b6fe4b95 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java @@ -0,0 +1,84 @@ +package com.baeldung.rsocket.support; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.util.DefaultPayload; +import java.util.List; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class GameController implements Publisher { + + private static final Logger LOG = LoggerFactory.getLogger(GameController.class); + + private final String playerName; + private final List shots; + private Subscriber subscriber; + private boolean truce = false; + + public GameController(String playerName) { + this.playerName = playerName; + this.shots = generateShotList(); + } + + /** + * Create a random list of time intervals, 0-1000ms + * + * @return List of time intervals + */ + private List generateShotList() { + return Flux.range(1, SHOT_COUNT) + .map(x -> (long) Math.ceil(Math.random() * 1000)) + .collectList() + .block(); + } + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + fireAtWill(); + } + + /** + * Publish game events asynchronously + */ + private void fireAtWill() { + new Thread(() -> { + for (Long shotDelay : shots) { + try { Thread.sleep(shotDelay); } catch (Exception x) {} + if (truce) { + break; + } + LOG.info("{}: bang!", playerName); + subscriber.onNext(DefaultPayload.create("bang!")); + } + if (!truce) { + LOG.info("{}: I give up!", playerName); + subscriber.onNext(DefaultPayload.create("I give up")); + } + subscriber.onComplete(); + }).start(); + } + + /** + * Process events from the opponent + * + * @param payload Payload received from the rSocket + */ + public void processPayload(Payload payload) { + String message = payload.getDataUtf8(); + switch (message) { + case "bang!": + String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!"; + LOG.info("{}: {}", playerName, result); + break; + case "I give up": + truce = true; + LOG.info("{}: OK, truce", playerName); + break; + } + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java new file mode 100644 index 0000000000..2ad5b5144b --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java @@ -0,0 +1,31 @@ +package com.baeldung.rsocket.support; + +import io.rsocket.Payload; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +/** + * Simple PUblisher to provide async data to Flux stream + */ +public class WindDataPublisher implements Publisher { + + private Subscriber subscriber; + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + } + + public void publish(Payload payload) { + if (subscriber != null) { + subscriber.onNext(payload); + } + } + + public void complete() { + if (subscriber != null) { + subscriber.onComplete(); + } + } + +} diff --git a/rsocket/src/main/resources/logback.xml b/rsocket/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/rsocket/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java new file mode 100644 index 0000000000..8f2d4a9ffd --- /dev/null +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -0,0 +1,71 @@ +package com.baeldung.rsocket; + +import java.util.List; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +public class RSocketIntegrationTest { + + private static final Logger LOG = LoggerFactory.getLogger(RSocketIntegrationTest.class); + + private static Server server; + + public RSocketIntegrationTest() { + } + + @BeforeClass + public static void setUpClass() { + server = new Server(); + } + + @AfterClass + public static void tearDownClass() { + server.dispose(); + } + + @Test + public void testRequestResponse() { + ReqResClient client = new ReqResClient(); + String string = "Hello RSocket"; + assertEquals(string, client.callBlocking(string)); + client.dispose(); + } + + @Test + public void testFNFAndRequestStream() { + // create the client that pushes data to the server and start sending + FireNForgetClient fnfClient = new FireNForgetClient(); + // get the data that is used by the client + List data = fnfClient.getData(); + + // create a client to read a stream from the server and subscribe to events + ReqStreamClient streamClient = new ReqStreamClient(); + // assert that the data received is the same as the data sent + Disposable subscription = streamClient.getDataStream() + .index() + .doOnNext(element -> assertEquals(data.get(element.getT1().intValue()), element.getT2())) + .count() + .subscribe(count -> assertEquals(data.size(), count.intValue())); + + // start sending the data + fnfClient.sendData(); + + // wait a short time for the data to complete then dispose everything + try { Thread.sleep(500); } catch (Exception x) {} + subscription.dispose(); + fnfClient.dispose(); + } + + @Test + public void testChannel() { + ChannelClient client = new ChannelClient(); + client.playGame(); + client.dispose(); + } + +} From 291ba89004008aaf83cf198e9a582e8840ca9e97 Mon Sep 17 00:00:00 2001 From: pandachris Date: Thu, 6 Dec 2018 23:14:05 +0700 Subject: [PATCH 2/6] BAELDUNG-2283 (#6) --- rsocket/pom.xml | 54 +++++++++ .../com/baeldung/rsocket/ChannelClient.java | 33 ++++++ .../baeldung/rsocket/FireNForgetClient.java | 86 ++++++++++++++ .../com/baeldung/rsocket/ReqResClient.java | 32 ++++++ .../com/baeldung/rsocket/ReqStreamClient.java | 33 ++++++ .../java/com/baeldung/rsocket/Server.java | 107 ++++++++++++++++++ .../baeldung/rsocket/support/Constants.java | 10 ++ .../rsocket/support/GameController.java | 84 ++++++++++++++ .../rsocket/support/WindDataPublisher.java | 31 +++++ rsocket/src/main/resources/logback.xml | 13 +++ .../rsocket/RSocketIntegrationTest.java | 71 ++++++++++++ 11 files changed, 554 insertions(+) create mode 100644 rsocket/pom.xml create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/Server.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java create mode 100644 rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java create mode 100644 rsocket/src/main/resources/logback.xml create mode 100644 rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java diff --git a/rsocket/pom.xml b/rsocket/pom.xml new file mode 100644 index 0000000000..8b04a31583 --- /dev/null +++ b/rsocket/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + rsocket + 0.0.1-SNAPSHOT + rsocket + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + jar + + + + io.rsocket + rsocket-core + 0.11.13 + + + io.rsocket + rsocket-transport-netty + 0.11.13 + + + junit + junit + 4.12 + test + + + org.hamcrest + hamcrest-core + 1.3 + test + + + ch.qos.logback + logback-classic + 1.2.3 + + + ch.qos.logback + logback-core + 1.2.3 + + + org.slf4j + slf4j-api + 1.7.25 + + + \ No newline at end of file diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java new file mode 100644 index 0000000000..9f26384c95 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import reactor.core.publisher.Flux; + +public class ChannelClient { + + private final RSocket socket; + private final GameController gameController; + + public ChannelClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + + this.gameController = new GameController("Client Player"); + } + + public void playGame() { + socket.requestChannel(Flux.from(gameController)) + .doOnNext(gameController::processPayload) + .blockLast(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java new file mode 100644 index 0000000000..61d6173b23 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -0,0 +1,86 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class FireNForgetClient { + + private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class); + + private final RSocket socket; + private final List data; + + public FireNForgetClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + this.data = Collections.unmodifiableList(generateData()); + } + + /** + * Send binary velocity (float) every 50ms + */ + public void sendData() { + Flux.interval(Duration.ofMillis(50)) + .take(data.size()) + .map(this::createFloatPayload) + .map(socket::fireAndForget) + .flatMap(Function.identity()) + .blockLast(); + } + + /** + * Create a binary payload containing a single float value + * + * @param index Index into the data list + * @return Payload ready to send to the server + */ + private Payload createFloatPayload(Long index) { + float velocity = data.get(index.intValue()); + ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity); + buffer.rewind(); + return DefaultPayload.create(buffer); + } + + /** + * Generate sample data + * + * @return List of random floats + */ + private List generateData() { + List dataList = new ArrayList<>(WIND_DATA_LENGTH); + float velocity = 0; + for (int i = 0; i < WIND_DATA_LENGTH; i++) { + velocity += Math.random(); + dataList.add(velocity); + } + return dataList; + } + + /** + * Get the data used for this client. + * + * @return list of data values + */ + public List getData() { + return data; + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java new file mode 100644 index 0000000000..8865acd995 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java @@ -0,0 +1,32 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; + +public class ReqResClient { + + private final RSocket socket; + + public ReqResClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public String callBlocking(String string) { + return socket + .requestResponse(DefaultPayload.create(string)) + .map(Payload::getDataUtf8) + .onErrorReturn(ERROR_MSG) + .block(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java new file mode 100644 index 0000000000..eaab777c15 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Flux; + +public class ReqStreamClient { + + private final RSocket socket; + + public ReqStreamClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public Flux getDataStream() { + return socket + .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .map(Payload::getData) + .map(buf -> buf.getFloat()) + .onErrorReturn(null); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java new file mode 100644 index 0000000000..9cec3c69c8 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -0,0 +1,107 @@ +package com.baeldung.rsocket; + +import com.baeldung.rsocket.support.WindDataPublisher; +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.server.TcpServerTransport; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class Server { + + private static final Logger LOG = LoggerFactory.getLogger(Server.class); + + private final Disposable server; + private final WindDataPublisher windDataPublisher = new WindDataPublisher(); + private final GameController gameController; + + public Server() { + this.server = RSocketFactory.receive() + .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) + .transport(TcpServerTransport.create("localhost", TCP_PORT)) + .start() + .doOnNext(x -> LOG.info("Server started")) + .subscribe(); + + this.gameController = new GameController("Server Player"); + } + + public void dispose() { + windDataPublisher.complete(); + this.server.dispose(); + } + + /** + * RSocket implementation + */ + private class RSocketImpl extends AbstractRSocket { + + /** + * Handle Request/Response messages + * + * @param payload Message payload + * @return payload response + */ + @Override + public Mono requestResponse(Payload payload) { + try { + return Mono.just(payload); // reflect the payload back to the sender + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Fire-and-Forget messages + * + * @param payload Message payload + * @return nothing + */ + @Override + public Mono fireAndForget(Payload payload) { + try { + windDataPublisher.publish(payload); // forward the payload + return Mono.empty(); + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Request/Stream messages. Each request returns a new stream. + * + * @param payload Payload that can be used to determine which stream to return + * @return Flux stream containing simulated wind speed data + */ + @Override + public Flux requestStream(Payload payload) { + String streamName = payload.getDataUtf8(); + if (WIND_DATA_STREAM_NAME.equals(streamName)) { + return Flux.from(windDataPublisher); + } + return Flux.error(new IllegalArgumentException(streamName)); + } + + /** + * Handle request for bidirectional channel + * + * @param payloads Stream of payloads delivered from the client + * @return + */ + @Override + public Flux requestChannel(Publisher payloads) { + Flux.from(payloads) + .subscribe(gameController::processPayload); + Flux channel = Flux.from(gameController); + return channel; + } + } + +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java new file mode 100644 index 0000000000..01bb374b4e --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java @@ -0,0 +1,10 @@ +package com.baeldung.rsocket.support; + +public interface Constants { + + int TCP_PORT = 7101; + String ERROR_MSG = "error"; + int WIND_DATA_LENGTH = 30; + String WIND_DATA_STREAM_NAME = "wind-data"; + int SHOT_COUNT = 10; +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java new file mode 100644 index 0000000000..35b6fe4b95 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java @@ -0,0 +1,84 @@ +package com.baeldung.rsocket.support; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.util.DefaultPayload; +import java.util.List; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class GameController implements Publisher { + + private static final Logger LOG = LoggerFactory.getLogger(GameController.class); + + private final String playerName; + private final List shots; + private Subscriber subscriber; + private boolean truce = false; + + public GameController(String playerName) { + this.playerName = playerName; + this.shots = generateShotList(); + } + + /** + * Create a random list of time intervals, 0-1000ms + * + * @return List of time intervals + */ + private List generateShotList() { + return Flux.range(1, SHOT_COUNT) + .map(x -> (long) Math.ceil(Math.random() * 1000)) + .collectList() + .block(); + } + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + fireAtWill(); + } + + /** + * Publish game events asynchronously + */ + private void fireAtWill() { + new Thread(() -> { + for (Long shotDelay : shots) { + try { Thread.sleep(shotDelay); } catch (Exception x) {} + if (truce) { + break; + } + LOG.info("{}: bang!", playerName); + subscriber.onNext(DefaultPayload.create("bang!")); + } + if (!truce) { + LOG.info("{}: I give up!", playerName); + subscriber.onNext(DefaultPayload.create("I give up")); + } + subscriber.onComplete(); + }).start(); + } + + /** + * Process events from the opponent + * + * @param payload Payload received from the rSocket + */ + public void processPayload(Payload payload) { + String message = payload.getDataUtf8(); + switch (message) { + case "bang!": + String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!"; + LOG.info("{}: {}", playerName, result); + break; + case "I give up": + truce = true; + LOG.info("{}: OK, truce", playerName); + break; + } + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java new file mode 100644 index 0000000000..2ad5b5144b --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java @@ -0,0 +1,31 @@ +package com.baeldung.rsocket.support; + +import io.rsocket.Payload; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +/** + * Simple PUblisher to provide async data to Flux stream + */ +public class WindDataPublisher implements Publisher { + + private Subscriber subscriber; + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + } + + public void publish(Payload payload) { + if (subscriber != null) { + subscriber.onNext(payload); + } + } + + public void complete() { + if (subscriber != null) { + subscriber.onComplete(); + } + } + +} diff --git a/rsocket/src/main/resources/logback.xml b/rsocket/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/rsocket/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java new file mode 100644 index 0000000000..8f2d4a9ffd --- /dev/null +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -0,0 +1,71 @@ +package com.baeldung.rsocket; + +import java.util.List; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +public class RSocketIntegrationTest { + + private static final Logger LOG = LoggerFactory.getLogger(RSocketIntegrationTest.class); + + private static Server server; + + public RSocketIntegrationTest() { + } + + @BeforeClass + public static void setUpClass() { + server = new Server(); + } + + @AfterClass + public static void tearDownClass() { + server.dispose(); + } + + @Test + public void testRequestResponse() { + ReqResClient client = new ReqResClient(); + String string = "Hello RSocket"; + assertEquals(string, client.callBlocking(string)); + client.dispose(); + } + + @Test + public void testFNFAndRequestStream() { + // create the client that pushes data to the server and start sending + FireNForgetClient fnfClient = new FireNForgetClient(); + // get the data that is used by the client + List data = fnfClient.getData(); + + // create a client to read a stream from the server and subscribe to events + ReqStreamClient streamClient = new ReqStreamClient(); + // assert that the data received is the same as the data sent + Disposable subscription = streamClient.getDataStream() + .index() + .doOnNext(element -> assertEquals(data.get(element.getT1().intValue()), element.getT2())) + .count() + .subscribe(count -> assertEquals(data.size(), count.intValue())); + + // start sending the data + fnfClient.sendData(); + + // wait a short time for the data to complete then dispose everything + try { Thread.sleep(500); } catch (Exception x) {} + subscription.dispose(); + fnfClient.dispose(); + } + + @Test + public void testChannel() { + ChannelClient client = new ChannelClient(); + client.playGame(); + client.dispose(); + } + +} From 31741530ddc52b3df9754ae6a0552c19c10af84c Mon Sep 17 00:00:00 2001 From: clininger Date: Thu, 6 Dec 2018 23:48:18 +0700 Subject: [PATCH 3/6] Indentation changes --- .../java/com/baeldung/rsocket/ChannelClient.java | 10 +++++----- .../com/baeldung/rsocket/FireNForgetClient.java | 16 ++++++++-------- .../java/com/baeldung/rsocket/ReqResClient.java | 14 +++++++------- .../com/baeldung/rsocket/ReqStreamClient.java | 14 +++++++------- .../main/java/com/baeldung/rsocket/Server.java | 12 ++++++------ .../baeldung/rsocket/support/GameController.java | 6 +++--- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java index 9f26384c95..f35d337427 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java @@ -14,17 +14,17 @@ public class ChannelClient { public ChannelClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) - .doOnNext(gameController::processPayload) - .blockLast(); + .doOnNext(gameController::processPayload) + .blockLast(); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java index 61d6173b23..6c7362a008 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -25,9 +25,9 @@ public class FireNForgetClient { public FireNForgetClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); this.data = Collections.unmodifiableList(generateData()); } @@ -36,11 +36,11 @@ public class FireNForgetClient { */ public void sendData() { Flux.interval(Duration.ofMillis(50)) - .take(data.size()) - .map(this::createFloatPayload) - .map(socket::fireAndForget) - .flatMap(Function.identity()) - .blockLast(); + .take(data.size()) + .map(this::createFloatPayload) + .map(socket::fireAndForget) + .flatMap(Function.identity()) + .blockLast(); } /** diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java index 8865acd995..fff196a580 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java @@ -13,17 +13,17 @@ public class ReqResClient { public ReqResClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); } public String callBlocking(String string) { return socket - .requestResponse(DefaultPayload.create(string)) - .map(Payload::getDataUtf8) - .onErrorReturn(ERROR_MSG) - .block(); + .requestResponse(DefaultPayload.create(string)) + .map(Payload::getDataUtf8) + .onErrorReturn(ERROR_MSG) + .block(); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java index eaab777c15..e97192bdf0 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -14,17 +14,17 @@ public class ReqStreamClient { public ReqStreamClient() { this.socket = RSocketFactory.connect() - .transport(TcpClientTransport.create("localhost", TCP_PORT)) - .start() - .block(); + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); } public Flux getDataStream() { return socket - .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) - .map(Payload::getData) - .map(buf -> buf.getFloat()) - .onErrorReturn(null); + .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .map(Payload::getData) + .map(buf -> buf.getFloat()) + .onErrorReturn(null); } public void dispose() { diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java index 9cec3c69c8..b5718ab36d 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/Server.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -24,11 +24,11 @@ public class Server { public Server() { this.server = RSocketFactory.receive() - .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) - .transport(TcpServerTransport.create("localhost", TCP_PORT)) - .start() - .doOnNext(x -> LOG.info("Server started")) - .subscribe(); + .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) + .transport(TcpServerTransport.create("localhost", TCP_PORT)) + .start() + .doOnNext(x -> LOG.info("Server started")) + .subscribe(); this.gameController = new GameController("Server Player"); } @@ -98,7 +98,7 @@ public class Server { @Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) - .subscribe(gameController::processPayload); + .subscribe(gameController::processPayload); Flux channel = Flux.from(gameController); return channel; } diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java index 35b6fe4b95..bc1bc0f861 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java @@ -31,9 +31,9 @@ public class GameController implements Publisher { */ private List generateShotList() { return Flux.range(1, SHOT_COUNT) - .map(x -> (long) Math.ceil(Math.random() * 1000)) - .collectList() - .block(); + .map(x -> (long) Math.ceil(Math.random() * 1000)) + .collectList() + .block(); } @Override From 24bc616dd290d5488983b325342541fee060d4ed Mon Sep 17 00:00:00 2001 From: clininger Date: Sun, 9 Dec 2018 22:59:58 +0700 Subject: [PATCH 4/6] test name changes; fixed test assertions; POM modules added --- pom.xml | 2 ++ .../rsocket/RSocketIntegrationTest.java | 31 +++++++++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index dabb4a9628..62bb0566a2 100644 --- a/pom.xml +++ b/pom.xml @@ -483,6 +483,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq @@ -1036,6 +1037,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java index 8f2d4a9ffd..36f0b56fb4 100644 --- a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -1,5 +1,6 @@ package com.baeldung.rsocket; +import java.util.ArrayList; import java.util.List; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -29,7 +30,7 @@ public class RSocketIntegrationTest { } @Test - public void testRequestResponse() { + public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); @@ -37,20 +38,27 @@ public class RSocketIntegrationTest { } @Test - public void testFNFAndRequestStream() { + public void whenSendingStream_thenReceiveTheSameStream() { // create the client that pushes data to the server and start sending FireNForgetClient fnfClient = new FireNForgetClient(); - // get the data that is used by the client - List data = fnfClient.getData(); - // create a client to read a stream from the server and subscribe to events ReqStreamClient streamClient = new ReqStreamClient(); + + // get the data that is used by the client + List data = fnfClient.getData(); + // create a place to count the results + List dataReceived = new ArrayList<>(); + // assert that the data received is the same as the data sent Disposable subscription = streamClient.getDataStream() - .index() - .doOnNext(element -> assertEquals(data.get(element.getT1().intValue()), element.getT2())) - .count() - .subscribe(count -> assertEquals(data.size(), count.intValue())); + .index() + .subscribe( + tuple -> { + assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); + dataReceived.add(tuple.getT2()); + }, + err -> LOG.error(err.getMessage()) + ); // start sending the data fnfClient.sendData(); @@ -59,10 +67,13 @@ public class RSocketIntegrationTest { try { Thread.sleep(500); } catch (Exception x) {} subscription.dispose(); fnfClient.dispose(); + + // verify the item count + assertEquals("Wrong data count received", data.size(), dataReceived.size()); } @Test - public void testChannel() { + public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose(); From cbdd31a183531983526c45c716a866f670fe8ff8 Mon Sep 17 00:00:00 2001 From: clininger Date: Sun, 9 Dec 2018 23:08:41 +0700 Subject: [PATCH 5/6] Removed unnecessary line --- .../src/main/java/com/baeldung/rsocket/FireNForgetClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java index 6c7362a008..496b3fc5c9 100644 --- a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -38,8 +38,7 @@ public class FireNForgetClient { Flux.interval(Duration.ofMillis(50)) .take(data.size()) .map(this::createFloatPayload) - .map(socket::fireAndForget) - .flatMap(Function.identity()) + .flatMap(socket::fireAndForget) .blockLast(); } From 2b1d02ce1df25da949386fdf2130f5d93dfaf86d Mon Sep 17 00:00:00 2001 From: clininger Date: Sun, 9 Dec 2018 23:18:30 +0700 Subject: [PATCH 6/6] test spacing --- .../test/java/com/baeldung/rsocket/RSocketIntegrationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java index 36f0b56fb4..afa3960eac 100644 --- a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -33,7 +33,9 @@ public class RSocketIntegrationTest { public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; + assertEquals(string, client.callBlocking(string)); + client.dispose(); }