[BAEL-16658] Split rxjava (& rxjava-2) by subject
This commit is contained in:
parent
d47ea19834
commit
596809a9ce
12
pom.xml
12
pom.xml
|
@ -603,8 +603,10 @@
|
||||||
<!-- <module>rmi</module> --> <!-- Not a maven project -->
|
<!-- <module>rmi</module> --> <!-- Not a maven project -->
|
||||||
<module>rule-engines</module>
|
<module>rule-engines</module>
|
||||||
<module>rsocket</module>
|
<module>rsocket</module>
|
||||||
<module>rxjava</module>
|
<module>rxjava-core</module>
|
||||||
<module>rxjava-2</module>
|
<module>rxjava-observables</module>
|
||||||
|
<module>rxjava-operators</module>
|
||||||
|
<module>rxjava-libraries</module>
|
||||||
<module>software-security/sql-injection-samples</module>
|
<module>software-security/sql-injection-samples</module>
|
||||||
|
|
||||||
<module>tensorflow-java</module>
|
<module>tensorflow-java</module>
|
||||||
|
@ -1369,8 +1371,10 @@
|
||||||
<!-- <module>rmi</module> --> <!-- Not a maven project -->
|
<!-- <module>rmi</module> --> <!-- Not a maven project -->
|
||||||
<module>rule-engines</module>
|
<module>rule-engines</module>
|
||||||
<module>rsocket</module>
|
<module>rsocket</module>
|
||||||
<module>rxjava</module>
|
<module>rxjava-core</module>
|
||||||
<module>rxjava-2</module>
|
<module>rxjava-observables</module>
|
||||||
|
<module>rxjava-operators</module>
|
||||||
|
<module>rxjava-libraries</module>
|
||||||
<module>oauth2-framework-impl</module>
|
<module>oauth2-framework-impl</module>
|
||||||
<module>spf4j</module>
|
<module>spf4j</module>
|
||||||
<module>spring-boot-performance</module>
|
<module>spring-boot-performance</module>
|
||||||
|
|
|
@ -1,14 +0,0 @@
|
||||||
## RxJava
|
|
||||||
|
|
||||||
This module contains articles about RxJava.
|
|
||||||
|
|
||||||
### Relevant articles:
|
|
||||||
|
|
||||||
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
|
|
||||||
- [RxJava 2 – Flowable](https://www.baeldung.com/rxjava-2-flowable)
|
|
||||||
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
|
|
||||||
- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay)
|
|
||||||
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
|
|
||||||
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)
|
|
||||||
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
|
|
||||||
- More articles: [[<-- prev]](/rxjava)
|
|
|
@ -1,53 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
<artifactId>rxjava-2</artifactId>
|
|
||||||
<name>rxjava-2</name>
|
|
||||||
<version>1.0-SNAPSHOT</version>
|
|
||||||
|
|
||||||
<parent>
|
|
||||||
<groupId>com.baeldung</groupId>
|
|
||||||
<artifactId>parent-java</artifactId>
|
|
||||||
<version>0.0.1-SNAPSHOT</version>
|
|
||||||
<relativePath>../parent-java</relativePath>
|
|
||||||
</parent>
|
|
||||||
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>io.reactivex.rxjava2</groupId>
|
|
||||||
<artifactId>rxjava</artifactId>
|
|
||||||
<version>${rx.java2.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.jayway.awaitility</groupId>
|
|
||||||
<artifactId>awaitility</artifactId>
|
|
||||||
<version>${awaitility.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.assertj</groupId>
|
|
||||||
<artifactId>assertj-core</artifactId>
|
|
||||||
<version>${assertj.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.jakewharton.rxrelay2</groupId>
|
|
||||||
<artifactId>rxrelay</artifactId>
|
|
||||||
<version>${rxrelay.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.github.akarnokd</groupId>
|
|
||||||
<artifactId>rxjava2-extensions</artifactId>
|
|
||||||
<version>${rxjava2.ext.version}</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
<properties>
|
|
||||||
<assertj.version>3.8.0</assertj.version>
|
|
||||||
<rx.java2.version>2.2.2</rx.java2.version>
|
|
||||||
<awaitility.version>1.7.0</awaitility.version>
|
|
||||||
<rxrelay.version>2.0.0</rxrelay.version>
|
|
||||||
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
|
|
||||||
</properties>
|
|
||||||
</project>
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
## RxJava
|
||||||
|
|
||||||
|
This module contains articles about RxJava.
|
||||||
|
|
||||||
|
### Relevant articles:
|
||||||
|
|
||||||
|
- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure)
|
||||||
|
- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing)
|
||||||
|
- [Introduction to RxJava](https://www.baeldung.com/rx-java)
|
||||||
|
- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers)
|
||||||
|
- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap)
|
||||||
|
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
|
||||||
|
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
|
||||||
|
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
|
||||||
|
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
|
||||||
|
- More articles: [[next -->]](/rxjava-2)
|
|
@ -0,0 +1,46 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>rxjava-core</artifactId>
|
||||||
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
<name>rxjava-core</name>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-java</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../parent-java</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>${rx.java.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex.rxjava2</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>${rx.java2.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.jayway.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<version>${awaitility.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.assertj</groupId>
|
||||||
|
<artifactId>assertj-core</artifactId>
|
||||||
|
<version>${assertj.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<assertj.version>3.8.0</assertj.version>
|
||||||
|
<rx.java.version>1.2.5</rx.java.version>
|
||||||
|
<awaitility.version>1.7.0</awaitility.version>
|
||||||
|
<rx.java2.version>2.2.2</rx.java2.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -1,84 +1,84 @@
|
||||||
package com.baeldung.rxjava;
|
package com.baeldung.rxjava;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import io.reactivex.Observable;
|
import io.reactivex.Observable;
|
||||||
import io.reactivex.plugins.RxJavaPlugins;
|
import io.reactivex.plugins.RxJavaPlugins;
|
||||||
import io.reactivex.schedulers.Schedulers;
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
|
||||||
public class RxJavaHooksManualTest {
|
public class RxJavaHooksManualTest {
|
||||||
|
|
||||||
private boolean initHookCalled = false;
|
private boolean initHookCalled = false;
|
||||||
private boolean hookCalled = false;
|
private boolean hookCalled = false;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
|
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
|
||||||
|
|
||||||
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
|
||||||
initHookCalled = true;
|
initHookCalled = true;
|
||||||
return scheduler.call();
|
return scheduler.call();
|
||||||
});
|
});
|
||||||
|
|
||||||
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return scheduler;
|
return scheduler;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.io())
|
.subscribeOn(Schedulers.io())
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled && initHookCalled);
|
assertTrue(hookCalled && initHookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
|
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
|
||||||
initHookCalled = true;
|
initHookCalled = true;
|
||||||
return scheduler.call();
|
return scheduler.call();
|
||||||
});
|
});
|
||||||
|
|
||||||
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return scheduler;
|
return scheduler;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 15)
|
Observable.range(1, 15)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.newThread())
|
.subscribeOn(Schedulers.newThread())
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled && initHookCalled);
|
assertTrue(hookCalled && initHookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
|
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
|
||||||
|
|
||||||
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
|
||||||
initHookCalled = true;
|
initHookCalled = true;
|
||||||
return scheduler.call();
|
return scheduler.call();
|
||||||
});
|
});
|
||||||
|
|
||||||
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return scheduler;
|
return scheduler;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.single())
|
.subscribeOn(Schedulers.single())
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled && initHookCalled);
|
assertTrue(hookCalled && initHookCalled);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void reset() {
|
public void reset() {
|
||||||
hookCalled = false;
|
hookCalled = false;
|
||||||
initHookCalled = false;
|
initHookCalled = false;
|
||||||
RxJavaPlugins.reset();
|
RxJavaPlugins.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,244 +1,244 @@
|
||||||
package com.baeldung.rxjava;
|
package com.baeldung.rxjava;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import io.reactivex.Completable;
|
import io.reactivex.Completable;
|
||||||
import io.reactivex.Flowable;
|
import io.reactivex.Flowable;
|
||||||
import io.reactivex.Maybe;
|
import io.reactivex.Maybe;
|
||||||
import io.reactivex.Observable;
|
import io.reactivex.Observable;
|
||||||
import io.reactivex.Single;
|
import io.reactivex.Single;
|
||||||
import io.reactivex.flowables.ConnectableFlowable;
|
import io.reactivex.flowables.ConnectableFlowable;
|
||||||
import io.reactivex.observables.ConnectableObservable;
|
import io.reactivex.observables.ConnectableObservable;
|
||||||
import io.reactivex.plugins.RxJavaPlugins;
|
import io.reactivex.plugins.RxJavaPlugins;
|
||||||
import io.reactivex.schedulers.Schedulers;
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
|
||||||
public class RxJavaHooksUnitTest {
|
public class RxJavaHooksUnitTest {
|
||||||
|
|
||||||
private boolean initHookCalled = false;
|
private boolean initHookCalled = false;
|
||||||
private boolean hookCalled = false;
|
private boolean hookCalled = false;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenObservable_whenError_shouldExecuteTheHook() {
|
public void givenObservable_whenError_shouldExecuteTheHook() {
|
||||||
RxJavaPlugins.setErrorHandler(throwable -> {
|
RxJavaPlugins.setErrorHandler(throwable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.error(new IllegalStateException())
|
Observable.error(new IllegalStateException())
|
||||||
.subscribe();
|
.subscribe();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
|
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnCompletableAssembly(completable -> {
|
RxJavaPlugins.setOnCompletableAssembly(completable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return completable;
|
return completable;
|
||||||
});
|
});
|
||||||
Completable.fromSingle(Single.just(1));
|
Completable.fromSingle(Single.just(1));
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
|
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
|
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observer;
|
return observer;
|
||||||
});
|
});
|
||||||
|
|
||||||
Completable.fromSingle(Single.just(1))
|
Completable.fromSingle(Single.just(1))
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
|
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnObservableAssembly(observable -> {
|
RxJavaPlugins.setOnObservableAssembly(observable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observable;
|
return observable;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10);
|
Observable.range(1, 10);
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
|
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
|
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observer;
|
return observer;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
|
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
|
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return connectableObservable;
|
return connectableObservable;
|
||||||
});
|
});
|
||||||
|
|
||||||
ConnectableObservable.range(1, 10)
|
ConnectableObservable.range(1, 10)
|
||||||
.publish()
|
.publish()
|
||||||
.connect();
|
.connect();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
|
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
|
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return flowable;
|
return flowable;
|
||||||
});
|
});
|
||||||
|
|
||||||
Flowable.range(1, 10);
|
Flowable.range(1, 10);
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
|
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
|
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observer;
|
return observer;
|
||||||
});
|
});
|
||||||
|
|
||||||
Flowable.range(1, 10)
|
Flowable.range(1, 10)
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
|
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
|
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return connectableFlowable;
|
return connectableFlowable;
|
||||||
});
|
});
|
||||||
|
|
||||||
ConnectableFlowable.range(1, 10)
|
ConnectableFlowable.range(1, 10)
|
||||||
.publish()
|
.publish()
|
||||||
.connect();
|
.connect();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
|
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
|
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return parallelFlowable;
|
return parallelFlowable;
|
||||||
});
|
});
|
||||||
|
|
||||||
Flowable.range(1, 10)
|
Flowable.range(1, 10)
|
||||||
.parallel();
|
.parallel();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
|
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
|
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return maybe;
|
return maybe;
|
||||||
});
|
});
|
||||||
|
|
||||||
Maybe.just(1);
|
Maybe.just(1);
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
|
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
|
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observer;
|
return observer;
|
||||||
});
|
});
|
||||||
|
|
||||||
Maybe.just(1)
|
Maybe.just(1)
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
|
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnSingleAssembly(single -> {
|
RxJavaPlugins.setOnSingleAssembly(single -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return single;
|
return single;
|
||||||
});
|
});
|
||||||
|
|
||||||
Single.just(1);
|
Single.just(1);
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
|
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
|
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return observer;
|
return observer;
|
||||||
});
|
});
|
||||||
|
|
||||||
Single.just(1)
|
Single.just(1)
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
|
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
|
||||||
|
|
||||||
RxJavaPlugins.setScheduleHandler((runnable) -> {
|
RxJavaPlugins.setScheduleHandler((runnable) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return runnable;
|
return runnable;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.single())
|
.subscribeOn(Schedulers.single())
|
||||||
.test();
|
.test();
|
||||||
hookCalled = false;
|
hookCalled = false;
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.computation())
|
.subscribeOn(Schedulers.computation())
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled);
|
assertTrue(hookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
|
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
|
||||||
|
|
||||||
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
|
||||||
initHookCalled = true;
|
initHookCalled = true;
|
||||||
return scheduler.call();
|
return scheduler.call();
|
||||||
});
|
});
|
||||||
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
|
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
|
||||||
hookCalled = true;
|
hookCalled = true;
|
||||||
return scheduler;
|
return scheduler;
|
||||||
});
|
});
|
||||||
|
|
||||||
Observable.range(1, 10)
|
Observable.range(1, 10)
|
||||||
.map(v -> v * 2)
|
.map(v -> v * 2)
|
||||||
.subscribeOn(Schedulers.computation())
|
.subscribeOn(Schedulers.computation())
|
||||||
.test();
|
.test();
|
||||||
assertTrue(hookCalled && initHookCalled);
|
assertTrue(hookCalled && initHookCalled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void reset() {
|
public void reset() {
|
||||||
initHookCalled = false;
|
initHookCalled = false;
|
||||||
hookCalled = false;
|
hookCalled = false;
|
||||||
RxJavaPlugins.reset();
|
RxJavaPlugins.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
## RxJava Libraries
|
||||||
|
|
||||||
|
This module contains articles about RxJava libraries
|
||||||
|
|
||||||
|
### Related Articles:
|
||||||
|
|
||||||
|
- [RxJava 2 – Flowable](https://www.baeldung.com/rxjava-2-flowable)
|
||||||
|
- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay)
|
||||||
|
- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc)
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>rxjava-libraries</artifactId>
|
||||||
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
<name>rxjava-libraries</name>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-java</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../parent-java</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>${rx.java.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex.rxjava2</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>${rx.java2.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.jakewharton.rxrelay2</groupId>
|
||||||
|
<artifactId>rxrelay</artifactId>
|
||||||
|
<version>${rxrelay.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.davidmoten</groupId>
|
||||||
|
<artifactId>rxjava-jdbc</artifactId>
|
||||||
|
<version>${rx.java.jdbc.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.h2database</groupId>
|
||||||
|
<artifactId>h2</artifactId>
|
||||||
|
<version>${h2.version}</version>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.assertj</groupId>
|
||||||
|
<artifactId>assertj-core</artifactId>
|
||||||
|
<version>${assertj.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<rx.java.jdbc.version>0.7.11</rx.java.jdbc.version>
|
||||||
|
<rx.java.version>1.2.5</rx.java.version>
|
||||||
|
<rxrelay.version>2.0.0</rxrelay.version>
|
||||||
|
<rx.java2.version>2.2.2</rx.java2.version>
|
||||||
|
<assertj.version>3.8.0</assertj.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,11 @@
|
||||||
|
## RxJava Observables
|
||||||
|
|
||||||
|
This module contains articles about RxJava Observables
|
||||||
|
|
||||||
|
### Related Articles:
|
||||||
|
|
||||||
|
- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables)
|
||||||
|
- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable)
|
||||||
|
- [RxJava StringObservable](https://www.baeldung.com/rxjava-string)
|
||||||
|
- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering)
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>rxjava-observables</artifactId>
|
||||||
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
<name>rxjava-observables</name>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-java</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../parent-java</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>${rx.java.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex</groupId>
|
||||||
|
<artifactId>rxjava-string</artifactId>
|
||||||
|
<version>${rx.java.string.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.assertj</groupId>
|
||||||
|
<artifactId>assertj-core</artifactId>
|
||||||
|
<version>${assertj.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<rx.java.string.version>1.1.1</rx.java.string.version>
|
||||||
|
<rx.java.version>1.2.5</rx.java.version>
|
||||||
|
<assertj.version>3.8.0</assertj.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,12 @@
|
||||||
|
## RxJava Operators
|
||||||
|
|
||||||
|
This module contains articles about RxJava Operators
|
||||||
|
|
||||||
|
### Related Articles:
|
||||||
|
|
||||||
|
- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math)
|
||||||
|
- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators)
|
||||||
|
- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators)
|
||||||
|
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>rxjava</artifactId>
|
<artifactId>rxjava-operators</artifactId>
|
||||||
<version>1.0-SNAPSHOT</version>
|
<version>1.0-SNAPSHOT</version>
|
||||||
<name>rxjava</name>
|
<name>rxjava-operators</name>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>com.baeldung</groupId>
|
<groupId>com.baeldung</groupId>
|
||||||
<artifactId>parent-java</artifactId>
|
<artifactId>parent-java</artifactId>
|
||||||
|
@ -19,48 +19,40 @@
|
||||||
<artifactId>rxjava</artifactId>
|
<artifactId>rxjava</artifactId>
|
||||||
<version>${rx.java.version}</version>
|
<version>${rx.java.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.reactivex</groupId>
|
<groupId>io.reactivex.rxjava2</groupId>
|
||||||
<artifactId>rxjava-math</artifactId>
|
<artifactId>rxjava</artifactId>
|
||||||
<version>${rx.java.math.version}</version>
|
<version>${rx.java2.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.reactivex</groupId>
|
<groupId>com.github.akarnokd</groupId>
|
||||||
<artifactId>rxjava-string</artifactId>
|
<artifactId>rxjava2-extensions</artifactId>
|
||||||
<version>${rx.java.string.version}</version>
|
<version>${rxjava2.ext.version}</version>
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.jayway.awaitility</groupId>
|
|
||||||
<artifactId>awaitility</artifactId>
|
|
||||||
<version>${awaitility.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.github.davidmoten</groupId>
|
|
||||||
<artifactId>rxjava-jdbc</artifactId>
|
|
||||||
<version>${rx.java.jdbc.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.h2database</groupId>
|
|
||||||
<artifactId>h2</artifactId>
|
|
||||||
<version>${h2.version}</version>
|
|
||||||
<scope>runtime</scope>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.assertj</groupId>
|
<groupId>org.assertj</groupId>
|
||||||
<artifactId>assertj-core</artifactId>
|
<artifactId>assertj-core</artifactId>
|
||||||
<version>${assertj.version}</version>
|
<version>${assertj.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex</groupId>
|
||||||
|
<artifactId>rxjava-math</artifactId>
|
||||||
|
<version>${rx.java.math.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.jayway.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<version>${awaitility.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
|
||||||
|
<rx.java2.version>2.2.2</rx.java2.version>
|
||||||
<assertj.version>3.8.0</assertj.version>
|
<assertj.version>3.8.0</assertj.version>
|
||||||
<rx.java.version>1.2.5</rx.java.version>
|
<rx.java.version>1.2.5</rx.java.version>
|
||||||
<rx.java.jdbc.version>0.7.11</rx.java.jdbc.version>
|
|
||||||
<rx.java.math.version>1.0.0</rx.java.math.version>
|
<rx.java.math.version>1.0.0</rx.java.math.version>
|
||||||
<rx.java.string.version>1.1.1</rx.java.string.version>
|
|
||||||
<awaitility.version>1.7.0</awaitility.version>
|
<awaitility.version>1.7.0</awaitility.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
|
@ -1,107 +1,107 @@
|
||||||
package com.baeldung.rxjava;
|
package com.baeldung.rxjava;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import hu.akarnokd.rxjava2.async.AsyncObservable;
|
import hu.akarnokd.rxjava2.async.AsyncObservable;
|
||||||
import io.reactivex.Observable;
|
import io.reactivex.Observable;
|
||||||
|
|
||||||
public class AsyncAndSyncToObservableIntegrationTest {
|
public class AsyncAndSyncToObservableIntegrationTest {
|
||||||
|
|
||||||
AtomicInteger counter = new AtomicInteger();
|
AtomicInteger counter = new AtomicInteger();
|
||||||
Callable<Integer> callable = () -> counter.incrementAndGet();
|
Callable<Integer> callable = () -> counter.incrementAndGet();
|
||||||
|
|
||||||
/* Method will execute every time it gets subscribed*/
|
/* Method will execute every time it gets subscribed*/
|
||||||
@Test
|
@Test
|
||||||
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
|
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
|
||||||
|
|
||||||
Observable<Integer> source = Observable.fromCallable(callable);
|
Observable<Integer> source = Observable.fromCallable(callable);
|
||||||
|
|
||||||
for (int i = 1; i < 5; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
source.test()
|
source.test()
|
||||||
.awaitDone(5, TimeUnit.SECONDS)
|
.awaitDone(5, TimeUnit.SECONDS)
|
||||||
.assertResult(i);
|
.assertResult(i);
|
||||||
|
|
||||||
assertEquals(i, counter.get());
|
assertEquals(i, counter.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Method will execute only once and cache its result.*/
|
/* Method will execute only once and cache its result.*/
|
||||||
@Test
|
@Test
|
||||||
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
|
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
|
||||||
|
|
||||||
Observable<Integer> source = AsyncObservable.start(callable);
|
Observable<Integer> source = AsyncObservable.start(callable);
|
||||||
|
|
||||||
for (int i = 1; i < 5; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
source.test()
|
source.test()
|
||||||
.awaitDone(5, TimeUnit.SECONDS)
|
.awaitDone(5, TimeUnit.SECONDS)
|
||||||
.assertResult(1);
|
.assertResult(1);
|
||||||
|
|
||||||
assertEquals(1, counter.get());
|
assertEquals(1, counter.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Method will execute only once and cache its result.*/
|
/* Method will execute only once and cache its result.*/
|
||||||
@Test
|
@Test
|
||||||
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
|
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
|
||||||
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
Future<Integer> future = executor.submit(callable);
|
Future<Integer> future = executor.submit(callable);
|
||||||
Observable<Integer> source = Observable.fromFuture(future);
|
Observable<Integer> source = Observable.fromFuture(future);
|
||||||
|
|
||||||
for (int i = 1; i < 5; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
source.test()
|
source.test()
|
||||||
.awaitDone(5, TimeUnit.SECONDS)
|
.awaitDone(5, TimeUnit.SECONDS)
|
||||||
.assertResult(1);
|
.assertResult(1);
|
||||||
|
|
||||||
assertEquals(1, counter.get());
|
assertEquals(1, counter.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Method will execute every time it gets subscribed*/
|
/* Method will execute every time it gets subscribed*/
|
||||||
@Test
|
@Test
|
||||||
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
|
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
|
||||||
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
|
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
|
||||||
|
|
||||||
for (int i = 1; i < 5; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
source.test()
|
source.test()
|
||||||
.awaitDone(5, TimeUnit.SECONDS)
|
.awaitDone(5, TimeUnit.SECONDS)
|
||||||
.assertResult(i);
|
.assertResult(i);
|
||||||
|
|
||||||
assertEquals(i, counter.get());
|
assertEquals(i, counter.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*Method will execute only once and cache its result.*/
|
/*Method will execute only once and cache its result.*/
|
||||||
@Test
|
@Test
|
||||||
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
|
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
|
||||||
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
|
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
|
||||||
ExecutorService exec = Executors.newSingleThreadExecutor();
|
ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||||
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
|
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
|
||||||
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
|
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
|
||||||
for (int i = 1; i < 4; i++) {
|
for (int i = 1; i < 4; i++) {
|
||||||
source.test()
|
source.test()
|
||||||
.awaitDone(5, TimeUnit.SECONDS)
|
.awaitDone(5, TimeUnit.SECONDS)
|
||||||
.assertResult(1, 2, 3);
|
.assertResult(1, 2, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.shutdown();
|
exec.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,20 +0,0 @@
|
||||||
## RxJava
|
|
||||||
|
|
||||||
This module contains articles about RxJava.
|
|
||||||
|
|
||||||
### Relevant articles:
|
|
||||||
|
|
||||||
- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure)
|
|
||||||
- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing)
|
|
||||||
- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators)
|
|
||||||
- [Introduction to RxJava](https://www.baeldung.com/rx-java)
|
|
||||||
- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators)
|
|
||||||
- [Introduction to rxjava-jdbc](https://www.baeldung.com/rxjava-jdbc)
|
|
||||||
- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers)
|
|
||||||
- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math)
|
|
||||||
- [Combining Observables in RxJava](https://www.baeldung.com/rxjava-combine-observables)
|
|
||||||
- [RxJava StringObservable](https://www.baeldung.com/rxjava-string)
|
|
||||||
- [Filtering Observables in RxJava](https://www.baeldung.com/rxjava-filtering)
|
|
||||||
- [RxJava One Observable, Multiple Subscribers](https://www.baeldung.com/rxjava-multiple-subscribers-observable)
|
|
||||||
- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap)
|
|
||||||
- More articles: [[next -->]](/rxjava-2)
|
|
|
@ -1,13 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<configuration>
|
|
||||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
|
||||||
<encoder>
|
|
||||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
|
||||||
</pattern>
|
|
||||||
</encoder>
|
|
||||||
</appender>
|
|
||||||
|
|
||||||
<root level="INFO">
|
|
||||||
<appender-ref ref="STDOUT" />
|
|
||||||
</root>
|
|
||||||
</configuration>
|
|
Loading…
Reference in New Issue