From 2065c3ca514c9a5c3a60c1610d059c0995534f46 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Wed, 1 Feb 2017 15:12:51 +0100 Subject: [PATCH] BEAL-572 different strategies of handling hot-observable --- .../rxjava/ColdObservableBackpressure.java | 44 +++++++++++++++++++ .../com/baelding/rxjava/ComputeFunction.java | 37 ++++++++++++++++ .../HotObservableBackPressureBatching.java | 22 ++++++++++ .../HotObservableBackPressureBuffering.java | 23 ++++++++++ .../HotObservableBackPressureSkipping.java | 27 ++++++++++++ .../rxjava/HotObservableBackpressure.java | 30 ------------- .../HotObservableWithoutBackPressure.java | 21 +++++++++ 7 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java delete mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java create mode 100644 rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java new file mode 100644 index 0000000000..cebc2d35f6 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -0,0 +1,44 @@ +package com.baelding.rxjava; + + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class ColdObservableBackPressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + + Thread.sleep(10_000); + + +// Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable +// .subscribe(new Subscriber() { +// @Override +// public void onStart() { +// request(1); +// } +// +// public void onNext(Integer v) { +// compute(v); +// +// request(1); +// } +// +// @Override +// public void onError(Throwable ex) { +// ex.printStackTrace(); +// } +// +// @Override +// public void onCompleted() { +// System.out.println("Done!"); +// } +// }); + + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java new file mode 100644 index 0000000000..f83f34c3ee --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -0,0 +1,37 @@ +package com.baelding.rxjava; + + +import rx.Observable; + +import java.util.List; + +public class ComputeFunction { + public static void compute(Integer v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(List v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(Observable v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java new file mode 100644 index 0000000000..42c705774a --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java @@ -0,0 +1,22 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableBackPressureBatching { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.window(500) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java new file mode 100644 index 0000000000..38451bab05 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java @@ -0,0 +1,23 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + + +public class HotObservableBackPressureBuffering { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source + .buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java new file mode 100644 index 0000000000..691198c0fa --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java @@ -0,0 +1,27 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.concurrent.TimeUnit; + +public class HotObservableBackPressureSkipping { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source +// .debounce(1, TimeUnit.SECONDS) +// .sample(1, TimeUnit.SECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) + .throttleLast(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java deleted file mode 100644 index bc4f6710e5..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.baelding.rxjava; - - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackPressure { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source - .observeOn(Schedulers.computation()) - .subscribe(v -> compute(v), Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - - Thread.sleep(10_000); - } - - private static void compute(Integer v) { - try { - System.out.println("compute integer v: "+ v); - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java new file mode 100644 index 0000000000..eea595b3b4 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java @@ -0,0 +1,21 @@ +package com.baelding.rxjava; + + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableWithoutBackPressure { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +}