diff --git a/pom.xml b/pom.xml index dabb4a9628..62bb0566a2 100644 --- a/pom.xml +++ b/pom.xml @@ -483,6 +483,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq @@ -1036,6 +1037,7 @@ reactor-core resteasy + rsocket rxjava rxjava-2 rabbitmq diff --git a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java index 8f2d4a9ffd..36f0b56fb4 100644 --- a/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java +++ b/rsocket/src/test/java/com/baeldung/rsocket/RSocketIntegrationTest.java @@ -1,5 +1,6 @@ package com.baeldung.rsocket; +import java.util.ArrayList; import java.util.List; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -29,7 +30,7 @@ public class RSocketIntegrationTest { } @Test - public void testRequestResponse() { + public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); @@ -37,20 +38,27 @@ public class RSocketIntegrationTest { } @Test - public void testFNFAndRequestStream() { + public void whenSendingStream_thenReceiveTheSameStream() { // create the client that pushes data to the server and start sending FireNForgetClient fnfClient = new FireNForgetClient(); - // get the data that is used by the client - List data = fnfClient.getData(); - // create a client to read a stream from the server and subscribe to events ReqStreamClient streamClient = new ReqStreamClient(); + + // get the data that is used by the client + List data = fnfClient.getData(); + // create a place to count the results + List dataReceived = new ArrayList<>(); + // assert that the data received is the same as the data sent Disposable subscription = streamClient.getDataStream() - .index() - .doOnNext(element -> assertEquals(data.get(element.getT1().intValue()), element.getT2())) - .count() - .subscribe(count -> assertEquals(data.size(), count.intValue())); + .index() + .subscribe( + tuple -> { + assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); + dataReceived.add(tuple.getT2()); + }, + err -> LOG.error(err.getMessage()) + ); // start sending the data fnfClient.sendData(); @@ -59,10 +67,13 @@ public class RSocketIntegrationTest { try { Thread.sleep(500); } catch (Exception x) {} subscription.dispose(); fnfClient.dispose(); + + // verify the item count + assertEquals("Wrong data count received", data.size(), dataReceived.size()); } @Test - public void testChannel() { + public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose();