BAELDUNG-2283 (#6)

This commit is contained in:
pandachris 2018-12-06 23:14:05 +07:00 committed by GitHub
parent 1472e6afd5
commit 291ba89004
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 554 additions and 0 deletions

54
rsocket/pom.xml Normal file
View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rsocket</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rsocket</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,33 @@
package com.baeldung.rsocket;
import static com.baeldung.rsocket.support.Constants.*;
import com.baeldung.rsocket.support.GameController;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import reactor.core.publisher.Flux;
public class ChannelClient {
private final RSocket socket;
private final GameController gameController;
public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.gameController = new GameController("Client Player");
}
public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}
public void dispose() {
this.socket.dispose();
}
}

View File

@ -0,0 +1,86 @@
package com.baeldung.rsocket;
import static com.baeldung.rsocket.support.Constants.*;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class FireNForgetClient {
private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class);
private final RSocket socket;
private final List<Float> data;
public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.data = Collections.unmodifiableList(generateData());
}
/**
* Send binary velocity (float) every 50ms
*/
public void sendData() {
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.map(socket::fireAndForget)
.flatMap(Function.identity())
.blockLast();
}
/**
* Create a binary payload containing a single float value
*
* @param index Index into the data list
* @return Payload ready to send to the server
*/
private Payload createFloatPayload(Long index) {
float velocity = data.get(index.intValue());
ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity);
buffer.rewind();
return DefaultPayload.create(buffer);
}
/**
* Generate sample data
*
* @return List of random floats
*/
private List<Float> generateData() {
List<Float> dataList = new ArrayList<>(WIND_DATA_LENGTH);
float velocity = 0;
for (int i = 0; i < WIND_DATA_LENGTH; i++) {
velocity += Math.random();
dataList.add(velocity);
}
return dataList;
}
/**
* Get the data used for this client.
*
* @return list of data values
*/
public List<Float> getData() {
return data;
}
public void dispose() {
this.socket.dispose();
}
}

View File

@ -0,0 +1,32 @@
package com.baeldung.rsocket;
import static com.baeldung.rsocket.support.Constants.*;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
public class ReqResClient {
private final RSocket socket;
public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.onErrorReturn(ERROR_MSG)
.block();
}
public void dispose() {
this.socket.dispose();
}
}

View File

@ -0,0 +1,33 @@
package com.baeldung.rsocket;
import static com.baeldung.rsocket.support.Constants.*;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
public class ReqStreamClient {
private final RSocket socket;
public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}
public void dispose() {
this.socket.dispose();
}
}

View File

@ -0,0 +1,107 @@
package com.baeldung.rsocket;
import com.baeldung.rsocket.support.WindDataPublisher;
import static com.baeldung.rsocket.support.Constants.*;
import com.baeldung.rsocket.support.GameController;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Server {
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
private final Disposable server;
private final WindDataPublisher windDataPublisher = new WindDataPublisher();
private final GameController gameController;
public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.doOnNext(x -> LOG.info("Server started"))
.subscribe();
this.gameController = new GameController("Server Player");
}
public void dispose() {
windDataPublisher.complete();
this.server.dispose();
}
/**
* RSocket implementation
*/
private class RSocketImpl extends AbstractRSocket {
/**
* Handle Request/Response messages
*
* @param payload Message payload
* @return payload response
*/
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // reflect the payload back to the sender
} catch (Exception x) {
return Mono.error(x);
}
}
/**
* Handle Fire-and-Forget messages
*
* @param payload Message payload
* @return nothing
*/
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
windDataPublisher.publish(payload); // forward the payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}
/**
* Handle Request/Stream messages. Each request returns a new stream.
*
* @param payload Payload that can be used to determine which stream to return
* @return Flux stream containing simulated wind speed data
*/
@Override
public Flux<Payload> requestStream(Payload payload) {
String streamName = payload.getDataUtf8();
if (WIND_DATA_STREAM_NAME.equals(streamName)) {
return Flux.from(windDataPublisher);
}
return Flux.error(new IllegalArgumentException(streamName));
}
/**
* Handle request for bidirectional channel
*
* @param payloads Stream of payloads delivered from the client
* @return
*/
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
Flux<Payload> channel = Flux.from(gameController);
return channel;
}
}
}

