This commit is contained in:
Dominik 2018-01-10 22:57:28 +01:00 committed by DominWos
parent cafee5736f
commit 71abd78dff

View File

@ -5,12 +5,11 @@ import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe; import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.schedulers.Schedulers; import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber; import io.reactivex.subscribers.TestSubscriber;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -34,44 +33,44 @@ public class FlowableTest {
} }
@Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException { @Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException {
FlowableOnSubscribe<Integer> flowableOnSubscribe = flowableEmitter -> { FlowableOnSubscribe<Integer> flowableOnSubscribe = flowableEmitter -> flowableEmitter.onNext(1);
flowableEmitter.onNext(1);
};
Flowable<Integer> integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); Flowable<Integer> integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable); assertNotNull(integerFlowable);
} }
@Test public void givenFlowableWithBufferStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenAllValuesAreBufferedAndReceived() throws InterruptedException { @Test public void whenFlowableUsesBufferStragegy_thenOnBackpressureAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
List listToFill = new ArrayList();
Observable observable = Observable.fromIterable(testList); Observable observable = Observable.fromIterable(testList);
observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).subscribe(listToFill::add); TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test();
Thread.sleep(5000);
assertEquals(testList, listToFill); testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertEquals(testList, receivedInts);
} }
@Test public void givenFlowableWithDropStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenNotAllValuesAreReceived() throws InterruptedException { @Test public void whenFlowableUsesDropStrategy_thenOnBackpressureNotAllValuesAreReceivedAndTheLastElementIsNotReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
List listToFill = new ArrayList();
Observable observable = Observable.fromIterable(testList); Observable observable = Observable.fromIterable(testList);
observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).subscribe(listToFill::add); TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).test();
Thread.sleep(5000); testSubscriber.awaitTerminalEvent();
assertThat(listToFill.size() < testList.size()); List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(!listToFill.contains(100000));
assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
} }
@Test @Test public void whenFlowableUsesMissingStrategy_thenExceptionIsThrownOnBackpressure() {
public void givenFlowableWithMissingStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException {
Observable observable = Observable.range(1, 100000); Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent(); subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class); subscriber.assertError(MissingBackpressureException.class);
} }
@Test @Test public void whenFlowableUsesErrorStrategy_thenExceptionIsThrownOnBackpressure() {
public void givenFlowableWithErrorStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException {
Observable observable = Observable.range(1, 100000); Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test();
@ -79,16 +78,16 @@ public class FlowableTest {
subscriber.assertError(MissingBackpressureException.class); subscriber.assertError(MissingBackpressureException.class);
} }
@Test @Test public void whenFlowableUsesLatesStrategy_thenNotElementsAreReceivedButTheLastElementIs() {
public void givenFlowableWithLatestStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
List listToFill = new ArrayList();
Observable observable = Observable.fromIterable(testList); Observable observable = Observable.fromIterable(testList);
observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).subscribe(listToFill::add); TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test();
Thread.sleep(6000);
assertThat(listToFill.size() < testList.size()); testSubscriber.awaitTerminalEvent();
assertThat(listToFill.contains(100000)); List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
} }
} }