diff --git a/rxjava-2/pom.xml b/rxjava-2/pom.xml
index a18b096b6d..f437a6005b 100644
--- a/rxjava-2/pom.xml
+++ b/rxjava-2/pom.xml
@@ -34,6 +34,13 @@
rxrelay
${rxrelay.version}
+
+
+ com.github.akarnokd
+ rxjava2-extensions
+ ${rxjava2.ext.version}
+
+
@@ -41,5 +48,6 @@
2.2.2
1.7.0
2.0.0
+ 0.20.4
\ No newline at end of file
diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java
new file mode 100644
index 0000000000..a646b453ff
--- /dev/null
+++ b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java
@@ -0,0 +1,103 @@
+package com.baeldung.rxjava;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import hu.akarnokd.rxjava2.async.AsyncObservable;
+import io.reactivex.Observable;
+
+public class AsyncAndSyncToObservableIntegrationTest {
+
+ AtomicInteger counter = new AtomicInteger();
+ Callable callable = () -> counter.incrementAndGet();
+
+ @Test
+ public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {// method will execute every time it gets subscribed
+
+ Observable source = Observable.fromCallable(callable);
+
+ for (int i = 1; i < 5; i++) {
+ source.test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertResult(i);
+
+ assertEquals(i, counter.get());
+ }
+ }
+
+ @Test
+ public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {// method will execute only once and cache its result.
+
+ Observable source = AsyncObservable.start(callable);
+
+ for (int i = 1; i < 5; i++) {
+ source.test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertResult(1);
+
+ assertEquals(1, counter.get());
+ }
+ }
+
+ @Test
+ public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() { // method will execute only once and cache its result.
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future future = executor.submit(callable);
+ Observable source = Observable.fromFuture(future);
+
+ for (int i = 1; i < 5; i++) {
+ source.test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertResult(1);
+
+ assertEquals(1, counter.get());
+ }
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {// method will execute every time it gets subscribed
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Observable source = AsyncObservable.startFuture(() -> executor.submit(callable));
+
+ for (int i = 1; i < 5; i++) {
+ source.test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertResult(i);
+
+ assertEquals(i, counter.get());
+ }
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { // method will execute only once and cache its result.
+
+ List list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ Callable> callable = () -> Observable.fromIterable(list);
+ Observable source = AsyncObservable.deferFuture(() -> exec.submit(callable));
+ for (int i = 1; i < 4; i++) {
+ source.test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertResult(1, 2, 3);
+ }
+
+ exec.shutdown();
+ }
+
+}