diff --git a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java index 71d4f87b35..26b17ec163 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java @@ -5,12 +5,11 @@ import io.reactivex.Flowable; import io.reactivex.FlowableOnSubscribe; import io.reactivex.Observable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.exceptions.OnErrorNotImplementedException; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; + import org.junit.Test; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -34,61 +33,61 @@ public class FlowableTest { } @Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException { - FlowableOnSubscribe flowableOnSubscribe = flowableEmitter -> { - flowableEmitter.onNext(1); - }; + FlowableOnSubscribe flowableOnSubscribe = flowableEmitter -> flowableEmitter.onNext(1); Flowable integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); assertNotNull(integerFlowable); } - @Test public void givenFlowableWithBufferStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenAllValuesAreBufferedAndReceived() throws InterruptedException { + @Test public void whenFlowableUsesBufferStragegy_thenOnBackpressureAllValuesAreBufferedAndReceived() { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); + Observable observable = Observable.fromIterable(testList); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test(); + + testSubscriber.awaitTerminalEvent(); + + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertEquals(testList, receivedInts); + } + + @Test public void whenFlowableUsesDropStrategy_thenOnBackpressureNotAllValuesAreReceivedAndTheLastElementIsNotReceived() { List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(5000); - assertEquals(testList, listToFill); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).test(); + testSubscriber.awaitTerminalEvent(); + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertThat(receivedInts.size() < testList.size()); + assertThat(!receivedInts.contains(100000)); } - @Test public void givenFlowableWithDropStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenNotAllValuesAreReceived() throws InterruptedException { - List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); - - Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(5000); - assertThat(listToFill.size() < testList.size()); - assertThat(!listToFill.contains(100000)); - } - - @Test - public void givenFlowableWithMissingStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { + @Test public void whenFlowableUsesMissingStrategy_thenExceptionIsThrownOnBackpressure() { Observable observable = Observable.range(1, 100000); - TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); - subscriber.awaitTerminalEvent(); - subscriber.assertError(MissingBackpressureException.class); - } - - @Test - public void givenFlowableWithErrorStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { - Observable observable = Observable.range(1, 100000); - TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); + TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); } - @Test - public void givenFlowableWithLatestStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { - List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); + @Test public void whenFlowableUsesErrorStrategy_thenExceptionIsThrownOnBackpressure() { + Observable observable = Observable.range(1, 100000); + TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); + subscriber.awaitTerminalEvent(); + subscriber.assertError(MissingBackpressureException.class); + } + + @Test public void whenFlowableUsesLatesStrategy_thenNotElementsAreReceivedButTheLastElementIs() { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(6000); - assertThat(listToFill.size() < testList.size()); - assertThat(listToFill.contains(100000)); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test(); + + testSubscriber.awaitTerminalEvent(); + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertThat(receivedInts.size() < testList.size()); + assertThat(receivedInts.contains(100000)); } } \ No newline at end of file