BAEL-1510 - RxJava - Combining Observables (#3892)

* Simplified examples and lambda expressions
This commit is contained in:
Arjay Nacion 2018-03-28 01:41:20 +08:00 committed by Grzegorz Piwowarek
parent 66d6856dcb
commit 43365351c3
1 changed files with 24 additions and 70 deletions

View File

@ -4,14 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import rx.Observable;
@ -19,50 +13,18 @@ import rx.observers.TestSubscriber;
public class ObservableCombineUnitTest {
private static ExecutorService executor;
@BeforeClass
public static void setupClass() {
executor = Executors.newFixedThreadPool(10);
}
@AfterClass
public static void tearDownClass() {
executor.shutdown();
}
@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
List<String> results = new ArrayList<>();
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
//@formatter:off
Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[]{ "I love", "RxJava"})
).subscribe(data -> {
results.add(data);
});
).subscribe(testSubscriber);
//@formatter:on
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(4);
assertThat(results).contains("Hello", "World", "I love", "RxJava");
}
@Test
public void givenAnObservable_whenStartWith_thenPrependEmittedResults() {
StringBuffer buffer = new StringBuffer();
//@formatter:off
Observable
.from(new String[] { "RxJava", "Observables" })
.startWith("Buzzwords of Reactive Programming")
.subscribe(data -> {
buffer.append(data).append(" ");
});
//@formatter:on
assertThat(buffer.toString().trim()).isEqualTo("Buzzwords of Reactive Programming RxJava Observables");
testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}
@Test
@ -73,11 +35,7 @@ public class ObservableCombineUnitTest {
Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Heirarchy"}),
(str1, str2) -> {
return str1 + " " + str2;
}).subscribe(zipped -> {
zippedStrings.add(zipped);
});
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
//formatter:on
assertThat(zippedStrings).isNotEmpty();
@ -87,35 +45,31 @@ public class ObservableCombineUnitTest {
@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
Future<String> f1 = executor.submit(createCallable("Hello"));
Future<String> f2 = executor.submit(createCallable("World"));
Future<String> f3 = executor.submit(createCallable(null));
Future<String> f4 = executor.submit(createCallable("RxJava"));
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
//@formatter:off
Observable.mergeDelayError(
Observable.from(f1),
Observable.from(f2),
Observable.from(f3),
Observable.from(f4)
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);
//@formatter:on
testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(ExecutionException.class);
testSubscriber.assertError(RuntimeException.class);;
}
private Callable<String> createCallable(final String data) {
return new Callable<String>() {
@Override
public String call() throws Exception {
if (data == null) {
throw new IllegalArgumentException("Data should not be null.");
}
return data.toLowerCase();
}
};
@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}
}