View File

@ -0,0 +1,10 @@
package com.baeldung.rsocket.support;
public interface Constants {
int TCP_PORT = 7101;
String ERROR_MSG = "error";
int WIND_DATA_LENGTH = 30;
String WIND_DATA_STREAM_NAME = "wind-data";
int SHOT_COUNT = 10;
}

View File

@ -0,0 +1,84 @@
package com.baeldung.rsocket.support;
import static com.baeldung.rsocket.support.Constants.*;
import io.rsocket.Payload;
import io.rsocket.util.DefaultPayload;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class GameController implements Publisher<Payload> {
private static final Logger LOG = LoggerFactory.getLogger(GameController.class);
private final String playerName;
private final List<Long> shots;
private Subscriber<? super Payload> subscriber;
private boolean truce = false;
public GameController(String playerName) {
this.playerName = playerName;
this.shots = generateShotList();
}
/**
* Create a random list of time intervals, 0-1000ms
*
* @return List of time intervals
*/
private List<Long> generateShotList() {
return Flux.range(1, SHOT_COUNT)
.map(x -> (long) Math.ceil(Math.random() * 1000))
.collectList()
.block();
}
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
this.subscriber = subscriber;
fireAtWill();
}
/**
* Publish game events asynchronously
*/
private void fireAtWill() {
new Thread(() -> {
for (Long shotDelay : shots) {
try { Thread.sleep(shotDelay); } catch (Exception x) {}
if (truce) {
break;
}
LOG.info("{}: bang!", playerName);
subscriber.onNext(DefaultPayload.create("bang!"));
}
if (!truce) {
LOG.info("{}: I give up!", playerName);
subscriber.onNext(DefaultPayload.create("I give up"));
}
subscriber.onComplete();
}).start();
}
/**
* Process events from the opponent
*
* @param payload Payload received from the rSocket
*/
public void processPayload(Payload payload) {
String message = payload.getDataUtf8();
switch (message) {
case "bang!":
String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!";
LOG.info("{}: {}", playerName, result);
break;
case "I give up":
truce = true;
LOG.info("{}: OK, truce", playerName);
break;
}
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.rsocket.support;
import io.rsocket.Payload;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
/**
* Simple PUblisher to provide async data to Flux stream
*/
public class WindDataPublisher implements Publisher<Payload> {
private Subscriber<? super Payload> subscriber;
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
this.subscriber = subscriber;
}
public void publish(Payload payload) {
if (subscriber != null) {
subscriber.onNext(payload);
}
}
public void complete() {
if (subscriber != null) {
subscriber.onComplete();
}
}
}

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,71 @@
package com.baeldung.rsocket;
import java.util.List;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
public class RSocketIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(RSocketIntegrationTest.class);
private static Server server;
public RSocketIntegrationTest() {
}
@BeforeClass
public static void setUpClass() {
server = new Server();
}
@AfterClass
public static void tearDownClass() {
server.dispose();
}
@Test
public void testRequestResponse() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";
assertEquals(string, client.callBlocking(string));
client.dispose();
}
@Test
public void testFNFAndRequestStream() {
// create the client that pushes data to the server and start sending
FireNForgetClient fnfClient = new FireNForgetClient();
// get the data that is used by the client
List<Float> data = fnfClient.getData();
// create a client to read a stream from the server and subscribe to events
ReqStreamClient streamClient = new ReqStreamClient();
// assert that the data received is the same as the data sent
Disposable subscription = streamClient.getDataStream()
.index()
.doOnNext(element -> assertEquals(data.get(element.getT1().intValue()), element.getT2()))
.count()
.subscribe(count -> assertEquals(data.size(), count.intValue()));
// start sending the data
fnfClient.sendData();
// wait a short time for the data to complete then dispose everything
try { Thread.sleep(500); } catch (Exception x) {}
subscription.dispose();
fnfClient.dispose();
}
@Test
public void testChannel() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}
}