From a32cb1a48968e1a33fa0fa161b2c9d12a2eaf132 Mon Sep 17 00:00:00 2001 From: TINO Date: Sun, 18 Nov 2018 21:53:48 +0300 Subject: [PATCH] BAEL - 1511 --- rxjava-2/pom.xml | 8 ++ ...yncAndSyncToObservableIntegrationTest.java | 103 ++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java diff --git a/rxjava-2/pom.xml b/rxjava-2/pom.xml index a18b096b6d..f437a6005b 100644 --- a/rxjava-2/pom.xml +++ b/rxjava-2/pom.xml @@ -34,6 +34,13 @@ rxrelay ${rxrelay.version} + + + com.github.akarnokd + rxjava2-extensions + ${rxjava2.ext.version} + + @@ -41,5 +48,6 @@ 2.2.2 1.7.0 2.0.0 + 0.20.4 \ No newline at end of file diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java new file mode 100644 index 0000000000..a646b453ff --- /dev/null +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java @@ -0,0 +1,103 @@ +package com.baeldung.rxjava; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import hu.akarnokd.rxjava2.async.AsyncObservable; +import io.reactivex.Observable; + +public class AsyncAndSyncToObservableIntegrationTest { + + AtomicInteger counter = new AtomicInteger(); + Callable callable = () -> counter.incrementAndGet(); + + @Test + public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {// method will execute every time it gets subscribed + + Observable source = Observable.fromCallable(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + } + + @Test + public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {// method will execute only once and cache its result. + + Observable source = AsyncObservable.start(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + } + + @Test + public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() { // method will execute only once and cache its result. + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(callable); + Observable source = Observable.fromFuture(future); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + + executor.shutdown(); + } + + @Test + public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {// method will execute every time it gets subscribed + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Observable source = AsyncObservable.startFuture(() -> executor.submit(callable)); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + + executor.shutdown(); + } + + @Test + public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { // method will execute only once and cache its result. + + List list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() }); + ExecutorService exec = Executors.newSingleThreadExecutor(); + Callable> callable = () -> Observable.fromIterable(list); + Observable source = AsyncObservable.deferFuture(() -> exec.submit(callable)); + for (int i = 1; i < 4; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + exec.shutdown(); + } + +}