From 2dcc2311c5775b388cd7a01e55b8b53024ae218f Mon Sep 17 00:00:00 2001 From: Pedja Date: Fri, 3 Feb 2017 17:37:54 +0100 Subject: [PATCH] BAEL-572 BackPressure -> Backpressure; code reformatted; included rxjava in parent pom --- pom.xml | 1 + .../rxjava/ColdObservableBackpressure.java | 56 +++++++++---------- .../com/baelding/rxjava/ComputeFunction.java | 2 - ...=> HotObservableBackpressureBatching.java} | 7 +-- ...> HotObservableBackpressureBuffering.java} | 9 +-- ...=> HotObservableBackpressureSkipping.java} | 8 +-- .../rxjava/HotObservableOnBackPressure.java | 28 ---------- .../rxjava/HotObservableOnBackpressure.java | 18 ++++++ ... => HotObservableWithoutBackpressure.java} | 2 +- 9 files changed, 52 insertions(+), 79 deletions(-) rename rxjava/src/main/java/com/baelding/rxjava/{HotObservableBackPressureBatching.java => HotObservableBackpressureBatching.java} (63%) rename rxjava/src/main/java/com/baelding/rxjava/{HotObservableBackPressureBuffering.java => HotObservableBackpressureBuffering.java} (61%) rename rxjava/src/main/java/com/baelding/rxjava/{HotObservableBackPressureSkipping.java => HotObservableBackpressureSkipping.java} (65%) delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java rename rxjava/src/main/java/com/baelding/rxjava/{HotObservableWithoutBackPressure.java => HotObservableWithoutBackpressure.java} (91%) diff --git a/pom.xml b/pom.xml index 06feb5e4f5..41235dcc26 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ rest-assured rest-testing resteasy + rxjava selenium-junit-testng solr-fulltext-search diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java index cebc2d35f6..9855123a3b 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -1,44 +1,38 @@ package com.baelding.rxjava; - import rx.Observable; -import rx.Subscriber; import rx.schedulers.Schedulers; -public class ColdObservableBackPressure { +public class ColdObservableBackpressure { public static void main(String[] args) throws InterruptedException { - Observable.range(1, 1_000_000) - .observeOn(Schedulers.computation()) - .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + Observable.range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); Thread.sleep(10_000); - -// Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable -// .subscribe(new Subscriber() { -// @Override -// public void onStart() { -// request(1); -// } -// -// public void onNext(Integer v) { -// compute(v); -// -// request(1); -// } -// -// @Override -// public void onError(Throwable ex) { -// ex.printStackTrace(); -// } -// -// @Override -// public void onCompleted() { -// System.out.println("Done!"); -// } -// }); + // Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable + // .subscribe(new Subscriber() { + // @Override + // public void onStart() { + // request(1); + // } + // + // public void onNext(Integer v) { + // compute(v); + // + // request(1); + // } + // + // @Override + // public void onError(Throwable ex) { + // ex.printStackTrace(); + // } + // + // @Override + // public void onCompleted() { + // System.out.println("Done!"); + // } + // }); } - } diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java index bebccd0300..924862ab37 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -1,6 +1,5 @@ package com.baelding.rxjava; - import rx.Observable; import java.util.List; @@ -33,7 +32,6 @@ public class ComputeFunction { } } - public static void compute(Long v) { try { System.out.println("compute integer v: " + v); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java similarity index 63% rename from rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java rename to rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java index c13f94c4f7..6acda7eaad 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java @@ -3,14 +3,11 @@ package com.baelding.rxjava; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; -public class HotObservableBackPressureBatching { +public class HotObservableBackpressureBatching { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - source.window(500) - .observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); - + source.window(500).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java similarity index 61% rename from rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java rename to rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java index 67922e2efb..50638f4c8a 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java @@ -3,16 +3,11 @@ package com.baelding.rxjava; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; - -public class HotObservableBackPressureBuffering { +public class HotObservableBackpressureBuffering { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - source - .buffer(1024) - .observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); - + source.buffer(1024).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java similarity index 65% rename from rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java rename to rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java index 2e8244166f..f6f8b9f563 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java @@ -5,15 +5,13 @@ import rx.subjects.PublishSubject; import java.util.concurrent.TimeUnit; -public class HotObservableBackPressureSkipping { +public class HotObservableBackpressureSkipping { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); source.sample(100, TimeUnit.MILLISECONDS) -// .throttleFirst(100, TimeUnit.MILLISECONDS) - .observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); - + // .throttleFirst(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java deleted file mode 100644 index bf86312fff..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.baelding.rxjava; - - -import rx.BackpressureOverflow; -import rx.Observable; -import rx.schedulers.Schedulers; - -public class HotObservableOnBackPressure { - public static void main(String[] args) throws InterruptedException { - Observable.range(1, 1_000_000) - .onBackpressureBuffer(16, () -> { - }, - BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) - .observeOn(Schedulers.computation()) - .subscribe(e -> { - }, Throwable::printStackTrace); - - - Observable.range(1, 1_000_000) - .onBackpressureDrop() - .observeOn(Schedulers.io()) - .doOnNext(ComputeFunction::compute) - .subscribe(v -> { - }, Throwable::printStackTrace); - Thread.sleep(10_000); - - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java new file mode 100644 index 0000000000..afef8027bf --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java @@ -0,0 +1,18 @@ +package com.baelding.rxjava; + +import rx.BackpressureOverflow; +import rx.Observable; +import rx.schedulers.Schedulers; + +public class HotObservableOnBackpressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000).onBackpressureBuffer(16, () -> { + }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(e -> { + }, Throwable::printStackTrace); + + Observable.range(1, 1_000_000).onBackpressureDrop().observeOn(Schedulers.io()).doOnNext(ComputeFunction::compute).subscribe(v -> { + }, Throwable::printStackTrace); + Thread.sleep(10_000); + + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java similarity index 91% rename from rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java rename to rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java index 07ff5b42c0..7745dbe5c4 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java @@ -4,7 +4,7 @@ package com.baelding.rxjava; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; -public class HotObservableWithoutBackPressure { +public class HotObservableWithoutBackpressure { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create();