Reformat Rxjava examples
This commit is contained in:
		
							parent
							
								
									10eb6bfcc0
								
							
						
					
					
						commit
						6d0dcd52fa
					
				| @ -5,7 +5,10 @@ import rx.schedulers.Schedulers; | |||||||
| 
 | 
 | ||||||
| public class ColdObservableBackpressure { | public class ColdObservableBackpressure { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         Observable.range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); |         Observable | ||||||
|  |           .range(1, 1_000_000) | ||||||
|  |           .observeOn(Schedulers.computation()) | ||||||
|  |           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
|         Thread.sleep(10_000); |         Thread.sleep(10_000); | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -5,9 +5,12 @@ import rx.subjects.PublishSubject; | |||||||
| 
 | 
 | ||||||
| public class HotObservableBackpressureBatching { | public class HotObservableBackpressureBatching { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); |         PublishSubject<Integer> source = PublishSubject.<Integer> create(); | ||||||
| 
 | 
 | ||||||
|         source.window(500).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); |         source | ||||||
|  |           .window(500) | ||||||
|  |           .observeOn(Schedulers.computation()) | ||||||
|  |           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
|         for (int i = 0; i < 1_000_000; i++) { |         for (int i = 0; i < 1_000_000; i++) { | ||||||
|             source.onNext(i); |             source.onNext(i); | ||||||
|  | |||||||
| @ -5,9 +5,12 @@ import rx.subjects.PublishSubject; | |||||||
| 
 | 
 | ||||||
| public class HotObservableBackpressureBuffering { | public class HotObservableBackpressureBuffering { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); |         PublishSubject<Integer> source = PublishSubject.<Integer> create(); | ||||||
| 
 | 
 | ||||||
|         source.buffer(1024).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); |         source | ||||||
|  |           .buffer(1024) | ||||||
|  |           .observeOn(Schedulers.computation()) | ||||||
|  |           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
|         for (int i = 0; i < 1_000_000; i++) { |         for (int i = 0; i < 1_000_000; i++) { | ||||||
|             source.onNext(i); |             source.onNext(i); | ||||||
|  | |||||||
| @ -7,11 +7,12 @@ import java.util.concurrent.TimeUnit; | |||||||
| 
 | 
 | ||||||
| public class HotObservableBackpressureSkipping { | public class HotObservableBackpressureSkipping { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); |         PublishSubject<Integer> source = PublishSubject.<Integer> create(); | ||||||
| 
 | 
 | ||||||
|         source.sample(100, TimeUnit.MILLISECONDS) |         source.sample(100, TimeUnit.MILLISECONDS) | ||||||
|             //                .throttleFirst(100, TimeUnit.MILLISECONDS) |           //.throttleFirst(100, TimeUnit.MILLISECONDS) | ||||||
|             .observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); |           .observeOn(Schedulers.computation()) | ||||||
|  |           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
|         for (int i = 0; i < 1_000_000; i++) { |         for (int i = 0; i < 1_000_000; i++) { | ||||||
|             source.onNext(i); |             source.onNext(i); | ||||||
|  | |||||||
| @ -6,11 +6,20 @@ import rx.schedulers.Schedulers; | |||||||
| 
 | 
 | ||||||
| public class HotObservableOnBackpressure { | public class HotObservableOnBackpressure { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         Observable.range(1, 1_000_000).onBackpressureBuffer(16, () -> { |         Observable | ||||||
|         }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(e -> { |           .range(1, 1_000_000) | ||||||
|  |           .onBackpressureBuffer(16, () -> { | ||||||
|  |           }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) | ||||||
|  |           .observeOn(Schedulers.computation()) | ||||||
|  |           .subscribe(e -> { | ||||||
|           }, Throwable::printStackTrace); |           }, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
|         Observable.range(1, 1_000_000).onBackpressureDrop().observeOn(Schedulers.io()).doOnNext(ComputeFunction::compute).subscribe(v -> { |         Observable | ||||||
|  |           .range(1, 1_000_000) | ||||||
|  |           .onBackpressureDrop() | ||||||
|  |           .observeOn(Schedulers.io()) | ||||||
|  |           .doOnNext(ComputeFunction::compute) | ||||||
|  |           .subscribe(v -> { | ||||||
|           }, Throwable::printStackTrace); |           }, Throwable::printStackTrace); | ||||||
|         Thread.sleep(10_000); |         Thread.sleep(10_000); | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,17 +1,16 @@ | |||||||
| package com.baelding.rxjava; | package com.baelding.rxjava; | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| import rx.schedulers.Schedulers; | import rx.schedulers.Schedulers; | ||||||
| import rx.subjects.PublishSubject; | import rx.subjects.PublishSubject; | ||||||
| 
 | 
 | ||||||
| public class HotObservableWithoutBackpressure { | public class HotObservableWithoutBackpressure { | ||||||
|     public static void main(String[] args) throws InterruptedException { |     public static void main(String[] args) throws InterruptedException { | ||||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); |         PublishSubject<Integer> source = PublishSubject.<Integer> create(); | ||||||
| 
 | 
 | ||||||
|         source.observeOn(Schedulers.computation()) |         source | ||||||
|  |           .observeOn(Schedulers.computation()) | ||||||
|           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); |           .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|         for (int i = 0; i < 1_000_000; i++) { |         for (int i = 0; i < 1_000_000; i++) { | ||||||
|             source.onNext(i); |             source.onNext(i); | ||||||
|         } |         } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user