Refactor RxJava (#2564)

This commit is contained in:
Grzegorz Piwowarek 2017-09-05 20:43:00 +02:00 committed by GitHub
parent 0d9068c040
commit 830db34334
6 changed files with 83 additions and 136 deletions

View File

@ -15,7 +15,7 @@ public class ConnectableObservableImpl {
System.out.println("Connect"); System.out.println("Connect");
connectable.connect(); connectable.connect();
Thread.currentThread().sleep(500); Thread.sleep(500);
System.out.println("Sleep"); System.out.println("Sleep");
} }
} }

View File

@ -5,7 +5,6 @@ import rx.observables.BlockingObservable;
public class ObservableImpl { public class ObservableImpl {
public static void main(String[] args) { public static void main(String[] args) {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@ -14,12 +13,9 @@ public class ObservableImpl {
System.out.println("-------Just-----------"); System.out.println("-------Just-----------");
Observable<String> observable = Observable.just("Hello"); Observable<String> observable = Observable.just("Hello");
observable.subscribe( observable.subscribe(
//onNext System.out::println, //onNext
System.out::println, Throwable::printStackTrace, //onError
//onError () -> System.out.println("onCompleted") //onCompleted
Throwable::printStackTrace,
//onCompleted
() -> System.out.println("onCompleted")
); );
BlockingObservable<String> blockingObservable = observable.toBlocking(); BlockingObservable<String> blockingObservable = observable.toBlocking();
@ -27,79 +23,58 @@ public class ObservableImpl {
System.out.println(); System.out.println();
System.out.println("-------Map-----------"); System.out.println("-------Map-----------");
Observable.from(letters) Observable.from(letters)
.map((letter) -> { .map(String::toUpperCase)
return letter.toUpperCase(); .subscribe(System.out::print);
})
.subscribe(
System.out::print
);
System.out.println(); System.out.println();
System.out.println("-------FlatMap-----------"); System.out.println("-------FlatMap-----------");
Observable.from(letters) Observable.from(letters)
.flatMap((letter) -> { .flatMap((letter) -> {
String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()};
return Observable.from(returnStrings); return Observable.from(returnStrings);
}) })
.subscribe( .subscribe(
System.out::print System.out::print
); );
System.out.println(); System.out.println();
System.out.println("--------Scan----------"); System.out.println("--------Scan----------");
Observable.from(letters) Observable.from(letters)
.scan(new StringBuilder(), (buffer, nextLetter) -> { .scan(new StringBuilder(), StringBuilder::append)
return buffer.append(nextLetter); .subscribe(System.out::println);
})
.subscribe((total) -> {
System.out.println(total.toString());
});
System.out.println(); System.out.println();
System.out.println("------GroubBy------------"); System.out.println("------GroubBy------------");
Observable.from(numbers) Observable.from(numbers)
.groupBy((i) -> { .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
return 0 == (i % 2) ? "EVEN" : "ODD"; .subscribe((group) -> group.subscribe((number) -> {
}) System.out.println(group.getKey() + " : " + number);
.subscribe((group) -> { }));
group.subscribe((number) -> {
System.out.println(group.getKey() + " : " + number);
});
});
System.out.println(); System.out.println();
System.out.println("-------Filter-----------"); System.out.println("-------Filter-----------");
Observable.from(numbers) Observable.from(numbers)
.filter((i) -> { .filter(i -> (i % 2 == 1))
return (i % 2 == 1); .subscribe(
}) System.out::println
.subscribe( );
System.out::println
);
System.out.println("------DefaultIfEmpty------------"); System.out.println("------DefaultIfEmpty------------");
Observable.empty() Observable.empty()
.defaultIfEmpty("Observable is empty") .defaultIfEmpty("Observable is empty")
.subscribe( .subscribe(
System.out::println System.out::println
); );
System.out.println("------DefaultIfEmpty-2-----------"); System.out.println("------DefaultIfEmpty-2-----------");
Observable.from(letters) Observable.from(letters)
.defaultIfEmpty("Observable is empty") .defaultIfEmpty("Observable is empty")
.first() .first()
.subscribe(System.out::println); .subscribe(System.out::println);
System.out.println("-------TakeWhile-----------"); System.out.println("-------TakeWhile-----------");
Observable.from(numbers) Observable.from(numbers)
.takeWhile((i) -> { .takeWhile(i -> i < 5)
return i < 5; .subscribe(System.out::println);
})
.subscribe(System.out::println);
} }
} }

View File

@ -1,37 +1,29 @@
package com.baelding.rxjava; package com.baelding.rxjava;
import rx.Observable; import rx.Observable;
public class ResourceManagement { public class ResourceManagement {
public static void main(String[] args) { public static void main(String[] args) {
Observable<Character> values = Observable.using( Observable<Character> values = Observable.using(
//a factory function that creates a disposable resource () -> {
() -> { String resource = "MyResource";
String resource = "MyResource"; System.out.println("Leased: " + resource);
System.out.println("Leased: " + resource); return resource;
return resource; },
}, r -> Observable.create(o -> {
//a factory function that creates an Observable for (Character c : r.toCharArray())
(resource) -> { o.onNext(c);
return Observable.create(o -> { o.onCompleted();
for (Character c : resource.toCharArray()) }),
o.onNext(c); r -> System.out.println("Disposed: " + r)
o.onCompleted();
});
},
//a function that disposes of the resource
(resource) -> System.out.println("Disposed: " + resource)
); );
values.subscribe( values.subscribe(
v -> System.out.println(v), System.out::println,
e -> System.out.println(e) System.out::println
); );
} }
} }

