diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java index f83f34c3ee..557eb04612 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -34,4 +34,12 @@ public class ComputeFunction { } + public static void compute(Long v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java new file mode 100644 index 0000000000..f61536e827 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java @@ -0,0 +1,31 @@ +package com.baelding.rxjava; + + +import rx.BackpressureOverflow; +import rx.Observable; +import rx.schedulers.Schedulers; + +import java.util.concurrent.TimeUnit; + +public class HotObservableOnBackPressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000) + .onBackpressureBuffer(16, () -> { + }, + BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(e -> { + }, Throwable::printStackTrace); + + + Observable.interval(1, TimeUnit.MINUTES) + .onBackpressureDrop() +// .onBackpressureLatest() + .observeOn(Schedulers.io()) + .doOnNext(ComputeFunction::compute) + .subscribe(v -> { + }, Throwable::printStackTrace); + Thread.sleep(10_000); + + } +}