diff --git a/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java index c5da7c1df5..8788b894aa 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java @@ -15,7 +15,7 @@ public class ConnectableObservableImpl { System.out.println("Connect"); connectable.connect(); - Thread.currentThread().sleep(500); + Thread.sleep(500); System.out.println("Sleep"); } } diff --git a/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java index 6254d03491..f8f5b81883 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java @@ -5,7 +5,6 @@ import rx.observables.BlockingObservable; public class ObservableImpl { - public static void main(String[] args) { Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -14,12 +13,9 @@ public class ObservableImpl { System.out.println("-------Just-----------"); Observable observable = Observable.just("Hello"); observable.subscribe( - //onNext - System.out::println, - //onError - Throwable::printStackTrace, - //onCompleted - () -> System.out.println("onCompleted") + System.out::println, //onNext + Throwable::printStackTrace, //onError + () -> System.out.println("onCompleted") //onCompleted ); BlockingObservable blockingObservable = observable.toBlocking(); @@ -27,79 +23,58 @@ public class ObservableImpl { System.out.println(); System.out.println("-------Map-----------"); Observable.from(letters) - .map((letter) -> { - return letter.toUpperCase(); - }) - .subscribe( - System.out::print - ); + .map(String::toUpperCase) + .subscribe(System.out::print); System.out.println(); System.out.println("-------FlatMap-----------"); Observable.from(letters) - .flatMap((letter) -> { - String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; - return Observable.from(returnStrings); - }) - .subscribe( - System.out::print - ); + .flatMap((letter) -> { + String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; + return Observable.from(returnStrings); + }) + .subscribe( + System.out::print + ); System.out.println(); System.out.println("--------Scan----------"); Observable.from(letters) - .scan(new StringBuilder(), (buffer, nextLetter) -> { - return buffer.append(nextLetter); - }) - .subscribe((total) -> { - System.out.println(total.toString()); - }); + .scan(new StringBuilder(), StringBuilder::append) + .subscribe(System.out::println); System.out.println(); System.out.println("------GroubBy------------"); Observable.from(numbers) - .groupBy((i) -> { - return 0 == (i % 2) ? "EVEN" : "ODD"; - }) - .subscribe((group) -> { - group.subscribe((number) -> { - System.out.println(group.getKey() + " : " + number); - }); - }); + .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") + .subscribe((group) -> group.subscribe((number) -> { + System.out.println(group.getKey() + " : " + number); + })); System.out.println(); System.out.println("-------Filter-----------"); Observable.from(numbers) - .filter((i) -> { - return (i % 2 == 1); - }) - .subscribe( - System.out::println - ); - + .filter(i -> (i % 2 == 1)) + .subscribe( + System.out::println + ); System.out.println("------DefaultIfEmpty------------"); Observable.empty() - .defaultIfEmpty("Observable is empty") - .subscribe( - System.out::println - ); - - + .defaultIfEmpty("Observable is empty") + .subscribe( + System.out::println + ); System.out.println("------DefaultIfEmpty-2-----------"); Observable.from(letters) - .defaultIfEmpty("Observable is empty") - .first() - .subscribe(System.out::println); + .defaultIfEmpty("Observable is empty") + .first() + .subscribe(System.out::println); System.out.println("-------TakeWhile-----------"); Observable.from(numbers) - .takeWhile((i) -> { - return i < 5; - }) - .subscribe(System.out::println); - - + .takeWhile(i -> i < 5) + .subscribe(System.out::println); } } \ No newline at end of file diff --git a/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java b/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java index 9bea622447..e097da1a29 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java @@ -1,37 +1,29 @@ package com.baelding.rxjava; - import rx.Observable; public class ResourceManagement { - public static void main(String[] args) { Observable values = Observable.using( - //a factory function that creates a disposable resource - () -> { - String resource = "MyResource"; - System.out.println("Leased: " + resource); - return resource; - }, - //a factory function that creates an Observable - (resource) -> { - return Observable.create(o -> { - for (Character c : resource.toCharArray()) - o.onNext(c); - o.onCompleted(); - }); - }, - //a function that disposes of the resource - (resource) -> System.out.println("Disposed: " + resource) + () -> { + String resource = "MyResource"; + System.out.println("Leased: " + resource); + return resource; + }, + r -> Observable.create(o -> { + for (Character c : r.toCharArray()) + o.onNext(c); + o.onCompleted(); + }), + r -> System.out.println("Disposed: " + r) ); values.subscribe( - v -> System.out.println(v), - e -> System.out.println(e) + System.out::println, + System.out::println ); - } } diff --git a/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java b/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java index f6636999b6..f625ab6b44 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java @@ -8,17 +8,11 @@ public class SingleImpl { public static void main(String[] args) { Single single = Observable.just("Hello") - .toSingle() - .doOnSuccess( - System.out::print - ) - .doOnError( - (error) -> { - throw new RuntimeException(error.getMessage()); - }); + .toSingle() + .doOnSuccess(System.out::print) + .doOnError(e -> { + throw new RuntimeException(e.getMessage()); + }); single.subscribe(); - - } - } diff --git a/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java index e96f82a171..244c291f00 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java @@ -4,7 +4,6 @@ import rx.Observable; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; - public class SubjectImpl { public static final String[] subscriber1 = {""}; @@ -12,63 +11,53 @@ public class SubjectImpl { public static String subjectMethod() throws InterruptedException { - String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; - Long signal = new Long(500L); + Long signal = 500L; PublishSubject subject; synchronized (signal) { subject = PublishSubject.create(); subject.subscribe( - (letter) -> { - subscriber1[0] += letter; - System.out.println("Subscriber 1: " + subscriber1[0]); - try { - Thread.currentThread().sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - if (letter.equals("c")) { - synchronized (signal) { - signal.notify(); - } - } - } + (letter) -> { + subscriber1[0] += letter; + System.out.println("Subscriber 1: " + subscriber1[0]); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (letter.equals("c")) { + synchronized (signal) { + signal.notify(); + } + } + } ); } Observable.from(letters) - .subscribeOn(Schedulers.computation()) - .subscribe( - (letter) -> { - subject.onNext(letter); - }, - (t) -> { - subject.onError(t); - }, - () -> { - System.out.println("Subscriber 1 completed "); - subject.onCompleted(); - synchronized (signal) { - signal.notify(); - } - - } - ); + .subscribeOn(Schedulers.computation()) + .subscribe( + subject::onNext, + subject::onError, + () -> { + System.out.println("Subscriber 1 completed "); + subject.onCompleted(); + synchronized (signal) { + signal.notify(); + } + } + ); synchronized (signal) { signal.wait(); subject.subscribe( - (letter) -> { - subscriber2[0] += letter; - System.out.println("Subscriber 2: " + subscriber2[0]); - }, - (t) -> { - subject.onError(t); - }, - () -> { - System.out.println("Subscriber 2 completed "); - } + (letter) -> { + subscriber2[0] += letter; + System.out.println("Subscriber 2: " + subscriber2[0]); + }, + subject::onError, + () -> System.out.println("Subscriber 2 completed ") ); } @@ -76,7 +65,5 @@ public class SubjectImpl { signal.wait(); return subscriber1[0] + subscriber2[0]; } - } - } diff --git a/rxjava/src/main/java/com/baelding/rxjava/operator/ToCleanString.java b/rxjava/src/main/java/com/baelding/rxjava/operator/ToCleanString.java index f6cf9fba68..32db92c8fe 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/operator/ToCleanString.java +++ b/rxjava/src/main/java/com/baelding/rxjava/operator/ToCleanString.java @@ -39,5 +39,4 @@ public class ToCleanString implements Operator { } }; } - } \ No newline at end of file