From 3bda12d20c9b820531e9feebb7bcd89a35421bf7 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sat, 4 Feb 2017 17:00:22 +0100 Subject: [PATCH 1/4] BEAL-572 use same chedulers to be consistent --- .../java/com/baelding/rxjava/ColdObservableBackpressure.java | 2 +- .../java/com/baelding/rxjava/HotObservableOnBackPressure.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java index cebc2d35f6..b0be29957d 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -9,7 +9,7 @@ public class ColdObservableBackPressure { public static void main(String[] args) throws InterruptedException { Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) - .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + .subscribe(ComputeFunction::compute); Thread.sleep(10_000); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java index bf86312fff..124ee64d0b 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java @@ -18,7 +18,7 @@ public class HotObservableOnBackPressure { Observable.range(1, 1_000_000) .onBackpressureDrop() - .observeOn(Schedulers.io()) + .observeOn(Schedulers.computation()) .doOnNext(ComputeFunction::compute) .subscribe(v -> { }, Throwable::printStackTrace); From d4cb92250d036680bafa5a4ee322b7388d0c85b1 Mon Sep 17 00:00:00 2001 From: pivovarit Date: Sun, 5 Feb 2017 10:20:12 +0100 Subject: [PATCH 2/4] LiveTest -> Test --- ...esourceEndpointLiveTest.java => ResourceEndpointTest.java} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/{ResourceEndpointLiveTest.java => ResourceEndpointTest.java} (95%) diff --git a/spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointLiveTest.java b/spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointTest.java similarity index 95% rename from spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointLiveTest.java rename to spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointTest.java index 94b6052ba4..6d532f98fc 100644 --- a/spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointLiveTest.java +++ b/spring-security-cache-control/src/test/java/com/baeldung/cachecontrol/ResourceEndpointTest.java @@ -11,7 +11,7 @@ import static com.jayway.restassured.RestAssured.given; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = AppRunner.class) -public class ResourceEndpointLiveTest { +public class ResourceEndpointTest { @LocalServerPort private int serverPort; @@ -66,7 +66,7 @@ public class ResourceEndpointLiveTest { } private String getBaseUrl() { - return "http://localhost:" + serverPort; + return String.format("http://localhost:%d", serverPort); } } \ No newline at end of file From 4b0249410c4334427e1bc6ba2a580b2dde9a05f3 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Sun, 5 Feb 2017 10:27:13 +0100 Subject: [PATCH 3/4] Update README.md --- spring-core/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-core/README.md b/spring-core/README.md index f6aaaf44a0..a32d30939f 100644 --- a/spring-core/README.md +++ b/spring-core/README.md @@ -3,4 +3,4 @@ - [Exploring the Spring BeanFactory API](http://www.baeldung.com/spring-beanfactory) - [How to use the Spring FactoryBean?](http://www.baeldung.com/spring-factorybean) - [Constructor Dependency Injection in Spring](http://www.baeldung.com/constructor-injection-in-spring) -- [Constructor Injection in Spring with Lombok](http://inprogress.baeldung.com/constructor-injection-in-spring-with-lombok) +- [Constructor Injection in Spring with Lombok](http://www.baeldung.com/spring-injection-lombok) From 0e78d76f2291836c319f1b707e4ad553935fe501 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sun, 5 Feb 2017 10:27:44 +0100 Subject: [PATCH 4/4] BAEL-572 move code to tests --- rxjava/pom.xml | 6 + .../rxjava/ColdObservableBackpressure.java | 16 -- .../HotObservableBackpressureBatching.java | 18 -- .../HotObservableBackpressureBuffering.java | 17 -- .../HotObservableBackpressureSkipping.java | 21 --- .../HotObservableWithoutBackpressure.java | 20 --- .../rxjava/RxJavaBackpressureTest.java | 155 ++++++++++++++++++ 7 files changed, 161 insertions(+), 92 deletions(-) delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java create mode 100644 rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java diff --git a/rxjava/pom.xml b/rxjava/pom.xml index 63aa1f127e..6a351d98bd 100644 --- a/rxjava/pom.xml +++ b/rxjava/pom.xml @@ -26,9 +26,15 @@ rxjava ${rx.java.version} + + junit + junit + ${junit.version} + + 4.12 1.2.5 diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java deleted file mode 100644 index abb0b99100..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.baelding.rxjava; - -import rx.Observable; -import rx.schedulers.Schedulers; - -public class ColdObservableBackpressure { - public static void main(String[] args) throws InterruptedException { - - Observable.range(1, 1_000_000) - .observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute); - - Thread.sleep(10_000); - } - -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java deleted file mode 100644 index 6acda7eaad..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackpressureBatching { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.window(500).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } - -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java deleted file mode 100644 index 50638f4c8a..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackpressureBuffering { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.buffer(1024).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java deleted file mode 100644 index f6f8b9f563..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.baelding.rxjava; - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -import java.util.concurrent.TimeUnit; - -public class HotObservableBackpressureSkipping { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.sample(100, TimeUnit.MILLISECONDS) - // .throttleFirst(100, TimeUnit.MILLISECONDS) - .observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java deleted file mode 100644 index 7745dbe5c4..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.baelding.rxjava; - - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableWithoutBackpressure { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); - - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - Thread.sleep(10_000); - } -} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java new file mode 100644 index 0000000000..30357e7a27 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java @@ -0,0 +1,155 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.BackpressureOverflow; +import rx.Observable; +import rx.exceptions.MissingBackpressureException; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class RxJavaBackpressureTest { + + @Test + public void givenColdObservable_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + source.observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //when + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + + } + + //then + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(MissingBackpressureException.class); + } + + @Test + public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() { + //given + TestSubscriber> testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.window(500) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() { + //given + TestSubscriber> testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + + @Test + public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + PublishSubject source = PublishSubject.create(); + + //when + source.sample(100, TimeUnit.MILLISECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + for (int i = 0; i < 1_000; i++) { + source.onNext(i); + } + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + @Test + public void givenHotObservable_whenOnBackpressureBufferDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .onBackpressureBuffer(16, () -> { + }, + BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } + + + @Test + public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() { + //given + TestSubscriber testSubscriber = new TestSubscriber<>(); + + //when + Observable.range(1, 1_000_000) + .onBackpressureDrop() + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + //then + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + + } +}