BAEL-1512 Filtering Observables in RxJava (#3931)
* Added test cases for RxJava Filtering Operators * Added @Ignore on TimeFiltering tests
This commit is contained in:
parent
266c285533
commit
0bf46b81a2
|
@ -0,0 +1,202 @@
|
|||
package com.baeldung.rxjava.filters;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.observers.TestSubscriber;
|
||||
|
||||
public class RxJavaFilterOperatorsTest {
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringItems_thenOddItemsAreFiltered() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.filter(i -> i % 2 != 0);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(5);
|
||||
subscriber.assertValues(1, 3, 5, 7, 9);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithTake_thenOnlyFirstThreeItemsAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.take(3);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(3);
|
||||
subscriber.assertValues(1, 2, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenFilteringWithTakeWhile_thenItemsEmittedUntilConditionIsVerified() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.takeWhile(i -> i < 4);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(3);
|
||||
subscriber.assertValues(1, 2, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithTakeFirst_thenOnlyFirstItemIsEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 5, 7, 6);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.takeFirst(x -> x > 5);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(7);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithFirst_thenOnlyFirstThreeItemsAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.first();
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenEmptyObservable_whenFilteringWithFirstOrDefault_thenDefaultValue() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.empty();
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(-1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithTakeLast_thenLastThreeItemAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.takeLast(3);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(3);
|
||||
subscriber.assertValues(8, 9, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithLast_thenOnlyLastItemIsEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.last(i -> i % 2 != 0);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(9);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenFilteringWithLastAndDefault_thenOnlyDefaultIsEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(-1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenTakingElementAt_thenItemAtSpecifiedIndexIsEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 5, 7, 11);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.elementAt(4);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(7);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenTakingElementAtOrDefault_thenDefaultIsReturned() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 5, 7, 11);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.elementAtOrDefault(7, -1);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(-1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMixedTypeObservable_whenFilteringByType_thenOnlyNumbersAreEmitted() {
|
||||
|
||||
Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
|
||||
TestSubscriber subscriber = new TestSubscriber();
|
||||
|
||||
Observable filteredObservable = sourceObservable.ofType(String.class);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(2);
|
||||
subscriber.assertValues("two", "five");
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
package com.baeldung.rxjava.filters;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.observers.TestSubscriber;
|
||||
|
||||
public class RxJavaSkipOperatorsTest {
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenSkipping_thenFirstFourItemsAreSkipped() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.skip(4);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(6);
|
||||
subscriber.assertValues(5, 6, 7, 8, 9, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenSkippingWhile_thenFirstItemsAreSkipped() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.skipWhile(i -> i < 4);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(6);
|
||||
subscriber.assertValues(4, 5, 4, 3, 2, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenSkippingLast_thenLastFiveItemsAreSkipped() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = sourceObservable.skipLast(5);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(5);
|
||||
subscriber.assertValues(1, 2, 3, 4, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenFilteringDistinct_thenOnlyDistinctValuesAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 1, 2, 2, 1, 3, 3, 1);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> distinctObservable = sourceObservable.distinct();
|
||||
|
||||
distinctObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(3);
|
||||
subscriber.assertValues(1, 2, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenFilteringDistinctUntilChanged_thenOnlyDistinctConsecutiveItemsAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.just(1, 1, 2, 2, 1, 3, 3, 1);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();
|
||||
|
||||
distinctObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(5);
|
||||
subscriber.assertValues(1, 2, 1, 3, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenIgnoringElements_thenOnlyDistinctConsecutiveItemsAreEmitted() {
|
||||
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 10);
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();
|
||||
|
||||
ignoredObservable.subscribe(subscriber);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(0);
|
||||
subscriber.assertNoValues();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,206 @@
|
|||
package com.baeldung.rxjava.filters;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.observers.TestSubscriber;
|
||||
|
||||
@Ignore("Manual only")
|
||||
public class RxJavaTimeFilteringOperatorsTest {
|
||||
|
||||
@Test
|
||||
public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> sampledObservable =
|
||||
timedObservable.sample(Observable.interval(2500L, TimeUnit.MILLISECONDS));
|
||||
|
||||
sampledObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValues(3, 5, 6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValues(4, 6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable =
|
||||
timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValues(1, 6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValue(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValue(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertError(TimeoutException.class);
|
||||
subscriber.assertValues(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
|
||||
Observable<Integer> delayedObservable = Observable.just(1)
|
||||
.delay(3000, TimeUnit.MILLISECONDS);
|
||||
|
||||
TestSubscriber<Integer> subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.skipUntil(delayedObservable);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValues(4, 5, 6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() throws InterruptedException {
|
||||
|
||||
Observable<Integer> timedObservable =
|
||||
Observable.just(1, 2, 3, 4, 5, 6)
|
||||
.zipWith(
|
||||
Observable.interval(0, 1, TimeUnit.SECONDS),
|
||||
(item, time) -> item
|
||||
);
|
||||
|
||||
TestSubscriber subscriber = new TestSubscriber();
|
||||
|
||||
Observable<Integer> delayedObservable = Observable.just(1)
|
||||
.delay(3000, TimeUnit.MILLISECONDS);
|
||||
|
||||
Observable<Integer> filteredObservable = timedObservable.takeUntil(delayedObservable);
|
||||
|
||||
filteredObservable.subscribe(subscriber);
|
||||
|
||||
Thread.sleep(7000);
|
||||
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValues(1, 2, 3);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue