Guide to RxJava (#2597)

* alin.cojanu25@gmail.com

* add junit tests

* core-java - sneaky throws

* delete project spring_sample_annotations and spring_sample_xml ->not intended for this article

* RxJava

* delete old project

* refactored lambdas

* repaired subject test

* guide to rxjava
This commit is contained in:
alincojanu 2017-09-10 19:58:09 +03:00 committed by Grzegorz Piwowarek
parent 07d7ec51e8
commit d4d0305f3e
8 changed files with 155 additions and 200 deletions

View File

@ -9,7 +9,8 @@ public class ConnectableObservableImpl {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(System.out::println); connectable.subscribe(System.out::println);
System.out.println("Connect"); System.out.println("Connect");

View File

@ -3,12 +3,22 @@ package com.baelding.rxjava;
import rx.Observable; import rx.Observable;
import rx.observables.BlockingObservable; import rx.observables.BlockingObservable;
import java.util.Arrays;
import java.util.List;
public class ObservableImpl { public class ObservableImpl {
public static void main(String[] args) { static Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; static String[] letters = {"a", "b", "c", "d", "e", "f", "g", "h", "i"};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; static String[] titles = {"title"};
public static List<String> titleList = Arrays.asList(titles);
public static Observable<String> getTitle() {
return Observable.from(titleList);
}
public static void main(String[] args) {
System.out.println("-------Just-----------"); System.out.println("-------Just-----------");
Observable<String> observable = Observable.just("Hello"); Observable<String> observable = Observable.just("Hello");
@ -28,14 +38,9 @@ public class ObservableImpl {
System.out.println(); System.out.println();
System.out.println("-------FlatMap-----------"); System.out.println("-------FlatMap-----------");
Observable.from(letters) Observable.just("book1", "book2")
.flatMap((letter) -> { .flatMap(s -> getTitle())
String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; .subscribe(System.out::print);
return Observable.from(returnStrings);
})
.subscribe(
System.out::print
);
System.out.println(); System.out.println();
System.out.println("--------Scan----------"); System.out.println("--------Scan----------");
@ -55,16 +60,12 @@ public class ObservableImpl {
System.out.println("-------Filter-----------"); System.out.println("-------Filter-----------");
Observable.from(numbers) Observable.from(numbers)
.filter(i -> (i % 2 == 1)) .filter(i -> (i % 2 == 1))
.subscribe( .subscribe(System.out::println);
System.out::println
);
System.out.println("------DefaultIfEmpty------------"); System.out.println("------DefaultIfEmpty------------");
Observable.empty() Observable.empty()
.defaultIfEmpty("Observable is empty") .defaultIfEmpty("Observable is empty")
.subscribe( .subscribe(System.out::println);
System.out::println
);
System.out.println("------DefaultIfEmpty-2-----------"); System.out.println("------DefaultIfEmpty-2-----------");
Observable.from(letters) Observable.from(letters)

View File

@ -1,69 +1,73 @@
package com.baelding.rxjava; package com.baelding.rxjava;
import rx.Observable; import rx.Observer;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
public class SubjectImpl { public class SubjectImpl {
public static final String[] subscriber1 = {""}; public static Integer subscriber1 = 0;
public static final String[] subscriber2 = {""}; public static Integer subscriber2 = 0;
public static String subjectMethod() throws InterruptedException { public static Integer subjectMethod() {
PublishSubject<Integer> subject = PublishSubject.create();
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; subject.subscribe(getFirstObserver());
Long signal = 500L;
PublishSubject<String> subject;
synchronized (signal) { subject.onNext(1);
subject = PublishSubject.create(); subject.onNext(2);
subject.subscribe( subject.onNext(3);
(letter) -> {
subscriber1[0] += letter;
System.out.println("Subscriber 1: " + subscriber1[0]);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (letter.equals("c")) {
synchronized (signal) {
signal.notify();
}
}
}
);
}
Observable.from(letters) subject.subscribe(getSecondObserver());
.subscribeOn(Schedulers.computation())
.subscribe( subject.onNext(4);
subject::onNext, subject.onCompleted();
subject::onError, return subscriber1 + subscriber2;
() -> { }
System.out.println("Subscriber 1 completed ");
subject.onCompleted();
synchronized (signal) { public static Observer<Integer> getFirstObserver() {
signal.notify(); return new Observer<Integer>() {
}
@Override
public void onNext(Integer value) {
subscriber1 += value;
System.out.println("Subscriber1: " + value);
} }
);
synchronized (signal) { @Override
signal.wait(); public void onError(Throwable e) {
subject.subscribe( System.out.println("error");
(letter) -> { }
subscriber2[0] += letter;
System.out.println("Subscriber 2: " + subscriber2[0]);
},
subject::onError,
() -> System.out.println("Subscriber 2 completed ")
);
}
synchronized (signal) { @Override
signal.wait(); public void onCompleted() {
return subscriber1[0] + subscriber2[0]; System.out.println("Subscriber1 completed");
} }
};
}
public static Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber2 += value;
System.out.println("Subscriber2: " + value);
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber2 completed");
}
};
}
public static void main(String[] args) throws InterruptedException {
System.out.println(subjectMethod());
} }
} }

View File

@ -13,15 +13,14 @@ public class ConnectableObservableTest {
@Test @Test
public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException { public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException {
final String[] result = {""}; String[] result = {""};
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i); connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01")); assertFalse(result[0].equals("01"));
connectable.connect(); connectable.connect();
Thread.currentThread().sleep(500); Thread.sleep(500);
assertTrue(result[0].equals("01")); assertTrue(result[0].equals("01"));
} }
} }

View File

@ -2,10 +2,8 @@ package com.baeldung.rxjava;
import org.junit.Test; import org.junit.Test;
import rx.Observable; import rx.Observable;
import rx.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;
import static com.baelding.rxjava.ObservableImpl.getTitle;
import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.assertTrue;
public class ObservableTest { public class ObservableTest {
@ -24,41 +22,23 @@ public class ObservableTest {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters); Observable<String> observable = Observable.from(letters);
observable.subscribe( observable.subscribe(
//onNext i -> result += i,
(i) -> { Throwable::printStackTrace,
result += i; () -> result += "_Complete"
},
//onError
(t) -> {
t.printStackTrace();
},
//onCompleted
() -> {
result += "Complete";
}
); );
assertTrue(result.equals("abcdefgComplete")); assertTrue(result.equals("abcdefg_Complete"));
} }
@Test @Test
public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() { public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters); Observable<String> observable = Observable.from(letters);
String blockingObservable = observable.toBlocking().first(); String blockingObservable = observable.toBlocking().first();
observable.subscribe( observable.subscribe(
//onNext i -> result += i,
(i) -> { Throwable::printStackTrace,
result += i; () -> result += "_Completed"
},
//onError
(t) -> {
t.printStackTrace();
},
//onCompleted
() -> {
result += "Complete";
}
); );
assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable)); assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable));
} }
@ -68,45 +48,31 @@ public class ObservableTest {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable.from(letters) Observable.from(letters)
.map((letter) -> { .map(String::toUpperCase)
return letter.toUpperCase(); .subscribe(letter -> result += letter);
})
.subscribe((letter) -> {
result += letter;
});
assertTrue(result.equals("ABCDEFG")); assertTrue(result.equals("ABCDEFG"));
} }
@Test @Test
public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() { public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable.from(letters) Observable.just("book1", "book2")
.flatMap((letter) -> { .flatMap(s -> getTitle())
String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; .subscribe(l -> result += l);
return Observable.from(returnStrings);
})
.subscribe((letter) -> {
result += letter;
});
assertTrue(result.equals("AaBbCcDdEeFfGg")); assertTrue(result.equals("titletitle"));
} }
@Test @Test
public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() { public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c"};
Observable.from(letters) Observable.from(letters)
.scan(new StringBuilder(), (buffer, nextLetter) -> { .scan(new StringBuilder(), StringBuilder::append)
return buffer.append(nextLetter); .subscribe(total -> result += total.toString());
})
.subscribe((total) -> {
result = total.toString();
});
assertTrue(result.equals("abcdefg")); assertTrue(result.equals("aababc"));
} }
@Test @Test
@ -116,18 +82,16 @@ public class ObservableTest {
String[] ODD = {""}; String[] ODD = {""};
Observable.from(numbers) Observable.from(numbers)
.groupBy((i) -> { .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
return 0 == (i % 2) ? "EVEN" : "ODD"; .subscribe(group ->
}) group.subscribe((number) -> {
.subscribe((group) -> { if (group.getKey().toString().equals("EVEN")) {
group.subscribe((number) -> { EVEN[0] += number;
if (group.getKey().toString().equals("EVEN")) { } else {
EVEN[0] += number; ODD[0] += number;
} else { }
ODD[0] += number; })
} );
});
});
assertTrue(EVEN[0].equals("0246810")); assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579")); assertTrue(ODD[0].equals("13579"));
@ -138,12 +102,8 @@ public class ObservableTest {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Observable.from(numbers) Observable.from(numbers)
.filter((i) -> { .filter(i -> (i % 2 == 1))
return (i % 2 == 1); .subscribe(i -> result += i);
})
.subscribe((i) -> {
result += i;
});
assertTrue(result.equals("13579")); assertTrue(result.equals("13579"));
} }
@ -152,10 +112,8 @@ public class ObservableTest {
public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() { public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() {
Observable.empty() Observable.empty()
.defaultIfEmpty("Observable is empty") .defaultIfEmpty("Observable is empty")
.subscribe((s) -> { .subscribe(s -> result += s);
result += s;
});
assertTrue(result.equals("Observable is empty")); assertTrue(result.equals("Observable is empty"));
} }
@ -165,11 +123,9 @@ public class ObservableTest {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable.from(letters) Observable.from(letters)
.defaultIfEmpty("Observable is empty") .defaultIfEmpty("Observable is empty")
.first() .first()
.subscribe((s) -> { .subscribe(s -> result += s);
result += s;
});
assertTrue(result.equals("a")); assertTrue(result.equals("a"));
} }
@ -180,12 +136,8 @@ public class ObservableTest {
final Integer[] sum = {0}; final Integer[] sum = {0};
Observable.from(numbers) Observable.from(numbers)
.takeWhile((i) -> { .takeWhile(i -> i < 5)
return i < 5; .subscribe(s -> sum[0] += s);
})
.subscribe((s) -> {
sum[0] += s;
});
assertTrue(sum[0] == 10); assertTrue(sum[0] == 10);
} }

