diff --git a/pom.xml b/pom.xml index 78ce1821fa..eb60cb2e45 100644 --- a/pom.xml +++ b/pom.xml @@ -484,6 +484,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq @@ -1039,6 +1040,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq diff --git a/rsocket/pom.xml b/rsocket/pom.xml new file mode 100644 index 0000000000..8b04a31583 --- /dev/null +++ b/rsocket/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + rsocket + 0.0.1-SNAPSHOT + rsocket + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + jar + + + + io.rsocket + rsocket-core + 0.11.13 + + + io.rsocket + rsocket-transport-netty + 0.11.13 + + + junit + junit + 4.12 + test + + + org.hamcrest + hamcrest-core + 1.3 + test + + + ch.qos.logback + logback-classic + 1.2.3 + + + ch.qos.logback + logback-core + 1.2.3 + + + org.slf4j + slf4j-api + 1.7.25 + + + \ No newline at end of file diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java new file mode 100644 index 0000000000..f35d337427 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ChannelClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import reactor.core.publisher.Flux; + +public class ChannelClient { + + private final RSocket socket; + private final GameController gameController; + + public ChannelClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + + this.gameController = new GameController("Client Player"); + } + + public void playGame() { + socket.requestChannel(Flux.from(gameController)) + .doOnNext(gameController::processPayload) + .blockLast(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java new file mode 100644 index 0000000000..496b3fc5c9 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/FireNForgetClient.java @@ -0,0 +1,85 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class FireNForgetClient { + + private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class); + + private final RSocket socket; + private final List data; + + public FireNForgetClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + this.data = Collections.unmodifiableList(generateData()); + } + + /** + * Send binary velocity (float) every 50ms + */ + public void sendData() { + Flux.interval(Duration.ofMillis(50)) + .take(data.size()) + .map(this::createFloatPayload) + .flatMap(socket::fireAndForget) + .blockLast(); + } + + /** + * Create a binary payload containing a single float value + * + * @param index Index into the data list + * @return Payload ready to send to the server + */ + private Payload createFloatPayload(Long index) { + float velocity = data.get(index.intValue()); + ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity); + buffer.rewind(); + return DefaultPayload.create(buffer); + } + + /** + * Generate sample data + * + * @return List of random floats + */ + private List generateData() { + List dataList = new ArrayList<>(WIND_DATA_LENGTH); + float velocity = 0; + for (int i = 0; i < WIND_DATA_LENGTH; i++) { + velocity += Math.random(); + dataList.add(velocity); + } + return dataList; + } + + /** + * Get the data used for this client. + * + * @return list of data values + */ + public List getData() { + return data; + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java new file mode 100644 index 0000000000..fff196a580 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqResClient.java @@ -0,0 +1,32 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; + +public class ReqResClient { + + private final RSocket socket; + + public ReqResClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public String callBlocking(String string) { + return socket + .requestResponse(DefaultPayload.create(string)) + .map(Payload::getDataUtf8) + .onErrorReturn(ERROR_MSG) + .block(); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java new file mode 100644 index 0000000000..e97192bdf0 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/ReqStreamClient.java @@ -0,0 +1,33 @@ +package com.baeldung.rsocket; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Flux; + +public class ReqStreamClient { + + private final RSocket socket; + + public ReqStreamClient() { + this.socket = RSocketFactory.connect() + .transport(TcpClientTransport.create("localhost", TCP_PORT)) + .start() + .block(); + } + + public Flux getDataStream() { + return socket + .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) + .map(Payload::getData) + .map(buf -> buf.getFloat()) + .onErrorReturn(null); + } + + public void dispose() { + this.socket.dispose(); + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/Server.java b/rsocket/src/main/java/com/baeldung/rsocket/Server.java new file mode 100644 index 0000000000..b5718ab36d --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/Server.java @@ -0,0 +1,107 @@ +package com.baeldung.rsocket; + +import com.baeldung.rsocket.support.WindDataPublisher; +import static com.baeldung.rsocket.support.Constants.*; +import com.baeldung.rsocket.support.GameController; +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.server.TcpServerTransport; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class Server { + + private static final Logger LOG = LoggerFactory.getLogger(Server.class); + + private final Disposable server; + private final WindDataPublisher windDataPublisher = new WindDataPublisher(); + private final GameController gameController; + + public Server() { + this.server = RSocketFactory.receive() + .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) + .transport(TcpServerTransport.create("localhost", TCP_PORT)) + .start() + .doOnNext(x -> LOG.info("Server started")) + .subscribe(); + + this.gameController = new GameController("Server Player"); + } + + public void dispose() { + windDataPublisher.complete(); + this.server.dispose(); + } + + /** + * RSocket implementation + */ + private class RSocketImpl extends AbstractRSocket { + + /** + * Handle Request/Response messages + * + * @param payload Message payload + * @return payload response + */ + @Override + public Mono requestResponse(Payload payload) { + try { + return Mono.just(payload); // reflect the payload back to the sender + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Fire-and-Forget messages + * + * @param payload Message payload + * @return nothing + */ + @Override + public Mono fireAndForget(Payload payload) { + try { + windDataPublisher.publish(payload); // forward the payload + return Mono.empty(); + } catch (Exception x) { + return Mono.error(x); + } + } + + /** + * Handle Request/Stream messages. Each request returns a new stream. + * + * @param payload Payload that can be used to determine which stream to return + * @return Flux stream containing simulated wind speed data + */ + @Override + public Flux requestStream(Payload payload) { + String streamName = payload.getDataUtf8(); + if (WIND_DATA_STREAM_NAME.equals(streamName)) { + return Flux.from(windDataPublisher); + } + return Flux.error(new IllegalArgumentException(streamName)); + } + + /** + * Handle request for bidirectional channel + * + * @param payloads Stream of payloads delivered from the client + * @return + */ + @Override + public Flux requestChannel(Publisher payloads) { + Flux.from(payloads) + .subscribe(gameController::processPayload); + Flux channel = Flux.from(gameController); + return channel; + } + } + +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java new file mode 100644 index 0000000000..01bb374b4e --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/Constants.java @@ -0,0 +1,10 @@ +package com.baeldung.rsocket.support; + +public interface Constants { + + int TCP_PORT = 7101; + String ERROR_MSG = "error"; + int WIND_DATA_LENGTH = 30; + String WIND_DATA_STREAM_NAME = "wind-data"; + int SHOT_COUNT = 10; +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java new file mode 100644 index 0000000000..bc1bc0f861 --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/GameController.java @@ -0,0 +1,84 @@ +package com.baeldung.rsocket.support; + +import static com.baeldung.rsocket.support.Constants.*; +import io.rsocket.Payload; +import io.rsocket.util.DefaultPayload; +import java.util.List; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class GameController implements Publisher { + + private static final Logger LOG = LoggerFactory.getLogger(GameController.class); + + private final String playerName; + private final List shots; + private Subscriber subscriber; + private boolean truce = false; + + public GameController(String playerName) { + this.playerName = playerName; + this.shots = generateShotList(); + } + + /** + * Create a random list of time intervals, 0-1000ms + * + * @return List of time intervals + */ + private List generateShotList() { + return Flux.range(1, SHOT_COUNT) + .map(x -> (long) Math.ceil(Math.random() * 1000)) + .collectList() + .block(); + } + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + fireAtWill(); + } + + /** + * Publish game events asynchronously + */ + private void fireAtWill() { + new Thread(() -> { + for (Long shotDelay : shots) { + try { Thread.sleep(shotDelay); } catch (Exception x) {} + if (truce) { + break; + } + LOG.info("{}: bang!", playerName); + subscriber.onNext(DefaultPayload.create("bang!")); + } + if (!truce) { + LOG.info("{}: I give up!", playerName); + subscriber.onNext(DefaultPayload.create("I give up")); + } + subscriber.onComplete(); + }).start(); + } + + /** + * Process events from the opponent + * + * @param payload Payload received from the rSocket + */ + public void processPayload(Payload payload) { + String message = payload.getDataUtf8(); + switch (message) { + case "bang!": + String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!"; + LOG.info("{}: {}", playerName, result); + break; + case "I give up": + truce = true; + LOG.info("{}: OK, truce", playerName); + break; + } + } +} diff --git a/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java new file mode 100644 index 0000000000..2ad5b5144b --- /dev/null +++ b/rsocket/src/main/java/com/baeldung/rsocket/support/WindDataPublisher.java @@ -0,0 +1,31 @@ +package com.baeldung.rsocket.support; + +import io.rsocket.Payload; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +/** + * Simple PUblisher to provide async data to Flux stream + */ +public class WindDataPublisher implements Publisher { + + private Subscriber subscriber; + + @Override + public void subscribe(Subscriber subscriber) { + this.subscriber = subscriber; + } + + public void publish(Payload payload) { + if (subscriber != null) { + subscriber.onNext(payload); + } + } + + public void complete() { + if (subscriber != null) { + subscriber.onComplete(); + } + } + +} diff --git a/rsocket/src/main/resources/logback.xml b/rsocket/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/rsocket/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java new file mode 100644 index 0000000000..afa3960eac --- /dev/null +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -0,0 +1,84 @@ +package com.baeldung.rsocket; + +import java.util.ArrayList; +import java.util.List; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +public class RSocketIntegrationTest { + + private static final Logger LOG = LoggerFactory.getLogger(RSocketIntegrationTest.class); + + private static Server server; + + public RSocketIntegrationTest() { + } + + @BeforeClass + public static void setUpClass() { + server = new Server(); + } + + @AfterClass + public static void tearDownClass() { + server.dispose(); + } + + @Test + public void whenSendingAString_thenRevceiveTheSameString() { + ReqResClient client = new ReqResClient(); + String string = "Hello RSocket"; + + assertEquals(string, client.callBlocking(string)); + + client.dispose(); + } + + @Test + public void whenSendingStream_thenReceiveTheSameStream() { + // create the client that pushes data to the server and start sending + FireNForgetClient fnfClient = new FireNForgetClient(); + // create a client to read a stream from the server and subscribe to events + ReqStreamClient streamClient = new ReqStreamClient(); + + // get the data that is used by the client + List data = fnfClient.getData(); + // create a place to count the results + List dataReceived = new ArrayList<>(); + + // assert that the data received is the same as the data sent + Disposable subscription = streamClient.getDataStream() + .index() + .subscribe( + tuple -> { + assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); + dataReceived.add(tuple.getT2()); + }, + err -> LOG.error(err.getMessage()) + ); + + // start sending the data + fnfClient.sendData(); + + // wait a short time for the data to complete then dispose everything + try { Thread.sleep(500); } catch (Exception x) {} + subscription.dispose(); + fnfClient.dispose(); + + // verify the item count + assertEquals("Wrong data count received", data.size(), dataReceived.size()); + } + + @Test + public void whenRunningChannelGame_thenLogTheResults() { + ChannelClient client = new ChannelClient(); + client.playGame(); + client.dispose(); + } + +}