commit
8e18c03c40
2
pom.xml
2
pom.xml
|
@ -484,6 +484,7 @@
|
|||
|
||||
<module>reactor-core</module>
|
||||
<module>resteasy</module>
|
||||
<module>rsocket</module>
|
||||
<module>rxjava</module>
|
||||
<module>rxjava-2</module>
|
||||
<module>rabbitmq</module>
|
||||
|
@ -1039,6 +1040,7 @@
|
|||
|
||||
<module>reactor-core</module>
|
||||
<module>resteasy</module>
|
||||
<module>rsocket</module>
|
||||
<module>rxjava</module>
|
||||
<module>rxjava-2</module>
|
||||
<module>rabbitmq</module>
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>rsocket</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>rsocket</name>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.rsocket</groupId>
|
||||
<artifactId>rsocket-core</artifactId>
|
||||
<version>0.11.13</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.rsocket</groupId>
|
||||
<artifactId>rsocket-transport-netty</artifactId>
|
||||
<version>0.11.13</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<version>1.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
<version>1.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.25</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,33 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import com.baeldung.rsocket.support.GameController;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ChannelClient {
|
||||
|
||||
private final RSocket socket;
|
||||
private final GameController gameController;
|
||||
|
||||
public ChannelClient() {
|
||||
this.socket = RSocketFactory.connect()
|
||||
.transport(TcpClientTransport.create("localhost", TCP_PORT))
|
||||
.start()
|
||||
.block();
|
||||
|
||||
this.gameController = new GameController("Client Player");
|
||||
}
|
||||
|
||||
public void playGame() {
|
||||
socket.requestChannel(Flux.from(gameController))
|
||||
.doOnNext(gameController::processPayload)
|
||||
.blockLast();
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
this.socket.dispose();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class FireNForgetClient {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class);
|
||||
|
||||
private final RSocket socket;
|
||||
private final List<Float> data;
|
||||
|
||||
public FireNForgetClient() {
|
||||
this.socket = RSocketFactory.connect()
|
||||
.transport(TcpClientTransport.create("localhost", TCP_PORT))
|
||||
.start()
|
||||
.block();
|
||||
this.data = Collections.unmodifiableList(generateData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Send binary velocity (float) every 50ms
|
||||
*/
|
||||
public void sendData() {
|
||||
Flux.interval(Duration.ofMillis(50))
|
||||
.take(data.size())
|
||||
.map(this::createFloatPayload)
|
||||
.flatMap(socket::fireAndForget)
|
||||
.blockLast();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a binary payload containing a single float value
|
||||
*
|
||||
* @param index Index into the data list
|
||||
* @return Payload ready to send to the server
|
||||
*/
|
||||
private Payload createFloatPayload(Long index) {
|
||||
float velocity = data.get(index.intValue());
|
||||
ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity);
|
||||
buffer.rewind();
|
||||
return DefaultPayload.create(buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate sample data
|
||||
*
|
||||
* @return List of random floats
|
||||
*/
|
||||
private List<Float> generateData() {
|
||||
List<Float> dataList = new ArrayList<>(WIND_DATA_LENGTH);
|
||||
float velocity = 0;
|
||||
for (int i = 0; i < WIND_DATA_LENGTH; i++) {
|
||||
velocity += Math.random();
|
||||
dataList.add(velocity);
|
||||
}
|
||||
return dataList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the data used for this client.
|
||||
*
|
||||
* @return list of data values
|
||||
*/
|
||||
public List<Float> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
this.socket.dispose();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
|
||||
public class ReqResClient {
|
||||
|
||||
private final RSocket socket;
|
||||
|
||||
public ReqResClient() {
|
||||
this.socket = RSocketFactory.connect()
|
||||
.transport(TcpClientTransport.create("localhost", TCP_PORT))
|
||||
.start()
|
||||
.block();
|
||||
}
|
||||
|
||||
public String callBlocking(String string) {
|
||||
return socket
|
||||
.requestResponse(DefaultPayload.create(string))
|
||||
.map(Payload::getDataUtf8)
|
||||
.onErrorReturn(ERROR_MSG)
|
||||
.block();
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
this.socket.dispose();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ReqStreamClient {
|
||||
|
||||
private final RSocket socket;
|
||||
|
||||
public ReqStreamClient() {
|
||||
this.socket = RSocketFactory.connect()
|
||||
.transport(TcpClientTransport.create("localhost", TCP_PORT))
|
||||
.start()
|
||||
.block();
|
||||
}
|
||||
|
||||
public Flux<Float> getDataStream() {
|
||||
return socket
|
||||
.requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME))
|
||||
.map(Payload::getData)
|
||||
.map(buf -> buf.getFloat())
|
||||
.onErrorReturn(null);
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
this.socket.dispose();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import com.baeldung.rsocket.support.WindDataPublisher;
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import com.baeldung.rsocket.support.GameController;
|
||||
import io.rsocket.AbstractRSocket;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.netty.server.TcpServerTransport;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class Server {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
|
||||
|
||||
private final Disposable server;
|
||||
private final WindDataPublisher windDataPublisher = new WindDataPublisher();
|
||||
private final GameController gameController;
|
||||
|
||||
public Server() {
|
||||
this.server = RSocketFactory.receive()
|
||||
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
|
||||
.transport(TcpServerTransport.create("localhost", TCP_PORT))
|
||||
.start()
|
||||
.doOnNext(x -> LOG.info("Server started"))
|
||||
.subscribe();
|
||||
|
||||
this.gameController = new GameController("Server Player");
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
windDataPublisher.complete();
|
||||
this.server.dispose();
|
||||
}
|
||||
|
||||
/**
|
||||
* RSocket implementation
|
||||
*/
|
||||
private class RSocketImpl extends AbstractRSocket {
|
||||
|
||||
/**
|
||||
* Handle Request/Response messages
|
||||
*
|
||||
* @param payload Message payload
|
||||
* @return payload response
|
||||
*/
|
||||
@Override
|
||||
public Mono<Payload> requestResponse(Payload payload) {
|
||||
try {
|
||||
return Mono.just(payload); // reflect the payload back to the sender
|
||||
} catch (Exception x) {
|
||||
return Mono.error(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle Fire-and-Forget messages
|
||||
*
|
||||
* @param payload Message payload
|
||||
* @return nothing
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> fireAndForget(Payload payload) {
|
||||
try {
|
||||
windDataPublisher.publish(payload); // forward the payload
|
||||
return Mono.empty();
|
||||
} catch (Exception x) {
|
||||
return Mono.error(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle Request/Stream messages. Each request returns a new stream.
|
||||
*
|
||||
* @param payload Payload that can be used to determine which stream to return
|
||||
* @return Flux stream containing simulated wind speed data
|
||||
*/
|
||||
@Override
|
||||
public Flux<Payload> requestStream(Payload payload) {
|
||||
String streamName = payload.getDataUtf8();
|
||||
if (WIND_DATA_STREAM_NAME.equals(streamName)) {
|
||||
return Flux.from(windDataPublisher);
|
||||
}
|
||||
return Flux.error(new IllegalArgumentException(streamName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle request for bidirectional channel
|
||||
*
|
||||
* @param payloads Stream of payloads delivered from the client
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
|
||||
Flux.from(payloads)
|
||||
.subscribe(gameController::processPayload);
|
||||
Flux<Payload> channel = Flux.from(gameController);
|
||||
return channel;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package com.baeldung.rsocket.support;
|
||||
|
||||
public interface Constants {
|
||||
|
||||
int TCP_PORT = 7101;
|
||||
String ERROR_MSG = "error";
|
||||
int WIND_DATA_LENGTH = 30;
|
||||
String WIND_DATA_STREAM_NAME = "wind-data";
|
||||
int SHOT_COUNT = 10;
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
package com.baeldung.rsocket.support;
|
||||
|
||||
import static com.baeldung.rsocket.support.Constants.*;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import java.util.List;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class GameController implements Publisher<Payload> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(GameController.class);
|
||||
|
||||
private final String playerName;
|
||||
private final List<Long> shots;
|
||||
private Subscriber<? super Payload> subscriber;
|
||||
private boolean truce = false;
|
||||
|
||||
public GameController(String playerName) {
|
||||
this.playerName = playerName;
|
||||
this.shots = generateShotList();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a random list of time intervals, 0-1000ms
|
||||
*
|
||||
* @return List of time intervals
|
||||
*/
|
||||
private List<Long> generateShotList() {
|
||||
return Flux.range(1, SHOT_COUNT)
|
||||
.map(x -> (long) Math.ceil(Math.random() * 1000))
|
||||
.collectList()
|
||||
.block();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<? super Payload> subscriber) {
|
||||
this.subscriber = subscriber;
|
||||
fireAtWill();
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish game events asynchronously
|
||||
*/
|
||||
private void fireAtWill() {
|
||||
new Thread(() -> {
|
||||
for (Long shotDelay : shots) {
|
||||
try { Thread.sleep(shotDelay); } catch (Exception x) {}
|
||||
if (truce) {
|
||||
break;
|
||||
}
|
||||
LOG.info("{}: bang!", playerName);
|
||||
subscriber.onNext(DefaultPayload.create("bang!"));
|
||||
}
|
||||
if (!truce) {
|
||||
LOG.info("{}: I give up!", playerName);
|
||||
subscriber.onNext(DefaultPayload.create("I give up"));
|
||||
}
|
||||
subscriber.onComplete();
|
||||
}).start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process events from the opponent
|
||||
*
|
||||
* @param payload Payload received from the rSocket
|
||||
*/
|
||||
public void processPayload(Payload payload) {
|
||||
String message = payload.getDataUtf8();
|
||||
switch (message) {
|
||||
case "bang!":
|
||||
String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!";
|
||||
LOG.info("{}: {}", playerName, result);
|
||||
break;
|
||||
case "I give up":
|
||||
truce = true;
|
||||
LOG.info("{}: OK, truce", playerName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.baeldung.rsocket.support;
|
||||
|
||||
import io.rsocket.Payload;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscriber;
|
||||
|
||||
/**
|
||||
* Simple PUblisher to provide async data to Flux stream
|
||||
*/
|
||||
public class WindDataPublisher implements Publisher<Payload> {
|
||||
|
||||
private Subscriber<? super Payload> subscriber;
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<? super Payload> subscriber) {
|
||||
this.subscriber = subscriber;
|
||||
}
|
||||
|
||||
public void publish(Payload payload) {
|
||||
if (subscriber != null) {
|
||||
subscriber.onNext(payload);
|
||||
}
|
||||
}
|
||||
|
||||
public void complete() {
|
||||
if (subscriber != null) {
|
||||
subscriber.onComplete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
|
@ -0,0 +1,84 @@
|
|||
package com.baeldung.rsocket;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.Disposable;
|
||||
|
||||
public class RSocketIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RSocketIntegrationTest.class);
|
||||
|
||||
private static Server server;
|
||||
|
||||
public RSocketIntegrationTest() {
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass() {
|
||||
server = new Server();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass() {
|
||||
server.dispose();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSendingAString_thenRevceiveTheSameString() {
|
||||
ReqResClient client = new ReqResClient();
|
||||
String string = "Hello RSocket";
|
||||
|
||||
assertEquals(string, client.callBlocking(string));
|
||||
|
||||
client.dispose();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSendingStream_thenReceiveTheSameStream() {
|
||||
// create the client that pushes data to the server and start sending
|
||||
FireNForgetClient fnfClient = new FireNForgetClient();
|
||||
// create a client to read a stream from the server and subscribe to events
|
||||
ReqStreamClient streamClient = new ReqStreamClient();
|
||||
|
||||
// get the data that is used by the client
|
||||
List<Float> data = fnfClient.getData();
|
||||
// create a place to count the results
|
||||
List<Float> dataReceived = new ArrayList<>();
|
||||
|
||||
// assert that the data received is the same as the data sent
|
||||
Disposable subscription = streamClient.getDataStream()
|
||||
.index()
|
||||
.subscribe(
|
||||
tuple -> {
|
||||
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
|
||||
dataReceived.add(tuple.getT2());
|
||||
},
|
||||
err -> LOG.error(err.getMessage())
|
||||
);
|
||||
|
||||
// start sending the data
|
||||
fnfClient.sendData();
|
||||
|
||||
// wait a short time for the data to complete then dispose everything
|
||||
try { Thread.sleep(500); } catch (Exception x) {}
|
||||
subscription.dispose();
|
||||
fnfClient.dispose();
|
||||
|
||||
// verify the item count
|
||||
assertEquals("Wrong data count received", data.size(), dataReceived.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenRunningChannelGame_thenLogTheResults() {
|
||||
ChannelClient client = new ChannelClient();
|
||||
client.playGame();
|
||||
client.dispose();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue