Merge pull request #7 from pandachris/master

merge from master
This commit is contained in:
pandachris 2018-12-09 21:35:32 +07:00 committed by GitHub
commit e00bfcf21d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 36 deletions

View File

@ -14,17 +14,17 @@ public class ChannelClient {
public ChannelClient() { public ChannelClient() {
this.socket = RSocketFactory.connect() this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT)) .transport(TcpClientTransport.create("localhost", TCP_PORT))
.start() .start()
.block(); .block();
this.gameController = new GameController("Client Player"); this.gameController = new GameController("Client Player");
} }
public void playGame() { public void playGame() {
socket.requestChannel(Flux.from(gameController)) socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload) .doOnNext(gameController::processPayload)
.blockLast(); .blockLast();
} }
public void dispose() { public void dispose() {

View File

@ -25,9 +25,9 @@ public class FireNForgetClient {
public FireNForgetClient() { public FireNForgetClient() {
this.socket = RSocketFactory.connect() this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT)) .transport(TcpClientTransport.create("localhost", TCP_PORT))
.start() .start()
.block(); .block();
this.data = Collections.unmodifiableList(generateData()); this.data = Collections.unmodifiableList(generateData());
} }
@ -36,11 +36,11 @@ public class FireNForgetClient {
*/ */
public void sendData() { public void sendData() {
Flux.interval(Duration.ofMillis(50)) Flux.interval(Duration.ofMillis(50))
.take(data.size()) .take(data.size())
.map(this::createFloatPayload) .map(this::createFloatPayload)
.map(socket::fireAndForget) .map(socket::fireAndForget)
.flatMap(Function.identity()) .flatMap(Function.identity())
.blockLast(); .blockLast();
} }
/** /**

View File

@ -13,17 +13,17 @@ public class ReqResClient {
public ReqResClient() { public ReqResClient() {
this.socket = RSocketFactory.connect() this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT)) .transport(TcpClientTransport.create("localhost", TCP_PORT))
.start() .start()
.block(); .block();
} }
public String callBlocking(String string) { public String callBlocking(String string) {
return socket return socket
.requestResponse(DefaultPayload.create(string)) .requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8) .map(Payload::getDataUtf8)
.onErrorReturn(ERROR_MSG) .onErrorReturn(ERROR_MSG)
.block(); .block();
} }
public void dispose() { public void dispose() {

View File

@ -14,17 +14,17 @@ public class ReqStreamClient {
public ReqStreamClient() { public ReqStreamClient() {
this.socket = RSocketFactory.connect() this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT)) .transport(TcpClientTransport.create("localhost", TCP_PORT))
.start() .start()
.block(); .block();
} }
public Flux<Float> getDataStream() { public Flux<Float> getDataStream() {
return socket return socket
.requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) .requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME))
.map(Payload::getData) .map(Payload::getData)
.map(buf -> buf.getFloat()) .map(buf -> buf.getFloat())
.onErrorReturn(null); .onErrorReturn(null);
} }
public void dispose() { public void dispose() {

View File

@ -24,11 +24,11 @@ public class Server {
public Server() { public Server() {
this.server = RSocketFactory.receive() this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT)) .transport(TcpServerTransport.create("localhost", TCP_PORT))
.start() .start()
.doOnNext(x -> LOG.info("Server started")) .doOnNext(x -> LOG.info("Server started"))
.subscribe(); .subscribe();
this.gameController = new GameController("Server Player"); this.gameController = new GameController("Server Player");
} }
@ -98,7 +98,7 @@ public class Server {
@Override @Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads) Flux.from(payloads)
.subscribe(gameController::processPayload); .subscribe(gameController::processPayload);
Flux<Payload> channel = Flux.from(gameController); Flux<Payload> channel = Flux.from(gameController);
return channel; return channel;
} }

View File

@ -31,9 +31,9 @@ public class GameController implements Publisher<Payload> {
*/ */
private List<Long> generateShotList() { private List<Long> generateShotList() {
return Flux.range(1, SHOT_COUNT) return Flux.range(1, SHOT_COUNT)
.map(x -> (long) Math.ceil(Math.random() * 1000)) .map(x -> (long) Math.ceil(Math.random() * 1000))
.collectList() .collectList()
.block(); .block();
} }
@Override @Override