BEAL-572 make examples simpler

This commit is contained in:
Tomasz Lelek 2017-02-02 21:36:11 +01:00
parent 51d8c0cdb7
commit c8d818c2f5
6 changed files with 5 additions and 15 deletions

View File

@ -26,7 +26,7 @@ public class ComputeFunction {
public static void compute(Observable<Integer> v) { public static void compute(Observable<Integer> v) {
try { try {
System.out.println("compute integer v: " + v); v.forEach(System.out::println);
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -7,7 +7,6 @@ public class HotObservableBackPressureBatching {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
PublishSubject<Integer> source = PublishSubject.<Integer>create(); PublishSubject<Integer> source = PublishSubject.<Integer>create();
//buffer
source.window(500) source.window(500)
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace); .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

View File

@ -8,7 +8,6 @@ public class HotObservableBackPressureBuffering {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
PublishSubject<Integer> source = PublishSubject.<Integer>create(); PublishSubject<Integer> source = PublishSubject.<Integer>create();
//buffer
source source
.buffer(1024) .buffer(1024)
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())

View File

@ -9,12 +9,8 @@ public class HotObservableBackPressureSkipping {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
PublishSubject<Integer> source = PublishSubject.<Integer>create(); PublishSubject<Integer> source = PublishSubject.<Integer>create();
//buffer source.sample(100, TimeUnit.MILLISECONDS)
source
// .debounce(1, TimeUnit.SECONDS)
// .sample(1, TimeUnit.SECONDS)
// .throttleFirst(100, TimeUnit.MILLISECONDS) // .throttleFirst(100, TimeUnit.MILLISECONDS)
.throttleLast(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace); .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

View File

@ -5,8 +5,6 @@ import rx.BackpressureOverflow;
import rx.Observable; import rx.Observable;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class HotObservableOnBackPressure { public class HotObservableOnBackPressure {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
Observable.range(1, 1_000_000) Observable.range(1, 1_000_000)
@ -18,9 +16,8 @@ public class HotObservableOnBackPressure {
}, Throwable::printStackTrace); }, Throwable::printStackTrace);
Observable.interval(1, TimeUnit.MINUTES) Observable.range(1, 1_000_000)
.onBackpressureDrop() .onBackpressureDrop()
// .onBackpressureLatest()
.observeOn(Schedulers.io()) .observeOn(Schedulers.io())
.doOnNext(ComputeFunction::compute) .doOnNext(ComputeFunction::compute)
.subscribe(v -> { .subscribe(v -> {

View File

@ -8,7 +8,6 @@ public class HotObservableWithoutBackPressure {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
PublishSubject<Integer> source = PublishSubject.<Integer>create(); PublishSubject<Integer> source = PublishSubject.<Integer>create();
//buffer
source.observeOn(Schedulers.computation()) source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace); .subscribe(ComputeFunction::compute, Throwable::printStackTrace);