Merge pull request #5916 from pandachris/master

BAEL-2283
This commit is contained in:
Loredana Crusoveanu 2018-12-14 23:58:49 +02:00 committed by GitHub
commit fd554d064f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 13 additions and 13 deletions

View File

@ -61,9 +61,9 @@ public class FireNForgetClient {
* @return List of random floats * @return List of random floats
*/ */
private List<Float> generateData() { private List<Float> generateData() {
List<Float> dataList = new ArrayList<>(WIND_DATA_LENGTH); List<Float> dataList = new ArrayList<>(DATA_LENGTH);
float velocity = 0; float velocity = 0;
for (int i = 0; i < WIND_DATA_LENGTH; i++) { for (int i = 0; i < DATA_LENGTH; i++) {
velocity += Math.random(); velocity += Math.random();
dataList.add(velocity); dataList.add(velocity);
} }

View File

@ -21,7 +21,7 @@ public class ReqStreamClient {
public Flux<Float> getDataStream() { public Flux<Float> getDataStream() {
return socket return socket
.requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME)) .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData) .map(Payload::getData)
.map(buf -> buf.getFloat()) .map(buf -> buf.getFloat())
.onErrorReturn(null); .onErrorReturn(null);

View File

@ -1,6 +1,6 @@
package com.baeldung.rsocket; package com.baeldung.rsocket;
import com.baeldung.rsocket.support.WindDataPublisher; import com.baeldung.rsocket.support.DataPublisher;
import static com.baeldung.rsocket.support.Constants.*; import static com.baeldung.rsocket.support.Constants.*;
import com.baeldung.rsocket.support.GameController; import com.baeldung.rsocket.support.GameController;
import io.rsocket.AbstractRSocket; import io.rsocket.AbstractRSocket;
@ -19,7 +19,7 @@ public class Server {
private static final Logger LOG = LoggerFactory.getLogger(Server.class); private static final Logger LOG = LoggerFactory.getLogger(Server.class);
private final Disposable server; private final Disposable server;
private final WindDataPublisher windDataPublisher = new WindDataPublisher(); private final DataPublisher dataPublisher = new DataPublisher();
private final GameController gameController; private final GameController gameController;
public Server() { public Server() {
@ -34,7 +34,7 @@ public class Server {
} }
public void dispose() { public void dispose() {
windDataPublisher.complete(); dataPublisher.complete();
this.server.dispose(); this.server.dispose();
} }
@ -67,7 +67,7 @@ public class Server {
@Override @Override
public Mono<Void> fireAndForget(Payload payload) { public Mono<Void> fireAndForget(Payload payload) {
try { try {
windDataPublisher.publish(payload); // forward the payload dataPublisher.publish(payload); // forward the payload
return Mono.empty(); return Mono.empty();
} catch (Exception x) { } catch (Exception x) {
return Mono.error(x); return Mono.error(x);
@ -78,13 +78,13 @@ public class Server {
* Handle Request/Stream messages. Each request returns a new stream. * Handle Request/Stream messages. Each request returns a new stream.
* *
* @param payload Payload that can be used to determine which stream to return * @param payload Payload that can be used to determine which stream to return
* @return Flux stream containing simulated wind speed data * @return Flux stream containing simulated measurement data
*/ */
@Override @Override
public Flux<Payload> requestStream(Payload payload) { public Flux<Payload> requestStream(Payload payload) {
String streamName = payload.getDataUtf8(); String streamName = payload.getDataUtf8();
if (WIND_DATA_STREAM_NAME.equals(streamName)) { if (DATA_STREAM_NAME.equals(streamName)) {
return Flux.from(windDataPublisher); return Flux.from(dataPublisher);
} }
return Flux.error(new IllegalArgumentException(streamName)); return Flux.error(new IllegalArgumentException(streamName));
} }

View File

@ -4,7 +4,7 @@ public interface Constants {
int TCP_PORT = 7101; int TCP_PORT = 7101;
String ERROR_MSG = "error"; String ERROR_MSG = "error";
int WIND_DATA_LENGTH = 30; int DATA_LENGTH = 30;
String WIND_DATA_STREAM_NAME = "wind-data"; String DATA_STREAM_NAME = "data";
int SHOT_COUNT = 10; int SHOT_COUNT = 10;
} }

View File

@ -7,7 +7,7 @@ import org.reactivestreams.Subscriber;
/** /**
* Simple PUblisher to provide async data to Flux stream * Simple PUblisher to provide async data to Flux stream
*/ */
public class WindDataPublisher implements Publisher<Payload> { public class DataPublisher implements Publisher<Payload> {
private Subscriber<? super Payload> subscriber; private Subscriber<? super Payload> subscriber;