diff --git a/rxjava/pom.xml b/rxjava/pom.xml index 63aa1f127e..6a351d98bd 100644 --- a/rxjava/pom.xml +++ b/rxjava/pom.xml @@ -26,9 +26,15 @@ rxjava ${rx.java.version} + + junit + junit + ${junit.version} + + 4.12 1.2.5 diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java deleted file mode 100644 index abb0b99100..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.baelding.rxjava; - -import rx.Observable; -import rx.schedulers.Schedulers; - -public class ColdObservableBackpressure { - public static void main(String[] args) throws InterruptedException { - - Observable.range(1, 1_000_000) - .observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute); - - Thread.sleep(10_000); - } - -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java deleted file mode 100644 index 6acda7eaad..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackpressureBatching { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.window(500).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } - -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java deleted file mode 100644 index 50638f4c8a..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackpressureBuffering { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.buffer(1024).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java deleted file mode 100644 index f6f8b9f563..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -import java.util.concurrent.TimeUnit; - -public class HotObservableBackpressureSkipping { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.sample(100, TimeUnit.MILLISECONDS) - // .throttleFirst(100, TimeUnit.MILLISECONDS) - .observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java deleted file mode 100644 index 7745dbe5c4..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.baelding.rxjava; - - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableWithoutBackpressure { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java new file mode 100644 index 0000000000..30357e7a27 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java @@ -0,0 +1,155 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.BackpressureOverflow; +import rx.Observable; +import rx.exceptions.MissingBackpressureException; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class RxJavaBackpressureTest { + + @Test + public void givenColdObservable_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + source.observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //when + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + + } + + //then + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(MissingBackpressureException.class); + } + + @Test + public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() { + //given + TestSubscriber> testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.window(500) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() { + //given + TestSubscriber> testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + + @Test + public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.sample(100, TimeUnit.MILLISECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenOnBackpressureBufferDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .onBackpressureBuffer(16, () -> { + }, + BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + + @Test + public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .onBackpressureDrop() + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } +}