Merge branch 'BAEL-572' of https://github.com/tomekl007/tutorials into tomekl007-BAEL-572
This commit is contained in:
		
						commit
						c8734b028b
					
				
							
								
								
									
										35
									
								
								rxjava/pom.xml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								rxjava/pom.xml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,35 @@ | ||||
| <?xml version="1.0" encoding="UTF-8"?> | ||||
| <project xmlns="http://maven.apache.org/POM/4.0.0" | ||||
|          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||
|          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||
|     <modelVersion>4.0.0</modelVersion> | ||||
| 
 | ||||
|     <groupId>com.baeldung</groupId> | ||||
|     <artifactId>rxjava</artifactId> | ||||
|     <version>1.0-SNAPSHOT</version> | ||||
|     <build> | ||||
|         <plugins> | ||||
|             <plugin> | ||||
|                 <groupId>org.apache.maven.plugins</groupId> | ||||
|                 <artifactId>maven-compiler-plugin</artifactId> | ||||
|                 <configuration> | ||||
|                     <source>1.8</source> | ||||
|                     <target>1.8</target> | ||||
|                 </configuration> | ||||
|             </plugin> | ||||
|         </plugins> | ||||
|     </build> | ||||
| 
 | ||||
|     <dependencies> | ||||
|         <dependency> | ||||
|             <groupId>io.reactivex</groupId> | ||||
|             <artifactId>rxjava</artifactId> | ||||
|             <version>${rx.java.version}</version> | ||||
|         </dependency> | ||||
|     </dependencies> | ||||
| 
 | ||||
|     <properties> | ||||
|         <rx.java.version>1.2.5</rx.java.version> | ||||
|     </properties> | ||||
| 
 | ||||
| </project> | ||||
| @ -0,0 +1,44 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| 
 | ||||
| import rx.Observable; | ||||
| import rx.Subscriber; | ||||
| import rx.schedulers.Schedulers; | ||||
| 
 | ||||
| public class ColdObservableBackPressure { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         Observable.range(1, 1_000_000) | ||||
|                 .observeOn(Schedulers.computation()) | ||||
|                 .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); | ||||
| 
 | ||||
|         Thread.sleep(10_000); | ||||
| 
 | ||||
| 
 | ||||
| //        Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable | ||||
| //                .subscribe(new Subscriber<Integer>() { | ||||
| //                    @Override | ||||
| //                    public void onStart() { | ||||
| //                        request(1); | ||||
| //                    } | ||||
| // | ||||
| //                    public void onNext(Integer v) { | ||||
| //                        compute(v); | ||||
| // | ||||
| //                        request(1); | ||||
| //                    } | ||||
| // | ||||
| //                    @Override | ||||
| //                    public void onError(Throwable ex) { | ||||
| //                        ex.printStackTrace(); | ||||
| //                    } | ||||
| // | ||||
| //                    @Override | ||||
| //                    public void onCompleted() { | ||||
| //                        System.out.println("Done!"); | ||||
| //                    } | ||||
| //                }); | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,45 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| 
 | ||||
| import rx.Observable; | ||||
| 
 | ||||
| import java.util.List; | ||||
| 
 | ||||
| public class ComputeFunction { | ||||
|     public static void compute(Integer v) { | ||||
|         try { | ||||
|             System.out.println("compute integer v: " + v); | ||||
|             Thread.sleep(1000); | ||||
|         } catch (InterruptedException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static void compute(List<Integer> v) { | ||||
|         try { | ||||
|             System.out.println("compute integer v: " + v); | ||||
|             Thread.sleep(1000); | ||||
|         } catch (InterruptedException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static void compute(Observable<Integer> v) { | ||||
|         try { | ||||
|             v.forEach(System.out::println); | ||||
|             Thread.sleep(1000); | ||||
|         } catch (InterruptedException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|     public static void compute(Long v) { | ||||
|         try { | ||||
|             System.out.println("compute integer v: " + v); | ||||
|             Thread.sleep(1000); | ||||
|         } catch (InterruptedException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,21 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| import rx.schedulers.Schedulers; | ||||
| import rx.subjects.PublishSubject; | ||||
| 
 | ||||
| public class HotObservableBackPressureBatching { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); | ||||
| 
 | ||||
|         source.window(500) | ||||
|                 .observeOn(Schedulers.computation()) | ||||
|                 .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||
| 
 | ||||
| 
 | ||||
|         for (int i = 0; i < 1_000_000; i++) { | ||||
|             source.onNext(i); | ||||
|         } | ||||
|         Thread.sleep(10_000); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,22 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| import rx.schedulers.Schedulers; | ||||
| import rx.subjects.PublishSubject; | ||||
| 
 | ||||
| 
 | ||||
| public class HotObservableBackPressureBuffering { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); | ||||
| 
 | ||||
|         source | ||||
|                 .buffer(1024) | ||||
|                 .observeOn(Schedulers.computation()) | ||||
|                 .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||
| 
 | ||||
| 
 | ||||
|         for (int i = 0; i < 1_000_000; i++) { | ||||
|             source.onNext(i); | ||||
|         } | ||||
|         Thread.sleep(10_000); | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,23 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| import rx.schedulers.Schedulers; | ||||
| import rx.subjects.PublishSubject; | ||||
| 
 | ||||
| import java.util.concurrent.TimeUnit; | ||||
| 
 | ||||
| public class HotObservableBackPressureSkipping { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); | ||||
| 
 | ||||
|         source.sample(100, TimeUnit.MILLISECONDS) | ||||
| //                .throttleFirst(100, TimeUnit.MILLISECONDS) | ||||
|                 .observeOn(Schedulers.computation()) | ||||
|                 .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||
| 
 | ||||
| 
 | ||||
|         for (int i = 0; i < 1_000_000; i++) { | ||||
|             source.onNext(i); | ||||
|         } | ||||
|         Thread.sleep(10_000); | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,28 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| 
 | ||||
| import rx.BackpressureOverflow; | ||||
| import rx.Observable; | ||||
| import rx.schedulers.Schedulers; | ||||
| 
 | ||||
| public class HotObservableOnBackPressure { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         Observable.range(1, 1_000_000) | ||||
|                 .onBackpressureBuffer(16, () -> { | ||||
|                         }, | ||||
|                         BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) | ||||
|                 .observeOn(Schedulers.computation()) | ||||
|                 .subscribe(e -> { | ||||
|                 }, Throwable::printStackTrace); | ||||
| 
 | ||||
| 
 | ||||
|         Observable.range(1, 1_000_000) | ||||
|                 .onBackpressureDrop() | ||||
|                 .observeOn(Schedulers.io()) | ||||
|                 .doOnNext(ComputeFunction::compute) | ||||
|                 .subscribe(v -> { | ||||
|                 }, Throwable::printStackTrace); | ||||
|         Thread.sleep(10_000); | ||||
| 
 | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,20 @@ | ||||
| package com.baelding.rxjava; | ||||
| 
 | ||||
| 
 | ||||
| import rx.schedulers.Schedulers; | ||||
| import rx.subjects.PublishSubject; | ||||
| 
 | ||||
| public class HotObservableWithoutBackPressure { | ||||
|     public static void main(String[] args) throws InterruptedException { | ||||
|         PublishSubject<Integer> source = PublishSubject.<Integer>create(); | ||||
| 
 | ||||
|         source.observeOn(Schedulers.computation()) | ||||
|                 .subscribe(ComputeFunction::compute, Throwable::printStackTrace); | ||||
| 
 | ||||
| 
 | ||||
|         for (int i = 0; i < 1_000_000; i++) { | ||||
|             source.onNext(i); | ||||
|         } | ||||
|         Thread.sleep(10_000); | ||||
|     } | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user