From c8ee65b1d2a01ba6a45922c42f26b13daa042709 Mon Sep 17 00:00:00 2001 From: TINO Date: Sat, 9 Mar 2019 15:36:20 +0300 Subject: [PATCH 1/3] BAEL - 1060 --- .../baeldung/rxjava/RxJavaHooksUnitTest.java | 329 ++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java new file mode 100644 index 0000000000..e31838448b --- /dev/null +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java @@ -0,0 +1,329 @@ +package com.baeldung.rxjava; + +import org.junit.Test; + +import io.reactivex.Completable; +import io.reactivex.Flowable; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; +import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.observables.ConnectableObservable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class RxJavaHooksUnitTest { + + @Test + public void givenCompletable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnCompletableAssembly(completable -> { + System.out.println("Assembling Completable"); + return completable; + }); + Completable.fromSingle(Single.just(1)); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenCompletable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { + System.out.println("Subscribing to Completable"); + return observer; + }); + + Completable.fromSingle(Single.just(1)) + .test(true); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnObservableAssembly(observable -> { + System.out.println("Assembling Observable"); + return observable; + }); + + Observable.range(1, 10); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { + System.out.println("Suscribing to Observable"); + return observer; + }); + + Observable.range(1, 10) + .test(true); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { + System.out.println("Assembling ConnectableObservable"); + return connectableObservable; + }); + + ConnectableObservable.range(1, 10) + .publish() + .connect(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenFlowable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnFlowableAssembly(flowable -> { + System.out.println("Assembling Flowable"); + return flowable; + }); + + Flowable.range(1, 10); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenFlowable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { + System.out.println("Suscribing to Flowable"); + return observer; + }); + + Flowable.range(1, 10) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { + System.out.println("Assembling ConnectableFlowable"); + return connectableFlowable; + }); + + ConnectableFlowable.range(1, 10) + .publish() + .connect(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenParallel_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { + System.out.println("Assembling ParallelFlowable"); + return parallelFlowable; + }); + + Flowable.range(1, 10) + .parallel(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenMaybe_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnMaybeAssembly(maybe -> { + System.out.println("Assembling Maybe"); + return maybe; + }); + + Maybe.just(1); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenMaybe_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { + System.out.println("Suscribing to Maybe"); + return observer; + }); + + Maybe.just(1) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingle_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnSingleAssembly(single -> { + System.out.println("Assembling Single"); + return single; + }); + + Single.just(1); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingle_whenSubscribed_shouldExecuteTheHook() { + + try { + RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { + System.out.println("Suscribing to Single"); + return observer; + }); + + Single.just(1) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setScheduleHandler((runnable) -> { + System.out.println("Executing Scheduler"); + return runnable; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { + System.out.println("Initializing Computation Scheduler"); + return scheduler.call(); + }); + RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { + System.out.println("Executing Computation Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { + System.out.println("Initializing IO Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { + System.out.println("Executing IO Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.io()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { + System.out.println("Initializing newThread Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { + System.out.println("Executing newThread Scheduler"); + return scheduler; + }); + + Observable.range(1, 15) + .map(v -> v * 2) + .subscribeOn(Schedulers.newThread()) + .test(); + + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { + System.out.println("Initializing Single Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { + System.out.println("Executing Single Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenError_shouldExecuteTheHook() { + RxJavaPlugins.setErrorHandler(throwable -> { + System.out.println("Handling error" + throwable.getCause()); + }); + + Observable.error(new IllegalStateException()) + .subscribe(); + } +} From 9f2f10dfa2a9705ba052b21091e78bc771349252 Mon Sep 17 00:00:00 2001 From: TINO Date: Mon, 11 Mar 2019 22:23:46 +0300 Subject: [PATCH 2/3] BAEL - 1060 Review comments incorporated --- .../baeldung/rxjava/RxJavaHooksUnitTest.java | 410 +++++++++--------- 1 file changed, 193 insertions(+), 217 deletions(-) diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java index e31838448b..79b80f71ab 100644 --- a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java @@ -1,5 +1,8 @@ package com.baeldung.rxjava; +import static org.junit.Assert.assertTrue; + +import org.junit.After; import org.junit.Test; import io.reactivex.Completable; @@ -14,316 +17,289 @@ import io.reactivex.schedulers.Schedulers; public class RxJavaHooksUnitTest { + private boolean initHookCalled = false; + private boolean hookCalled = false; + + @Test + public void givenObservable_whenError_shouldExecuteTheHook() { + RxJavaPlugins.setErrorHandler(throwable -> { + hookCalled = true; + }); + + Observable.error(new IllegalStateException()) + .subscribe(); + assertTrue(hookCalled); + } + @Test public void givenCompletable_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnCompletableAssembly(completable -> { - System.out.println("Assembling Completable"); - return completable; - }); - Completable.fromSingle(Single.just(1)); - } finally { - RxJavaPlugins.reset(); - } + + RxJavaPlugins.setOnCompletableAssembly(completable -> { + hookCalled = true; + return completable; + }); + Completable.fromSingle(Single.just(1)); + assertTrue(hookCalled); } @Test public void givenCompletable_whenSubscribed_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { - System.out.println("Subscribing to Completable"); - return observer; - }); - Completable.fromSingle(Single.just(1)) - .test(true); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { + hookCalled = true; + return observer; + }); + + Completable.fromSingle(Single.just(1)) + .test(); + assertTrue(hookCalled); } @Test public void givenObservable_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnObservableAssembly(observable -> { - System.out.println("Assembling Observable"); - return observable; - }); - Observable.range(1, 10); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnObservableAssembly(observable -> { + hookCalled = true; + return observable; + }); + + Observable.range(1, 10); + assertTrue(hookCalled); } @Test public void givenObservable_whenSubscribed_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { - System.out.println("Suscribing to Observable"); - return observer; - }); - Observable.range(1, 10) - .test(true); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { + hookCalled = true; + return observer; + }); + + Observable.range(1, 10) + .test(); + assertTrue(hookCalled); } @Test public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { - System.out.println("Assembling ConnectableObservable"); - return connectableObservable; - }); - ConnectableObservable.range(1, 10) - .publish() - .connect(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { + hookCalled = true; + return connectableObservable; + }); + + ConnectableObservable.range(1, 10) + .publish() + .connect(); + assertTrue(hookCalled); } @Test public void givenFlowable_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnFlowableAssembly(flowable -> { - System.out.println("Assembling Flowable"); - return flowable; - }); - Flowable.range(1, 10); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnFlowableAssembly(flowable -> { + hookCalled = true; + return flowable; + }); + + Flowable.range(1, 10); + assertTrue(hookCalled); } @Test public void givenFlowable_whenSubscribed_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { - System.out.println("Suscribing to Flowable"); - return observer; - }); - Flowable.range(1, 10) - .test(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { + hookCalled = true; + return observer; + }); + + Flowable.range(1, 10) + .test(); + assertTrue(hookCalled); } @Test public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { - System.out.println("Assembling ConnectableFlowable"); - return connectableFlowable; - }); - ConnectableFlowable.range(1, 10) - .publish() - .connect(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { + hookCalled = true; + return connectableFlowable; + }); + + ConnectableFlowable.range(1, 10) + .publish() + .connect(); + assertTrue(hookCalled); } @Test public void givenParallel_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { - System.out.println("Assembling ParallelFlowable"); - return parallelFlowable; - }); - Flowable.range(1, 10) - .parallel(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { + hookCalled = true; + return parallelFlowable; + }); + + Flowable.range(1, 10) + .parallel(); + assertTrue(hookCalled); } @Test public void givenMaybe_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnMaybeAssembly(maybe -> { - System.out.println("Assembling Maybe"); - return maybe; - }); - Maybe.just(1); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnMaybeAssembly(maybe -> { + hookCalled = true; + return maybe; + }); + + Maybe.just(1); + assertTrue(hookCalled); } @Test public void givenMaybe_whenSubscribed_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { - System.out.println("Suscribing to Maybe"); - return observer; - }); - Maybe.just(1) - .test(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { + hookCalled = true; + return observer; + }); + + Maybe.just(1) + .test(); + assertTrue(hookCalled); } @Test public void givenSingle_whenAssembled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnSingleAssembly(single -> { - System.out.println("Assembling Single"); - return single; - }); - Single.just(1); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setOnSingleAssembly(single -> { + hookCalled = true; + return single; + }); + + Single.just(1); + assertTrue(hookCalled); } @Test public void givenSingle_whenSubscribed_shouldExecuteTheHook() { - try { - RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { - System.out.println("Suscribing to Single"); - return observer; - }); + RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { + hookCalled = true; + return observer; + }); - Single.just(1) - .test(); - } finally { - RxJavaPlugins.reset(); - } + Single.just(1) + .test(); + assertTrue(hookCalled); } @Test public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setScheduleHandler((runnable) -> { - System.out.println("Executing Scheduler"); - return runnable; - }); - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.single()) - .test(); + RxJavaPlugins.setScheduleHandler((runnable) -> { + hookCalled = true; + return runnable; + }); - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.computation()) - .test(); - } finally { - RxJavaPlugins.reset(); - } + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + hookCalled = false; + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + assertTrue(hookCalled); } @Test public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() { - try { - RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { - System.out.println("Initializing Computation Scheduler"); - return scheduler.call(); - }); - RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { - System.out.println("Executing Computation Scheduler"); - return scheduler; - }); - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.computation()) - .test(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + assertTrue(hookCalled && initHookCalled); } @Test public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { - try { - RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { - System.out.println("Initializing IO Scheduler"); - return scheduler.call(); - }); - RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { - System.out.println("Executing IO Scheduler"); - return scheduler; - }); + RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.io()) - .test(); - } finally { - RxJavaPlugins.reset(); - } + RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.io()) + .test(); + assertTrue(hookCalled && initHookCalled); } @Test public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { - try { - RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { - System.out.println("Initializing newThread Scheduler"); - return scheduler.call(); - }); - RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { - System.out.println("Executing newThread Scheduler"); - return scheduler; - }); + RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); - Observable.range(1, 15) - .map(v -> v * 2) - .subscribeOn(Schedulers.newThread()) - .test(); + RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); - } finally { - RxJavaPlugins.reset(); - } + Observable.range(1, 15) + .map(v -> v * 2) + .subscribeOn(Schedulers.newThread()) + .test(); + assertTrue(hookCalled && initHookCalled); } @Test public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { - try { - RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { - System.out.println("Initializing Single Scheduler"); - return scheduler.call(); - }); - RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { - System.out.println("Executing Single Scheduler"); - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.single()) - .test(); - - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void givenObservable_whenError_shouldExecuteTheHook() { - RxJavaPlugins.setErrorHandler(throwable -> { - System.out.println("Handling error" + throwable.getCause()); + RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); }); - Observable.error(new IllegalStateException()) - .subscribe(); + RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + assertTrue(hookCalled && initHookCalled); + + } + + @After + public void reset() { + initHookCalled = false; + hookCalled = false; + RxJavaPlugins.reset(); } } From 22dea679618d7e7ba460f14290643ae7015c0985 Mon Sep 17 00:00:00 2001 From: TINO Date: Wed, 13 Mar 2019 20:07:48 +0300 Subject: [PATCH 3/3] BAEL - 1060 Review comments incorporated --- .../rxjava/RxJavaHooksManualTest.java | 84 +++++++++++++++++++ .../baeldung/rxjava/RxJavaHooksUnitTest.java | 61 -------------- 2 files changed, 84 insertions(+), 61 deletions(-) create mode 100644 rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java new file mode 100644 index 0000000000..b79fa9af22 --- /dev/null +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java @@ -0,0 +1,84 @@ +package com.baeldung.rxjava; + +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Test; + +import io.reactivex.Observable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class RxJavaHooksManualTest { + + private boolean initHookCalled = false; + private boolean hookCalled = false; + + @Test + public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { + + RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.io()) + .test(); + assertTrue(hookCalled && initHookCalled); + } + + @Test + public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { + + RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 15) + .map(v -> v * 2) + .subscribeOn(Schedulers.newThread()) + .test(); + assertTrue(hookCalled && initHookCalled); + } + + @Test + public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { + + RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + assertTrue(hookCalled && initHookCalled); + + } + + @After + public void reset() { + hookCalled = false; + initHookCalled = false; + RxJavaPlugins.reset(); + } +} diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java index 79b80f71ab..dd4287a4a9 100644 --- a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java @@ -235,67 +235,6 @@ public class RxJavaHooksUnitTest { assertTrue(hookCalled && initHookCalled); } - @Test - public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { - - RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.io()) - .test(); - assertTrue(hookCalled && initHookCalled); - } - - @Test - public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { - - RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 15) - .map(v -> v * 2) - .subscribeOn(Schedulers.newThread()) - .test(); - assertTrue(hookCalled && initHookCalled); - } - - @Test - public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { - - RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.single()) - .test(); - assertTrue(hookCalled && initHookCalled); - - } - @After public void reset() { initHookCalled = false;