diff --git a/pom.xml b/pom.xml index f5776731c9..1cf3eac3ba 100644 --- a/pom.xml +++ b/pom.xml @@ -603,8 +603,10 @@ rule-engines rsocket - rxjava - rxjava-2 + rxjava-core + rxjava-observables + rxjava-operators + rxjava-libraries software-security/sql-injection-samples tensorflow-java @@ -1369,8 +1371,10 @@ rule-engines rsocket - rxjava - rxjava-2 + rxjava-core + rxjava-observables + rxjava-operators + rxjava-libraries oauth2-framework-impl spf4j spring-boot-performance diff --git a/rxjava-2/README.md b/rxjava-2/README.md deleted file mode 100644 index 87f606a323..0000000000 --- a/rxjava-2/README.md +++ /dev/null @@ -1,14 +0,0 @@ -## RxJava - -This module contains articles about RxJava. - -### Relevant articles: - -- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling) -- [RxJava 2 – Flowable](https://www.baeldung.com/rxjava-2-flowable) -- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe) -- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay) -- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable) -- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables) -- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks) -- More articles: [[<-- prev]](/rxjava) diff --git a/rxjava-2/pom.xml b/rxjava-2/pom.xml deleted file mode 100644 index 47d16ec8dd..0000000000 --- a/rxjava-2/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - 4.0.0 - rxjava-2 - rxjava-2 - 1.0-SNAPSHOT - - - com.baeldung - parent-java - 0.0.1-SNAPSHOT - ../parent-java - - - - - io.reactivex.rxjava2 - rxjava - ${rx.java2.version} - - - com.jayway.awaitility - awaitility - ${awaitility.version} - - - org.assertj - assertj-core - ${assertj.version} - - - com.jakewharton.rxrelay2 - rxrelay - ${rxrelay.version} - - - - com.github.akarnokd - rxjava2-extensions - ${rxjava2.ext.version} - - - - - 3.8.0 - 2.2.2 - 1.7.0 - 2.0.0 - 0.20.4 - - \ No newline at end of file diff --git a/rxjava-core/README.md b/rxjava-core/README.md new file mode 100644 index 0000000000..2773bd9423 --- /dev/null +++ b/rxjava-core/README.md @@ -0,0 +1,16 @@ +## RxJava + +This module contains articles about RxJava. + +### Relevant articles: + +- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure) +- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing) +- [Introduction to RxJava](https://www.baeldung.com/rx-java) +- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers) +- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap) +- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling) +- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe) +- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable) +- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks) +- More articles: [[next -->]](/rxjava-2) diff --git a/rxjava-core/pom.xml b/rxjava-core/pom.xml new file mode 100644 index 0000000000..401ad83808 --- /dev/null +++ b/rxjava-core/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + rxjava-core + 1.0-SNAPSHOT + rxjava-core + + + com.baeldung + parent-java + 0.0.1-SNAPSHOT + ../parent-java + + + + + io.reactivex + rxjava + ${rx.java.version} + + + io.reactivex.rxjava2 + rxjava + ${rx.java2.version} + + + com.jayway.awaitility + awaitility + ${awaitility.version} + + + org.assertj + assertj-core + ${assertj.version} + + + + + 3.8.0 + 1.2.5 + 1.7.0 + 2.2.2 + + + \ No newline at end of file diff --git a/rxjava/src/main/java/com/baeldung/rxjava/ComputeFunction.java b/rxjava-core/src/main/java/com/baeldung/rxjava/ComputeFunction.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/ComputeFunction.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/ComputeFunction.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/ConnectableObservableImpl.java b/rxjava-core/src/main/java/com/baeldung/rxjava/ConnectableObservableImpl.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/ConnectableObservableImpl.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/ConnectableObservableImpl.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/ObservableImpl.java b/rxjava-core/src/main/java/com/baeldung/rxjava/ObservableImpl.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/ObservableImpl.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/ObservableImpl.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/ResourceManagement.java b/rxjava-core/src/main/java/com/baeldung/rxjava/ResourceManagement.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/ResourceManagement.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/ResourceManagement.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/SingleImpl.java b/rxjava-core/src/main/java/com/baeldung/rxjava/SingleImpl.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/SingleImpl.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/SingleImpl.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/SubjectImpl.java b/rxjava-core/src/main/java/com/baeldung/rxjava/SubjectImpl.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/SubjectImpl.java rename to rxjava-core/src/main/java/com/baeldung/rxjava/SubjectImpl.java diff --git a/rxjava-2/src/main/resources/logback.xml b/rxjava-core/src/main/resources/logback.xml similarity index 100% rename from rxjava-2/src/main/resources/logback.xml rename to rxjava-core/src/main/resources/logback.xml diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/CompletableUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/CompletableUnitTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/CompletableUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/CompletableUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableIntegrationTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/ConnectableObservableIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableIntegrationTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/ConnectableObservableIntegrationTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/MaybeUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/MaybeUnitTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/MaybeUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/MaybeUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ObservableUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/ObservableUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/ObservableUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/ObservableUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/ResourceManagementUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/ResourceManagementUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureLongRunningUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaBackpressureLongRunningUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureLongRunningUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaBackpressureLongRunningUnitTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java similarity index 96% rename from rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java index b79fa9af22..f71d9e8839 100644 --- a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java +++ b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksManualTest.java @@ -1,84 +1,84 @@ -package com.baeldung.rxjava; - -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Test; - -import io.reactivex.Observable; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.Schedulers; - -public class RxJavaHooksManualTest { - - private boolean initHookCalled = false; - private boolean hookCalled = false; - - @Test - public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { - - RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.io()) - .test(); - assertTrue(hookCalled && initHookCalled); - } - - @Test - public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { - - RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 15) - .map(v -> v * 2) - .subscribeOn(Schedulers.newThread()) - .test(); - assertTrue(hookCalled && initHookCalled); - } - - @Test - public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { - - RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - - RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.single()) - .test(); - assertTrue(hookCalled && initHookCalled); - - } - - @After - public void reset() { - hookCalled = false; - initHookCalled = false; - RxJavaPlugins.reset(); - } -} +package com.baeldung.rxjava; + +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Test; + +import io.reactivex.Observable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class RxJavaHooksManualTest { + + private boolean initHookCalled = false; + private boolean hookCalled = false; + + @Test + public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { + + RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.io()) + .test(); + assertTrue(hookCalled && initHookCalled); + } + + @Test + public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { + + RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 15) + .map(v -> v * 2) + .subscribeOn(Schedulers.newThread()) + .test(); + assertTrue(hookCalled && initHookCalled); + } + + @Test + public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { + + RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + + RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + assertTrue(hookCalled && initHookCalled); + + } + + @After + public void reset() { + hookCalled = false; + initHookCalled = false; + RxJavaPlugins.reset(); + } +} diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java similarity index 96% rename from rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java index dd4287a4a9..cea6cc70f6 100644 --- a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java +++ b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java @@ -1,244 +1,244 @@ -package com.baeldung.rxjava; - -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Test; - -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; -import io.reactivex.flowables.ConnectableFlowable; -import io.reactivex.observables.ConnectableObservable; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.Schedulers; - -public class RxJavaHooksUnitTest { - - private boolean initHookCalled = false; - private boolean hookCalled = false; - - @Test - public void givenObservable_whenError_shouldExecuteTheHook() { - RxJavaPlugins.setErrorHandler(throwable -> { - hookCalled = true; - }); - - Observable.error(new IllegalStateException()) - .subscribe(); - assertTrue(hookCalled); - } - - @Test - public void givenCompletable_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnCompletableAssembly(completable -> { - hookCalled = true; - return completable; - }); - Completable.fromSingle(Single.just(1)); - assertTrue(hookCalled); - } - - @Test - public void givenCompletable_whenSubscribed_shouldExecuteTheHook() { - - RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { - hookCalled = true; - return observer; - }); - - Completable.fromSingle(Single.just(1)) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenObservable_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnObservableAssembly(observable -> { - hookCalled = true; - return observable; - }); - - Observable.range(1, 10); - assertTrue(hookCalled); - } - - @Test - public void givenObservable_whenSubscribed_shouldExecuteTheHook() { - - RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { - hookCalled = true; - return observer; - }); - - Observable.range(1, 10) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { - hookCalled = true; - return connectableObservable; - }); - - ConnectableObservable.range(1, 10) - .publish() - .connect(); - assertTrue(hookCalled); - } - - @Test - public void givenFlowable_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnFlowableAssembly(flowable -> { - hookCalled = true; - return flowable; - }); - - Flowable.range(1, 10); - assertTrue(hookCalled); - } - - @Test - public void givenFlowable_whenSubscribed_shouldExecuteTheHook() { - - RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { - hookCalled = true; - return observer; - }); - - Flowable.range(1, 10) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { - hookCalled = true; - return connectableFlowable; - }); - - ConnectableFlowable.range(1, 10) - .publish() - .connect(); - assertTrue(hookCalled); - } - - @Test - public void givenParallel_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { - hookCalled = true; - return parallelFlowable; - }); - - Flowable.range(1, 10) - .parallel(); - assertTrue(hookCalled); - } - - @Test - public void givenMaybe_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnMaybeAssembly(maybe -> { - hookCalled = true; - return maybe; - }); - - Maybe.just(1); - assertTrue(hookCalled); - } - - @Test - public void givenMaybe_whenSubscribed_shouldExecuteTheHook() { - - RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { - hookCalled = true; - return observer; - }); - - Maybe.just(1) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenSingle_whenAssembled_shouldExecuteTheHook() { - - RxJavaPlugins.setOnSingleAssembly(single -> { - hookCalled = true; - return single; - }); - - Single.just(1); - assertTrue(hookCalled); - } - - @Test - public void givenSingle_whenSubscribed_shouldExecuteTheHook() { - - RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { - hookCalled = true; - return observer; - }); - - Single.just(1) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() { - - RxJavaPlugins.setScheduleHandler((runnable) -> { - hookCalled = true; - return runnable; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.single()) - .test(); - hookCalled = false; - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.computation()) - .test(); - assertTrue(hookCalled); - } - - @Test - public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() { - - RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { - initHookCalled = true; - return scheduler.call(); - }); - RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { - hookCalled = true; - return scheduler; - }); - - Observable.range(1, 10) - .map(v -> v * 2) - .subscribeOn(Schedulers.computation()) - .test(); - assertTrue(hookCalled && initHookCalled); - } - - @After - public void reset() { - initHookCalled = false; - hookCalled = false; - RxJavaPlugins.reset(); - } -} +package com.baeldung.rxjava; + +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Test; + +import io.reactivex.Completable; +import io.reactivex.Flowable; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; +import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.observables.ConnectableObservable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class RxJavaHooksUnitTest { + + private boolean initHookCalled = false; + private boolean hookCalled = false; + + @Test + public void givenObservable_whenError_shouldExecuteTheHook() { + RxJavaPlugins.setErrorHandler(throwable -> { + hookCalled = true; + }); + + Observable.error(new IllegalStateException()) + .subscribe(); + assertTrue(hookCalled); + } + + @Test + public void givenCompletable_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnCompletableAssembly(completable -> { + hookCalled = true; + return completable; + }); + Completable.fromSingle(Single.just(1)); + assertTrue(hookCalled); + } + + @Test + public void givenCompletable_whenSubscribed_shouldExecuteTheHook() { + + RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { + hookCalled = true; + return observer; + }); + + Completable.fromSingle(Single.just(1)) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenObservable_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnObservableAssembly(observable -> { + hookCalled = true; + return observable; + }); + + Observable.range(1, 10); + assertTrue(hookCalled); + } + + @Test + public void givenObservable_whenSubscribed_shouldExecuteTheHook() { + + RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { + hookCalled = true; + return observer; + }); + + Observable.range(1, 10) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { + hookCalled = true; + return connectableObservable; + }); + + ConnectableObservable.range(1, 10) + .publish() + .connect(); + assertTrue(hookCalled); + } + + @Test + public void givenFlowable_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnFlowableAssembly(flowable -> { + hookCalled = true; + return flowable; + }); + + Flowable.range(1, 10); + assertTrue(hookCalled); + } + + @Test + public void givenFlowable_whenSubscribed_shouldExecuteTheHook() { + + RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { + hookCalled = true; + return observer; + }); + + Flowable.range(1, 10) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { + hookCalled = true; + return connectableFlowable; + }); + + ConnectableFlowable.range(1, 10) + .publish() + .connect(); + assertTrue(hookCalled); + } + + @Test + public void givenParallel_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { + hookCalled = true; + return parallelFlowable; + }); + + Flowable.range(1, 10) + .parallel(); + assertTrue(hookCalled); + } + + @Test + public void givenMaybe_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnMaybeAssembly(maybe -> { + hookCalled = true; + return maybe; + }); + + Maybe.just(1); + assertTrue(hookCalled); + } + + @Test + public void givenMaybe_whenSubscribed_shouldExecuteTheHook() { + + RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { + hookCalled = true; + return observer; + }); + + Maybe.just(1) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenSingle_whenAssembled_shouldExecuteTheHook() { + + RxJavaPlugins.setOnSingleAssembly(single -> { + hookCalled = true; + return single; + }); + + Single.just(1); + assertTrue(hookCalled); + } + + @Test + public void givenSingle_whenSubscribed_shouldExecuteTheHook() { + + RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { + hookCalled = true; + return observer; + }); + + Single.just(1) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() { + + RxJavaPlugins.setScheduleHandler((runnable) -> { + hookCalled = true; + return runnable; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + hookCalled = false; + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + assertTrue(hookCalled); + } + + @Test + public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() { + + RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { + initHookCalled = true; + return scheduler.call(); + }); + RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { + hookCalled = true; + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + assertTrue(hookCalled && initHookCalled); + } + + @After + public void reset() { + initHookCalled = false; + hookCalled = false; + RxJavaPlugins.reset(); + } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/RxJavaUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/RxJavaUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SchedulersLiveTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/SchedulersLiveTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/SchedulersLiveTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/SchedulersLiveTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SingleUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/SingleUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/SingleUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/SingleUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SubjectUnitTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/SubjectUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/SubjectUnitTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/SubjectUnitTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/onerror/ExceptionHandlingIntegrationTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/onerror/ExceptionHandlingIntegrationTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/onerror/ExceptionHandlingIntegrationTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/onerror/ExceptionHandlingIntegrationTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/onerror/OnErrorRetryIntegrationTest.java b/rxjava-core/src/test/java/com/baeldung/rxjava/onerror/OnErrorRetryIntegrationTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/onerror/OnErrorRetryIntegrationTest.java rename to rxjava-core/src/test/java/com/baeldung/rxjava/onerror/OnErrorRetryIntegrationTest.java diff --git a/rxjava-libraries/README.md b/rxjava-libraries/README.md new file mode 100644 index 0000000000..ac8aac6908 --- /dev/null +++ b/rxjava-libraries/README.md @@ -0,0 +1,10 @@ +## RxJava Libraries + + This module contains articles about RxJava libraries + +### Related Articles: + +- [RxJava 2 – Flowable](https://www.baeldung.com/rxjava-2-flowable) +- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay) +- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc) + diff --git a/rxjava-libraries/pom.xml b/rxjava-libraries/pom.xml new file mode 100644 index 0000000000..541d9116c8 --- /dev/null +++ b/rxjava-libraries/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + rxjava-libraries + 1.0-SNAPSHOT + rxjava-libraries + + + com.baeldung + parent-java + 0.0.1-SNAPSHOT + ../parent-java + + + + + io.reactivex + rxjava + ${rx.java.version} + + + io.reactivex.rxjava2 + rxjava + ${rx.java2.version} + + + com.jakewharton.rxrelay2 + rxrelay + ${rxrelay.version} + + + com.github.davidmoten + rxjava-jdbc + ${rx.java.jdbc.version} + + + com.h2database + h2 + ${h2.version} + runtime + + + org.assertj + assertj-core + ${assertj.version} + + + + + + 0.7.11 + 1.2.5 + 2.0.0 + 2.2.2 + 3.8.0 + + + \ No newline at end of file diff --git a/rxjava-2/src/main/java/com/baeldung/rxjava/RandomRelay.java b/rxjava-libraries/src/main/java/com/baeldung/rxjava/RandomRelay.java similarity index 100% rename from rxjava-2/src/main/java/com/baeldung/rxjava/RandomRelay.java rename to rxjava-libraries/src/main/java/com/baeldung/rxjava/RandomRelay.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/jdbc/Connector.java b/rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Connector.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/jdbc/Connector.java rename to rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Connector.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/jdbc/Employee.java b/rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Employee.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/jdbc/Employee.java rename to rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Employee.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/jdbc/Manager.java b/rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Manager.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/jdbc/Manager.java rename to rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Manager.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/jdbc/Utils.java b/rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Utils.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/jdbc/Utils.java rename to rxjava-libraries/src/main/java/com/baeldung/rxjava/jdbc/Utils.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/FlowableIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/FlowableIntegrationTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/FlowableIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/FlowableIntegrationTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxRelayIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/RxRelayIntegrationTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/RxRelayIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/RxRelayIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/AutomapClassIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/AutomapClassIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/AutomapClassIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/AutomapClassIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/AutomapInterfaceIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/AutomapInterfaceIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/AutomapInterfaceIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/AutomapInterfaceIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/BasicQueryTypesIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/BasicQueryTypesIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/BasicQueryTypesIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/BasicQueryTypesIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/InsertBlobIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/InsertBlobIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/InsertBlobIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/InsertBlobIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/InsertClobIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/InsertClobIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/InsertClobIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/InsertClobIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/ReturnKeysIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/ReturnKeysIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/ReturnKeysIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/ReturnKeysIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/jdbc/TransactionIntegrationTest.java b/rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/TransactionIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/jdbc/TransactionIntegrationTest.java rename to rxjava-libraries/src/test/java/com/baeldung/rxjava/jdbc/TransactionIntegrationTest.java diff --git a/rxjava/src/test/resources/actual_clob b/rxjava-libraries/src/test/resources/actual_clob similarity index 100% rename from rxjava/src/test/resources/actual_clob rename to rxjava-libraries/src/test/resources/actual_clob diff --git a/rxjava/src/test/resources/expected_clob b/rxjava-libraries/src/test/resources/expected_clob similarity index 100% rename from rxjava/src/test/resources/expected_clob rename to rxjava-libraries/src/test/resources/expected_clob diff --git a/rxjava-observables/README.md b/rxjava-observables/README.md new file mode 100644 index 0000000000..3bec990012 --- /dev/null +++ b/rxjava-observables/README.md @@ -0,0 +1,11 @@ +## RxJava Observables + + This module contains articles about RxJava Observables + +### Related Articles: + +- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables) +- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable) +- [RxJava StringObservable](https://www.baeldung.com/rxjava-string) +- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering) + diff --git a/rxjava-observables/pom.xml b/rxjava-observables/pom.xml new file mode 100644 index 0000000000..c2bf0bcd88 --- /dev/null +++ b/rxjava-observables/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + rxjava-observables + 1.0-SNAPSHOT + rxjava-observables + + + com.baeldung + parent-java + 0.0.1-SNAPSHOT + ../parent-java + + + + + io.reactivex + rxjava + ${rx.java.version} + + + io.reactivex + rxjava-string + ${rx.java.string.version} + + + org.assertj + assertj-core + ${assertj.version} + + + + + + 1.1.1 + 1.2.5 + 3.8.0 + + + \ No newline at end of file diff --git a/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java b/rxjava-observables/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java rename to rxjava-observables/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java b/rxjava-observables/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java rename to rxjava-observables/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java b/rxjava-observables/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java rename to rxjava-observables/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsIntegrationTest.java b/rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsIntegrationTest.java rename to rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsIntegrationTest.java b/rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsIntegrationTest.java rename to rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsIntegrationTest.java b/rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsIntegrationTest.java rename to rxjava-observables/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/operators/RxStringOperatorsUnitTest.java b/rxjava-observables/src/test/java/com/baeldung/rxjava/operators/RxStringOperatorsUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/operators/RxStringOperatorsUnitTest.java rename to rxjava-observables/src/test/java/com/baeldung/rxjava/operators/RxStringOperatorsUnitTest.java diff --git a/rxjava-operators/README.md b/rxjava-operators/README.md new file mode 100644 index 0000000000..293d310bbd --- /dev/null +++ b/rxjava-operators/README.md @@ -0,0 +1,12 @@ +## RxJava Operators + + This module contains articles about RxJava Operators + +### Related Articles: + +- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math) +- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators) +- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators) +- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables) + + diff --git a/rxjava/pom.xml b/rxjava-operators/pom.xml similarity index 66% rename from rxjava/pom.xml rename to rxjava-operators/pom.xml index 85106d1127..8064613f45 100644 --- a/rxjava/pom.xml +++ b/rxjava-operators/pom.xml @@ -1,11 +1,11 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - rxjava + rxjava-operators 1.0-SNAPSHOT - rxjava - + rxjava-operators + com.baeldung parent-java @@ -19,48 +19,40 @@ rxjava ${rx.java.version} - - io.reactivex - rxjava-math - ${rx.java.math.version} + io.reactivex.rxjava2 + rxjava + ${rx.java2.version} - + - io.reactivex - rxjava-string - ${rx.java.string.version} - - - - com.jayway.awaitility - awaitility - ${awaitility.version} - - - com.github.davidmoten - rxjava-jdbc - ${rx.java.jdbc.version} - - - com.h2database - h2 - ${h2.version} - runtime + com.github.akarnokd + rxjava2-extensions + ${rxjava2.ext.version} org.assertj assertj-core ${assertj.version} + + io.reactivex + rxjava-math + ${rx.java.math.version} + + + com.jayway.awaitility + awaitility + ${awaitility.version} + + 0.20.4 + 2.2.2 3.8.0 1.2.5 - 0.7.11 1.0.0 - 1.1.1 1.7.0 diff --git a/rxjava/src/main/java/com/baeldung/rxjava/operator/ToCleanString.java b/rxjava-operators/src/main/java/com/baeldung/rxjava/operator/ToCleanString.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/operator/ToCleanString.java rename to rxjava-operators/src/main/java/com/baeldung/rxjava/operator/ToCleanString.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/operator/ToLength.java b/rxjava-operators/src/main/java/com/baeldung/rxjava/operator/ToLength.java similarity index 100% rename from rxjava/src/main/java/com/baeldung/rxjava/operator/ToLength.java rename to rxjava-operators/src/main/java/com/baeldung/rxjava/operator/ToLength.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java similarity index 96% rename from rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java index 90f4fe94ae..2842fab80e 100644 --- a/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java +++ b/rxjava-operators/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java @@ -1,107 +1,107 @@ -package com.baeldung.rxjava; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import hu.akarnokd.rxjava2.async.AsyncObservable; -import io.reactivex.Observable; - -public class AsyncAndSyncToObservableIntegrationTest { - - AtomicInteger counter = new AtomicInteger(); - Callable callable = () -> counter.incrementAndGet(); - - /* Method will execute every time it gets subscribed*/ - @Test - public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() { - - Observable source = Observable.fromCallable(callable); - - for (int i = 1; i < 5; i++) { - source.test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(i); - - assertEquals(i, counter.get()); - } - } - - /* Method will execute only once and cache its result.*/ - @Test - public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() { - - Observable source = AsyncObservable.start(callable); - - for (int i = 1; i < 5; i++) { - source.test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - - assertEquals(1, counter.get()); - } - } - - /* Method will execute only once and cache its result.*/ - @Test - public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() { - - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(callable); - Observable source = Observable.fromFuture(future); - - for (int i = 1; i < 5; i++) { - source.test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - - assertEquals(1, counter.get()); - } - - executor.shutdown(); - } - - /* Method will execute every time it gets subscribed*/ - @Test - public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() { - - ExecutorService executor = Executors.newSingleThreadExecutor(); - Observable source = AsyncObservable.startFuture(() -> executor.submit(callable)); - - for (int i = 1; i < 5; i++) { - source.test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(i); - - assertEquals(i, counter.get()); - } - - executor.shutdown(); - } - - /*Method will execute only once and cache its result.*/ - @Test - public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { - List list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() }); - ExecutorService exec = Executors.newSingleThreadExecutor(); - Callable> callable = () -> Observable.fromIterable(list); - Observable source = AsyncObservable.deferFuture(() -> exec.submit(callable)); - for (int i = 1; i < 4; i++) { - source.test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1, 2, 3); - } - - exec.shutdown(); - } - -} +package com.baeldung.rxjava; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import hu.akarnokd.rxjava2.async.AsyncObservable; +import io.reactivex.Observable; + +public class AsyncAndSyncToObservableIntegrationTest { + + AtomicInteger counter = new AtomicInteger(); + Callable callable = () -> counter.incrementAndGet(); + + /* Method will execute every time it gets subscribed*/ + @Test + public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() { + + Observable source = Observable.fromCallable(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + } + + /* Method will execute only once and cache its result.*/ + @Test + public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() { + + Observable source = AsyncObservable.start(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + } + + /* Method will execute only once and cache its result.*/ + @Test + public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() { + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(callable); + Observable source = Observable.fromFuture(future); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + + executor.shutdown(); + } + + /* Method will execute every time it gets subscribed*/ + @Test + public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() { + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Observable source = AsyncObservable.startFuture(() -> executor.submit(callable)); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + + executor.shutdown(); + } + + /*Method will execute only once and cache its result.*/ + @Test + public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { + List list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() }); + ExecutorService exec = Executors.newSingleThreadExecutor(); + Callable> callable = () -> Observable.fromIterable(list); + Observable source = AsyncObservable.deferFuture(() -> exec.submit(callable)); + for (int i = 1; i < 4; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + exec.shutdown(); + } + +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaCustomOperatorUnitTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/RxJavaCustomOperatorUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/RxJavaCustomOperatorUnitTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/RxJavaCustomOperatorUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/UtilityOperatorsIntegrationTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/UtilityOperatorsIntegrationTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/UtilityOperatorsIntegrationTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/UtilityOperatorsIntegrationTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/operators/RxAggregateOperatorsUnitTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxAggregateOperatorsUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/operators/RxAggregateOperatorsUnitTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxAggregateOperatorsUnitTest.java diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/operators/RxFlatmapAndSwitchmapUnitTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxFlatmapAndSwitchmapUnitTest.java similarity index 100% rename from rxjava-2/src/test/java/com/baeldung/rxjava/operators/RxFlatmapAndSwitchmapUnitTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxFlatmapAndSwitchmapUnitTest.java diff --git a/rxjava/src/test/java/com/baeldung/rxjava/operators/RxMathematicalOperatorsUnitTest.java b/rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxMathematicalOperatorsUnitTest.java similarity index 100% rename from rxjava/src/test/java/com/baeldung/rxjava/operators/RxMathematicalOperatorsUnitTest.java rename to rxjava-operators/src/test/java/com/baeldung/rxjava/operators/RxMathematicalOperatorsUnitTest.java diff --git a/rxjava/README.md b/rxjava/README.md deleted file mode 100644 index c9308ddcc6..0000000000 --- a/rxjava/README.md +++ /dev/null @@ -1,20 +0,0 @@ -## RxJava - -This module contains articles about RxJava. - -### Relevant articles: - -- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure) -- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing) -- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators) -- [Introduction to RxJava](https://www.baeldung.com/rx-java) -- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators) -- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc) -- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers) -- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math) -- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables) -- [RxJava StringObservable](https://www.baeldung.com/rxjava-string) -- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering) -- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable) -- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap) -- More articles: [[next -->]](/rxjava-2) diff --git a/rxjava/src/main/resources/logback.xml b/rxjava/src/main/resources/logback.xml deleted file mode 100644 index 7d900d8ea8..0000000000 --- a/rxjava/src/main/resources/logback.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - \ No newline at end of file