Merge pull request #8130 from sjmillington/rxjava-move

[BAEL-16658] Split rxjava (& rxjava-2) by subject
This commit is contained in:
Josh Cummings 2019-11-05 21:26:10 -07:00 committed by GitHub
commit 046c1b3f00
65 changed files with 661 additions and 570 deletions

12
pom.xml
View File

@ -602,8 +602,10 @@
<!-- <module>rmi</module> --> <!-- Not a maven project -->
<module>rule-engines</module>
<module>rsocket</module>
<module>rxjava</module>
<module>rxjava-2</module>
<module>rxjava-core</module>
<module>rxjava-observables</module>
<module>rxjava-operators</module>
<module>rxjava-libraries</module>
<module>software-security/sql-injection-samples</module>
<module>tensorflow-java</module>
@ -1367,8 +1369,10 @@
<!-- <module>rmi</module> --> <!-- Not a maven project -->
<module>rule-engines</module>
<module>rsocket</module>
<module>rxjava</module>
<module>rxjava-2</module>
<module>rxjava-core</module>
<module>rxjava-observables</module>
<module>rxjava-operators</module>
<module>rxjava-libraries</module>
<module>oauth2-framework-impl</module>
<module>spf4j</module>
<module>spring-boot-performance</module>

View File

@ -1,14 +0,0 @@
## RxJava
This module contains articles about RxJava.
### Relevant articles:
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
- [RxJava 2 Flowable](https://www.baeldung.com/rxjava-2-flowable)
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay)
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
- More articles: [[<-- prev]](/rxjava)

View File

@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-2</artifactId>
<name>rxjava-2</name>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>${rxrelay.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>${rxjava2.ext.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.8.0</assertj.version>
<rx.java2.version>2.2.2</rx.java2.version>
<awaitility.version>1.7.0</awaitility.version>
<rxrelay.version>2.0.0</rxrelay.version>
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
</properties>
</project>

16
rxjava-core/README.md Normal file
View File

@ -0,0 +1,16 @@
## RxJava
This module contains articles about RxJava.
### Relevant articles:
- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure)
- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing)
- [Introduction to RxJava](https://www.baeldung.com/rx-java)
- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers)
- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap)
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
- More articles: [[next -->]](/rxjava-2)

46
rxjava-core/pom.xml Normal file
View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-core</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rxjava-core</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.8.0</assertj.version>
<rx.java.version>1.2.5</rx.java.version>
<awaitility.version>1.7.0</awaitility.version>
<rx.java2.version>2.2.2</rx.java2.version>
</properties>
</project>

View File

@ -1,84 +1,84 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksManualTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
hookCalled = false;
initHookCalled = false;
RxJavaPlugins.reset();
}
}
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksManualTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
hookCalled = false;
initHookCalled = false;
RxJavaPlugins.reset();
}
}

View File

@ -1,244 +1,244 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksUnitTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenObservable_whenError_shouldExecuteTheHook() {
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException())
.subscribe();
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1))
.test();
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10)
.parallel();
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
}
@Test
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
initHookCalled = false;
hookCalled = false;
RxJavaPlugins.reset();
}
}
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksUnitTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenObservable_whenError_shouldExecuteTheHook() {
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException())
.subscribe();
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1))
.test();
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10)
.parallel();
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
}
@Test
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
initHookCalled = false;
hookCalled = false;
RxJavaPlugins.reset();
}
}

View File

@ -0,0 +1,10 @@
## RxJava Libraries
This module contains articles about RxJava libraries
### Related Articles:
- [RxJava 2 Flowable](https://www.baeldung.com/rxjava-2-flowable)
- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay)
- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc)

59
rxjava-libraries/pom.xml Normal file
View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-libraries</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rxjava-libraries</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>${rxrelay.version}</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-jdbc</artifactId>
<version>${rx.java.jdbc.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
</dependencies>
<properties>
<rx.java.jdbc.version>0.7.11</rx.java.jdbc.version>
<rx.java.version>1.2.5</rx.java.version>
<rxrelay.version>2.0.0</rxrelay.version>
<rx.java2.version>2.2.2</rx.java2.version>
<assertj.version>3.8.0</assertj.version>
</properties>
</project>

View File

@ -0,0 +1,11 @@
## RxJava Observables
This module contains articles about RxJava Observables
### Related Articles:
- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables)
- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable)
- [RxJava StringObservable](https://www.baeldung.com/rxjava-string)
- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering)

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-observables</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rxjava-observables</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-string</artifactId>
<version>${rx.java.string.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
</dependencies>
<properties>
<rx.java.string.version>1.1.1</rx.java.string.version>
<rx.java.version>1.2.5</rx.java.version>
<assertj.version>3.8.0</assertj.version>
</properties>
</project>

View File

@ -0,0 +1,12 @@
## RxJava Operators
This module contains articles about RxJava Operators
### Related Articles:
- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math)
- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators)
- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators)
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)

View File

@ -1,11 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava</artifactId>
<artifactId>rxjava-operators</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rxjava</name>
<name>rxjava-operators</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
@ -19,48 +19,40 @@
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
<version>${rx.java.math.version}</version>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-string</artifactId>
<version>${rx.java.string.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-jdbc</artifactId>
<version>${rx.java.jdbc.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
<scope>runtime</scope>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>${rxjava2.ext.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
<version>${rx.java.math.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
</dependencies>
<properties>
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
<rx.java2.version>2.2.2</rx.java2.version>
<assertj.version>3.8.0</assertj.version>
<rx.java.version>1.2.5</rx.java.version>
<rx.java.jdbc.version>0.7.11</rx.java.jdbc.version>
<rx.java.math.version>1.0.0</rx.java.math.version>
<rx.java.string.version>1.1.1</rx.java.string.version>
<awaitility.version>1.7.0</awaitility.version>
</properties>

View File

@ -1,107 +1,107 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import hu.akarnokd.rxjava2.async.AsyncObservable;
import io.reactivex.Observable;
public class AsyncAndSyncToObservableIntegrationTest {
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
/* Method will execute every time it gets subscribed*/
@Test
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
}
/* Method will execute every time it gets subscribed*/
@Test
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
}
/*Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3);
}
exec.shutdown();
}
}
package com.baeldung.rxjava;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import hu.akarnokd.rxjava2.async.AsyncObservable;
import io.reactivex.Observable;
public class AsyncAndSyncToObservableIntegrationTest {
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
/* Method will execute every time it gets subscribed*/
@Test
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
}
/* Method will execute every time it gets subscribed*/
@Test
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
}
/*Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3);
}
exec.shutdown();
}
}

View File

@ -1,20 +0,0 @@
## RxJava
This module contains articles about RxJava.
### Relevant articles:
- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure)
- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing)
- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators)
- [Introduction to RxJava](https://www.baeldung.com/rx-java)
- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators)
- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc)
- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers)
- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math)
- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables)
- [RxJava StringObservable](https://www.baeldung.com/rxjava-string)
- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering)
- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable)
- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap)
- More articles: [[next -->]](/rxjava-2)

View File

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>