Refactor BackpressureTest

This commit is contained in:
pivovarit 2017-02-05 17:16:27 +01:00
parent 5b79f605a4
commit 4e91a6388c
1 changed files with 65 additions and 49 deletions

View File

@ -10,6 +10,7 @@ import rx.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertTrue;
@ -21,13 +22,16 @@ public class RxJavaBackpressureTest {
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable.range(1, 1_000_000)
Observable
.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
//then
testSubscriber.awaitTerminalEvent();
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
@ -35,16 +39,16 @@ public class RxJavaBackpressureTest {
public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() {
//given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer>create();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
source.observeOn(Schedulers.computation())
source
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
//when
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
IntStream
.range(0, 1_000_000)
.forEach(source::onNext);
//then
testSubscriber.awaitTerminalEvent();
@ -55,20 +59,23 @@ public class RxJavaBackpressureTest {
public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() {
//given
TestSubscriber<Observable<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer>create();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
source.window(500)
source
.window(500)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
for (int i = 0; i < 1_000; i++) {
source.onNext(i);
}
IntStream
.range(0, 1_000)
.forEach(source::onNext);
//then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
@ -76,43 +83,49 @@ public class RxJavaBackpressureTest {
public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() {
//given
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer>create();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
source.buffer(1024)
source
.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
for (int i = 0; i < 1_000; i++) {
source.onNext(i);
}
IntStream
.range(0, 1_000)
.forEach(source::onNext);
//then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
@Test
public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() {
//given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer>create();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
source.sample(100, TimeUnit.MILLISECONDS)
// .throttleFirst(100, TimeUnit.MILLISECONDS)
// .throttleFirst(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
for (int i = 0; i < 1_000; i++) {
source.onNext(i);
}
IntStream
.range(0, 1_000)
.forEach(source::onNext);
//then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
@ -122,34 +135,37 @@ public class RxJavaBackpressureTest {
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {
},
BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
Observable
.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
//then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
@Test
public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() {
//given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable.range(1, 1_000_000)
Observable
.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
//then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
}
}