Added TestScheduler to TimeFiltering operators test (#3935)

This commit is contained in:
Carlo Corti 2018-04-05 11:02:56 +02:00 committed by Grzegorz Piwowarek
parent 0bf46b81a2
commit 5a44224fd9
1 changed files with 81 additions and 66 deletions

View File

@ -3,33 +3,34 @@ package com.baeldung.rxjava.filters;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Ignore;
import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
@Ignore("Manual only")
public class RxJavaTimeFilteringOperatorsTest {
@Test
public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() throws InterruptedException {
public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> sampledObservable =
timedObservable.sample(Observable.interval(2500L, TimeUnit.MILLISECONDS));
timedObservable.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);
sampledObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -37,22 +38,24 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() throws InterruptedException {
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -60,23 +63,25 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() throws InterruptedException {
public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable =
timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS);
timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -84,22 +89,24 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() throws InterruptedException {
public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -107,22 +114,24 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() throws InterruptedException {
public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -130,40 +139,44 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() throws InterruptedException {
public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertError(TimeoutException.class);
subscriber.assertValues(1);
}
@Test
public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() throws InterruptedException {
public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
Observable<Integer> delayedObservable = Observable.just(1)
.delay(3000, TimeUnit.MILLISECONDS);
.delay(3000, TimeUnit.MILLISECONDS, testScheduler);
TestSubscriber<Integer> subscriber = new TestSubscriber();
@ -171,7 +184,7 @@ public class RxJavaTimeFilteringOperatorsTest {
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@ -179,25 +192,27 @@ public class RxJavaTimeFilteringOperatorsTest {
}
@Test
public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() throws InterruptedException {
public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber subscriber = new TestSubscriber();
Observable<Integer> delayedObservable = Observable.just(1)
.delay(3000, TimeUnit.MILLISECONDS);
.delay(3000, TimeUnit.MILLISECONDS, testScheduler);
Observable<Integer> filteredObservable = timedObservable.takeUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();