From 694931a190f91e6b6e3ec440408fe107753f686d Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Wed, 1 Feb 2017 10:58:07 +0100 Subject: [PATCH 1/4] BEAL-572 rx project structure, hot cold observables --- rxjava/pom.xml | 35 +++++++++++++++++++ .../rxjava/HotObservableBackpressure.java | 30 ++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 rxjava/pom.xml create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java diff --git a/rxjava/pom.xml b/rxjava/pom.xml new file mode 100644 index 0000000000..63aa1f127e --- /dev/null +++ b/rxjava/pom.xml @@ -0,0 +1,35 @@ + + + 4.0.0 + + com.baeldung + rxjava + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + io.reactivex + rxjava + ${rx.java.version} + + + + + 1.2.5 + + + \ No newline at end of file diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java new file mode 100644 index 0000000000..bc4f6710e5 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java @@ -0,0 +1,30 @@ +package com.baelding.rxjava; + + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableBackPressure { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + source + .observeOn(Schedulers.computation()) + .subscribe(v -> compute(v), Throwable::printStackTrace); + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + + Thread.sleep(10_000); + } + + private static void compute(Integer v) { + try { + System.out.println("compute integer v: "+ v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} From 2065c3ca514c9a5c3a60c1610d059c0995534f46 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Wed, 1 Feb 2017 15:12:51 +0100 Subject: [PATCH 2/4] BEAL-572 different strategies of handling hot-observable --- .../rxjava/ColdObservableBackpressure.java | 44 +++++++++++++++++++ .../com/baelding/rxjava/ComputeFunction.java | 37 ++++++++++++++++ .../HotObservableBackPressureBatching.java | 22 ++++++++++ .../HotObservableBackPressureBuffering.java | 23 ++++++++++ .../HotObservableBackPressureSkipping.java | 27 ++++++++++++ .../rxjava/HotObservableBackpressure.java | 30 ------------- .../HotObservableWithoutBackPressure.java | 21 +++++++++ 7 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java new file mode 100644 index 0000000000..cebc2d35f6 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -0,0 +1,44 @@ +package com.baelding.rxjava; + + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class ColdObservableBackPressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + + Thread.sleep(10_000); + + +// Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable +// .subscribe(new Subscriber() { +// @Override +// public void onStart() { +// request(1); +// } +// +// public void onNext(Integer v) { +// compute(v); +// +// request(1); +// } +// +// @Override +// public void onError(Throwable ex) { +// ex.printStackTrace(); +// } +// +// @Override +// public void onCompleted() { +// System.out.println("Done!"); +// } +// }); + + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java new file mode 100644 index 0000000000..f83f34c3ee --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -0,0 +1,37 @@ +package com.baelding.rxjava; + + +import rx.Observable; + +import java.util.List; + +public class ComputeFunction { + public static void compute(Integer v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(List v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(Observable v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java new file mode 100644 index 0000000000..42c705774a --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java @@ -0,0 +1,22 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableBackPressureBatching { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.window(500) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java new file mode 100644 index 0000000000..38451bab05 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java @@ -0,0 +1,23 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + + +public class HotObservableBackPressureBuffering { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source + .buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java new file mode 100644 index 0000000000..691198c0fa --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java @@ -0,0 +1,27 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.concurrent.TimeUnit; + +public class HotObservableBackPressureSkipping { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source +// .debounce(1, TimeUnit.SECONDS) +// .sample(1, TimeUnit.SECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) + .throttleLast(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java deleted file mode 100644 index bc4f6710e5..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.baelding.rxjava; - - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackPressure { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source - .observeOn(Schedulers.computation()) - .subscribe(v -> compute(v), Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - - Thread.sleep(10_000); - } - - private static void compute(Integer v) { - try { - System.out.println("compute integer v: "+ v); - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java new file mode 100644 index 0000000000..eea595b3b4 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java @@ -0,0 +1,21 @@ +package com.baelding.rxjava; + + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableWithoutBackPressure { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} From 51d8c0cdb7caccf19aa743fcab8282c43c77a25b Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Wed, 1 Feb 2017 16:43:15 +0100 Subject: [PATCH 3/4] BEAL-572 onBackpressure operations --- .../com/baelding/rxjava/ComputeFunction.java | 8 +++++ .../rxjava/HotObservableOnBackPressure.java | 31 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java index f83f34c3ee..557eb04612 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -34,4 +34,12 @@ public class ComputeFunction { } + public static void compute(Long v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java new file mode 100644 index 0000000000..f61536e827 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java @@ -0,0 +1,31 @@ +package com.baelding.rxjava; + + +import rx.BackpressureOverflow; +import rx.Observable; +import rx.schedulers.Schedulers; + +import java.util.concurrent.TimeUnit; + +public class HotObservableOnBackPressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000) + .onBackpressureBuffer(16, () -> { + }, + BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(e -> { + }, Throwable::printStackTrace); + + + Observable.interval(1, TimeUnit.MINUTES) + .onBackpressureDrop() +// .onBackpressureLatest() + .observeOn(Schedulers.io()) + .doOnNext(ComputeFunction::compute) + .subscribe(v -> { + }, Throwable::printStackTrace); + Thread.sleep(10_000); + + } +} From c8d818c2f55c4f88498364d8697a7e92362a5242 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Thu, 2 Feb 2017 21:36:11 +0100 Subject: [PATCH 4/4] BEAL-572 make examples simpler --- .../main/java/com/baelding/rxjava/ComputeFunction.java | 2 +- .../rxjava/HotObservableBackPressureBatching.java | 1 - .../rxjava/HotObservableBackPressureBuffering.java | 1 - .../rxjava/HotObservableBackPressureSkipping.java | 8 ++------ .../com/baelding/rxjava/HotObservableOnBackPressure.java | 5 +---- .../baelding/rxjava/HotObservableWithoutBackPressure.java | 3 +-- 6 files changed, 5 insertions(+), 15 deletions(-) diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java index 557eb04612..bebccd0300 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -26,7 +26,7 @@ public class ComputeFunction { public static void compute(Observable v) { try { - System.out.println("compute integer v: " + v); + v.forEach(System.out::println); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java index 42c705774a..c13f94c4f7 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java @@ -7,7 +7,6 @@ public class HotObservableBackPressureBatching { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source.window(500) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java index 38451bab05..67922e2efb 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java @@ -8,7 +8,6 @@ public class HotObservableBackPressureBuffering { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source .buffer(1024) .observeOn(Schedulers.computation()) diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java index 691198c0fa..2e8244166f 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java @@ -9,12 +9,8 @@ public class HotObservableBackPressureSkipping { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer - source -// .debounce(1, TimeUnit.SECONDS) -// .sample(1, TimeUnit.SECONDS) -// .throttleFirst(100, TimeUnit.MILLISECONDS) - .throttleLast(100, TimeUnit.MILLISECONDS) + source.sample(100, TimeUnit.MILLISECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java index f61536e827..bf86312fff 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java @@ -5,8 +5,6 @@ import rx.BackpressureOverflow; import rx.Observable; import rx.schedulers.Schedulers; -import java.util.concurrent.TimeUnit; - public class HotObservableOnBackPressure { public static void main(String[] args) throws InterruptedException { Observable.range(1, 1_000_000) @@ -18,9 +16,8 @@ public class HotObservableOnBackPressure { }, Throwable::printStackTrace); - Observable.interval(1, TimeUnit.MINUTES) + Observable.range(1, 1_000_000) .onBackpressureDrop() -// .onBackpressureLatest() .observeOn(Schedulers.io()) .doOnNext(ComputeFunction::compute) .subscribe(v -> { diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java index eea595b3b4..07ff5b42c0 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java @@ -8,9 +8,8 @@ public class HotObservableWithoutBackPressure { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source.observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) {