View File

@ -10,32 +10,25 @@ public class ResourceManagementTest {
@Test @Test
public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException { public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException {
final String[] result = {""}; String[] result = {""};
Observable<Character> values = Observable.using( Observable<Character> values = Observable.using(
//a factory function that creates a disposable resource () -> {
() -> { return "MyResource";
String resource = "MyResource"; },
return resource; r -> {
}, return Observable.create(o -> {
//a factory function that creates an Observable for (Character c : r.toCharArray())
(resource) -> { o.onNext(c);
return Observable.create(o -> { o.onCompleted();
for (Character c : resource.toCharArray()) });
o.onNext(c); },
o.onCompleted(); r -> System.out.println("Disposed: " + r)
});
},
//a function that disposes of the resource
(resource) -> System.out.println("Disposed: " + resource)
); );
values.subscribe( values.subscribe(
v -> result[0] += v, v -> result[0] += v,
e -> result[0] += e e -> result[0] += e
); );
assertTrue(result[0].equals("MyResource")); assertTrue(result[0].equals("MyResource"));
} }
} }

View File

@ -10,19 +10,15 @@ public class SingleTest {
@Test @Test
public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException { public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException {
final String[] result = {""}; String[] result = {""};
Single<String> single = Observable.just("Hello").toSingle() Single<String> single = Observable.just("Hello")
.doOnSuccess( .toSingle()
(i) -> { .doOnSuccess(i -> result[0] += i)
result[0] += i; .doOnError(error -> {
}) throw new RuntimeException(error.getMessage());
.doOnError( });
(error) -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe(); single.subscribe();
assertTrue(result[0].equals("Hello")); assertTrue(result[0].equals("Hello"));
} }
} }

View File

@ -2,16 +2,25 @@ package com.baeldung.rxjava;
import com.baelding.rxjava.SubjectImpl; import com.baelding.rxjava.SubjectImpl;
import org.junit.Test; import org.junit.Test;
import rx.subjects.PublishSubject;
import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.assertTrue;
public class SubjectTest { public class SubjectTest {
@Test @Test
public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubjectAfterLetterC_thenSecondSubscriberBeginsToPrint() throws InterruptedException { public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubject_thenSubscriberBeginsToAdd(){
String result = SubjectImpl.subjectMethod(); PublishSubject<Integer> subject = PublishSubject.create();
String subscribers = SubjectImpl.subscriber1[0] + SubjectImpl.subscriber2[0];
assertTrue(subscribers.equals(result)); subject.subscribe(SubjectImpl.getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(SubjectImpl.getSecondObserver());
subject.onNext(4);
subject.onCompleted();
assertTrue(SubjectImpl.subscriber1 + SubjectImpl.subscriber2 == 14);
} }
} }