rxjava schedulers (#2668)
* rxjava utility operators test * rxjava schedulers
This commit is contained in:
parent
8db94607f9
commit
c9072e54d1
@ -47,6 +47,12 @@
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>22.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
238
rxjava/src/test/java/com/baeldung/rxjava/SchedulersTest.java
Normal file
238
rxjava/src/test/java/com/baeldung/rxjava/SchedulersTest.java
Normal file
@ -0,0 +1,238 @@
|
||||
package com.baeldung.rxjava;
|
||||
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.Scheduler;
|
||||
import rx.observers.TestSubscriber;
|
||||
import rx.schedulers.Schedulers;
|
||||
import rx.schedulers.TestScheduler;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.concurrent.Executors.newFixedThreadPool;
|
||||
import static org.hamcrest.Matchers.hasItems;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class SchedulersTest {
|
||||
String result = "";
|
||||
String result1 = "";
|
||||
String result2 = "";
|
||||
|
||||
@Test
|
||||
public void givenScheduledWorker_whenScheduleAnAction_thenResultAction() throws InterruptedException {
|
||||
System.out.println("scheduling");
|
||||
Scheduler scheduler = Schedulers.immediate();
|
||||
Scheduler.Worker worker = scheduler.createWorker();
|
||||
worker.schedule(() -> result += "action");
|
||||
Assert.assertTrue(result.equals("action"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenScheduledWorker_whenUnsubscribeOnWorker_thenResultFirstAction() throws InterruptedException {
|
||||
System.out.println("canceling");
|
||||
Scheduler scheduler = Schedulers.newThread();
|
||||
Scheduler.Worker worker = scheduler.createWorker();
|
||||
worker.schedule(() -> {
|
||||
result += "First_Action";
|
||||
worker.unsubscribe();
|
||||
});
|
||||
worker.schedule(() -> result += "Second_Action");
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("First_Action"));
|
||||
}
|
||||
|
||||
@Ignore //it's not safe, not every time is running correctly
|
||||
@Test
|
||||
public void givenWorker_whenScheduledOnNewThread_thenResultIsBoundToNewThread() throws InterruptedException {
|
||||
System.out.println("newThread_1");
|
||||
Scheduler scheduler = Schedulers.newThread();
|
||||
Scheduler.Worker worker = scheduler.createWorker();
|
||||
worker.schedule(() -> {
|
||||
result += Thread.currentThread().getName() + "_Start";
|
||||
worker.schedule(() -> result += "_worker_");
|
||||
result += "_End";
|
||||
});
|
||||
Thread.sleep(2000);
|
||||
Assert.assertTrue(result.equals("RxNewThreadScheduler-1_Start_End_worker_"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenObserveOnNewThread_thenRunOnDifferentThreadEachTime() throws InterruptedException {
|
||||
System.out.println("newThread_2");
|
||||
Observable.just("Hello")
|
||||
.observeOn(Schedulers.newThread())
|
||||
.doOnNext(s ->
|
||||
result2 += Thread.currentThread().getName()
|
||||
)
|
||||
.observeOn(Schedulers.newThread())
|
||||
.subscribe(s ->
|
||||
result1 += Thread.currentThread().getName()
|
||||
);
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
|
||||
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenWorker_whenScheduledOnImmediate_thenResultIsBoundToThread() throws InterruptedException {
|
||||
System.out.println("immediate_1");
|
||||
Scheduler scheduler = Schedulers.immediate();
|
||||
Scheduler.Worker worker = scheduler.createWorker();
|
||||
worker.schedule(() -> {
|
||||
result += Thread.currentThread().getName() + "_Start";
|
||||
worker.schedule(() -> result += "_worker_");
|
||||
result += "_End";
|
||||
});
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("main_Start_worker__End"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenImmediateScheduled_thenExecuteOnMainThread() throws InterruptedException {
|
||||
System.out.println("immediate_2");
|
||||
Observable.just("Hello")
|
||||
.subscribeOn(Schedulers.immediate())
|
||||
.subscribe(s ->
|
||||
result += Thread.currentThread().getName()
|
||||
);
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("main"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenTrampolineScheduled_thenExecuteOnMainThread() throws InterruptedException {
|
||||
System.out.println("trampoline_1");
|
||||
Observable.just(2, 4, 6, 8)
|
||||
.subscribeOn(Schedulers.trampoline())
|
||||
.subscribe(i -> result += "" + i);
|
||||
Observable.just(1, 3, 5, 7, 9)
|
||||
.subscribeOn(Schedulers.trampoline())
|
||||
.subscribe(i -> result += "" + i);
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("246813579"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenWorker_whenScheduledOnTrampoline_thenComposeResultAsBlocking() throws InterruptedException {
|
||||
System.out.println("trampoline_2");
|
||||
Scheduler scheduler = Schedulers.trampoline();
|
||||
Scheduler.Worker worker = scheduler.createWorker();
|
||||
worker.schedule(() -> {
|
||||
result += Thread.currentThread().getName() + "Start";
|
||||
worker.schedule(() -> {
|
||||
result += "_middleStart";
|
||||
worker.schedule(() ->
|
||||
result += "_worker_"
|
||||
);
|
||||
result += "_middleEnd";
|
||||
});
|
||||
result += "_mainEnd";
|
||||
});
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result
|
||||
.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
|
||||
}
|
||||
|
||||
private ThreadFactory threadFactory(String pattern) {
|
||||
return new ThreadFactoryBuilder()
|
||||
.setNameFormat(pattern)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenExecutors_whenSchedulerFromCreatedExecutors_thenReturnElementsOnEacheThread() throws InterruptedException {
|
||||
System.out.println("from");
|
||||
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
|
||||
Scheduler schedulerA = Schedulers.from(poolA);
|
||||
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
|
||||
Scheduler schedulerB = Schedulers.from(poolB);
|
||||
|
||||
Observable<String> observable = Observable.create(subscriber -> {
|
||||
subscriber.onNext("Alfa");
|
||||
subscriber.onNext("Beta");
|
||||
subscriber.onCompleted();
|
||||
});;
|
||||
|
||||
observable
|
||||
.subscribeOn(schedulerA)
|
||||
.subscribeOn(schedulerB)
|
||||
.subscribe(
|
||||
x -> result += Thread.currentThread().getName() + x + "_",
|
||||
Throwable::printStackTrace,
|
||||
() -> result += "_Completed"
|
||||
);
|
||||
Thread.sleep(2000);
|
||||
Assert.assertTrue(result.equals("Sched-A-0Alfa_Sched-A-0Beta__Completed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenIoScheduling_thenReturnThreadName() throws InterruptedException {
|
||||
System.out.println("io");
|
||||
Observable.just("io")
|
||||
.subscribeOn(Schedulers.io())
|
||||
.subscribe(i -> result += Thread.currentThread().getName());
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("RxIoScheduler-2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenComputationScheduling_thenReturnThreadName() throws InterruptedException {
|
||||
System.out.println("computation");
|
||||
Observable.just("computation")
|
||||
.subscribeOn(Schedulers.computation())
|
||||
.subscribe(i -> result += Thread.currentThread().getName());
|
||||
Thread.sleep(500);
|
||||
Assert.assertTrue(result.equals("RxComputationScheduler-1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLetters_whenTestScheduling_thenReturnValuesControllingAdvanceTime() throws InterruptedException {
|
||||
List<String> letters = Arrays.asList("A", "B", "C");
|
||||
TestScheduler scheduler = Schedulers.test();
|
||||
TestSubscriber<String> subscriber = new TestSubscriber<>();
|
||||
|
||||
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);
|
||||
|
||||
Observable.from(letters)
|
||||
.zipWith(tick, (string, index) -> index + "-" + string)
|
||||
.subscribeOn(scheduler)
|
||||
.subscribe(subscriber);
|
||||
|
||||
subscriber.assertNoValues();
|
||||
subscriber.assertNotCompleted();
|
||||
|
||||
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValues("0-A");
|
||||
|
||||
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(3);
|
||||
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLetters_whenDelay_thenReturne() throws InterruptedException {
|
||||
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched1-"));
|
||||
Scheduler schedulerA = Schedulers.from(poolA);
|
||||
Observable.just('A', 'B')
|
||||
.delay(1, TimeUnit.SECONDS, schedulerA)
|
||||
.subscribe(i -> result+= Thread.currentThread().getName() + i + " ");
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user