View File

@ -8,17 +8,11 @@ public class SingleImpl {
public static void main(String[] args) { public static void main(String[] args) {
Single<String> single = Observable.just("Hello") Single<String> single = Observable.just("Hello")
.toSingle() .toSingle()
.doOnSuccess( .doOnSuccess(System.out::print)
System.out::print .doOnError(e -> {
) throw new RuntimeException(e.getMessage());
.doOnError( });
(error) -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe(); single.subscribe();
} }
} }

View File

@ -4,7 +4,6 @@ import rx.Observable;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
public class SubjectImpl { public class SubjectImpl {
public static final String[] subscriber1 = {""}; public static final String[] subscriber1 = {""};
@ -12,63 +11,53 @@ public class SubjectImpl {
public static String subjectMethod() throws InterruptedException { public static String subjectMethod() throws InterruptedException {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Long signal = new Long(500L); Long signal = 500L;
PublishSubject<String> subject; PublishSubject<String> subject;
synchronized (signal) { synchronized (signal) {
subject = PublishSubject.create(); subject = PublishSubject.create();
subject.subscribe( subject.subscribe(
(letter) -> { (letter) -> {
subscriber1[0] += letter; subscriber1[0] += letter;
System.out.println("Subscriber 1: " + subscriber1[0]); System.out.println("Subscriber 1: " + subscriber1[0]);
try { try {
Thread.currentThread().sleep(500); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
if (letter.equals("c")) { if (letter.equals("c")) {
synchronized (signal) { synchronized (signal) {
signal.notify(); signal.notify();
} }
} }
} }
); );
} }
Observable.from(letters) Observable.from(letters)
.subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.computation())
.subscribe( .subscribe(
(letter) -> { subject::onNext,
subject.onNext(letter); subject::onError,
}, () -> {
(t) -> { System.out.println("Subscriber 1 completed ");
subject.onError(t); subject.onCompleted();
}, synchronized (signal) {
() -> { signal.notify();
System.out.println("Subscriber 1 completed "); }
subject.onCompleted(); }
synchronized (signal) { );
signal.notify();
}
}
);
synchronized (signal) { synchronized (signal) {
signal.wait(); signal.wait();
subject.subscribe( subject.subscribe(
(letter) -> { (letter) -> {
subscriber2[0] += letter; subscriber2[0] += letter;
System.out.println("Subscriber 2: " + subscriber2[0]); System.out.println("Subscriber 2: " + subscriber2[0]);
}, },
(t) -> { subject::onError,
subject.onError(t); () -> System.out.println("Subscriber 2 completed ")
},
() -> {
System.out.println("Subscriber 2 completed ");
}
); );
} }
@ -76,7 +65,5 @@ public class SubjectImpl {
signal.wait(); signal.wait();
return subscriber1[0] + subscriber2[0]; return subscriber1[0] + subscriber2[0];
} }
} }
} }

View File

@ -39,5 +39,4 @@ public class ToCleanString implements Operator<String, String> {
} }
}; };
} }
} }