A Guide to RxJava (#2557)
* alin.cojanu25@gmail.com * add junit tests * core-java - sneaky throws * delete project spring_sample_annotations and spring_sample_xml ->not intended for this article * RxJava * delete old project * refactored lambdas * repaired subject test
This commit is contained in:
parent
cafdf51bd7
commit
0d9068c040
|
@ -0,0 +1,21 @@
|
|||
package com.baelding.rxjava;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.observables.ConnectableObservable;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ConnectableObservableImpl {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
|
||||
connectable.subscribe(System.out::println);
|
||||
|
||||
System.out.println("Connect");
|
||||
connectable.connect();
|
||||
|
||||
Thread.currentThread().sleep(500);
|
||||
System.out.println("Sleep");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
package com.baelding.rxjava;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.observables.BlockingObservable;
|
||||
|
||||
public class ObservableImpl {
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
|
||||
System.out.println("-------Just-----------");
|
||||
Observable<String> observable = Observable.just("Hello");
|
||||
observable.subscribe(
|
||||
//onNext
|
||||
System.out::println,
|
||||
//onError
|
||||
Throwable::printStackTrace,
|
||||
//onCompleted
|
||||
() -> System.out.println("onCompleted")
|
||||
);
|
||||
|
||||
BlockingObservable<String> blockingObservable = observable.toBlocking();
|
||||
|
||||
System.out.println();
|
||||
System.out.println("-------Map-----------");
|
||||
Observable.from(letters)
|
||||
.map((letter) -> {
|
||||
return letter.toUpperCase();
|
||||
})
|
||||
.subscribe(
|
||||
System.out::print
|
||||
);
|
||||
|
||||
System.out.println();
|
||||
System.out.println("-------FlatMap-----------");
|
||||
Observable.from(letters)
|
||||
.flatMap((letter) -> {
|
||||
String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()};
|
||||
return Observable.from(returnStrings);
|
||||
})
|
||||
.subscribe(
|
||||
System.out::print
|
||||
);
|
||||
|
||||
System.out.println();
|
||||
System.out.println("--------Scan----------");
|
||||
Observable.from(letters)
|
||||
.scan(new StringBuilder(), (buffer, nextLetter) -> {
|
||||
return buffer.append(nextLetter);
|
||||
})
|
||||
.subscribe((total) -> {
|
||||
System.out.println(total.toString());
|
||||
});
|
||||
|
||||
System.out.println();
|
||||
System.out.println("------GroubBy------------");
|
||||
Observable.from(numbers)
|
||||
.groupBy((i) -> {
|
||||
return 0 == (i % 2) ? "EVEN" : "ODD";
|
||||
})
|
||||
.subscribe((group) -> {
|
||||
group.subscribe((number) -> {
|
||||
System.out.println(group.getKey() + " : " + number);
|
||||
});
|
||||
});
|
||||
|
||||
System.out.println();
|
||||
System.out.println("-------Filter-----------");
|
||||
Observable.from(numbers)
|
||||
.filter((i) -> {
|
||||
return (i % 2 == 1);
|
||||
})
|
||||
.subscribe(
|
||||
System.out::println
|
||||
);
|
||||
|
||||
|
||||
System.out.println("------DefaultIfEmpty------------");
|
||||
Observable.empty()
|
||||
.defaultIfEmpty("Observable is empty")
|
||||
.subscribe(
|
||||
System.out::println
|
||||
);
|
||||
|
||||
|
||||
|
||||
System.out.println("------DefaultIfEmpty-2-----------");
|
||||
Observable.from(letters)
|
||||
.defaultIfEmpty("Observable is empty")
|
||||
.first()
|
||||
.subscribe(System.out::println);
|
||||
|
||||
System.out.println("-------TakeWhile-----------");
|
||||
Observable.from(numbers)
|
||||
.takeWhile((i) -> {
|
||||
return i < 5;
|
||||
})
|
||||
.subscribe(System.out::println);
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.baelding.rxjava;
|
||||
|
||||
|
||||
import rx.Observable;
|
||||
|
||||
public class ResourceManagement {
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Observable<Character> values = Observable.using(
|
||||
//a factory function that creates a disposable resource
|
||||
() -> {
|
||||
String resource = "MyResource";
|
||||
System.out.println("Leased: " + resource);
|
||||
return resource;
|
||||
},
|
||||
//a factory function that creates an Observable
|
||||
(resource) -> {
|
||||
return Observable.create(o -> {
|
||||
for (Character c : resource.toCharArray())
|
||||
o.onNext(c);
|
||||
o.onCompleted();
|
||||
});
|
||||
},
|
||||
//a function that disposes of the resource
|
||||
(resource) -> System.out.println("Disposed: " + resource)
|
||||
);
|
||||
|
||||
values.subscribe(
|
||||
v -> System.out.println(v),
|
||||
e -> System.out.println(e)
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package com.baelding.rxjava;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.Single;
|
||||
|
||||
public class SingleImpl {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Single<String> single = Observable.just("Hello")
|
||||
.toSingle()
|
||||
.doOnSuccess(
|
||||
System.out::print
|
||||
)
|
||||
.doOnError(
|
||||
(error) -> {
|
||||
throw new RuntimeException(error.getMessage());
|
||||
});
|
||||
single.subscribe();
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
package com.baelding.rxjava;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.schedulers.Schedulers;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
||||
|
||||
public class SubjectImpl {
|
||||
|
||||
public static final String[] subscriber1 = {""};
|
||||
public static final String[] subscriber2 = {""};
|
||||
|
||||
public static String subjectMethod() throws InterruptedException {
|
||||
|
||||
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
Long signal = new Long(500L);
|
||||
PublishSubject<String> subject;
|
||||
|
||||
synchronized (signal) {
|
||||
subject = PublishSubject.create();
|
||||
subject.subscribe(
|
||||
(letter) -> {
|
||||
subscriber1[0] += letter;
|
||||
System.out.println("Subscriber 1: " + subscriber1[0]);
|
||||
try {
|
||||
Thread.currentThread().sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
if (letter.equals("c")) {
|
||||
synchronized (signal) {
|
||||
signal.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Observable.from(letters)
|
||||
.subscribeOn(Schedulers.computation())
|
||||
.subscribe(
|
||||
(letter) -> {
|
||||
subject.onNext(letter);
|
||||
},
|
||||
(t) -> {
|
||||
subject.onError(t);
|
||||
},
|
||||
() -> {
|
||||
System.out.println("Subscriber 1 completed ");
|
||||
subject.onCompleted();
|
||||
synchronized (signal) {
|
||||
signal.notify();
|
||||
}
|
||||
|
||||
}
|
||||
);
|
||||
|
||||
synchronized (signal) {
|
||||
signal.wait();
|
||||
subject.subscribe(
|
||||
(letter) -> {
|
||||
subscriber2[0] += letter;
|
||||
System.out.println("Subscriber 2: " + subscriber2[0]);
|
||||
},
|
||||
(t) -> {
|
||||
subject.onError(t);
|
||||
},
|
||||
() -> {
|
||||
System.out.println("Subscriber 2 completed ");
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
synchronized (signal) {
|
||||
signal.wait();
|
||||
return subscriber1[0] + subscriber2[0];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.observables.ConnectableObservable;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static junit.framework.Assert.assertFalse;
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
public class ConnectableObservableTest {
|
||||
|
||||
@Test
|
||||
public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException {
|
||||
final String[] result = {""};
|
||||
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
|
||||
connectable.subscribe(i -> result[0] += i);
|
||||
|
||||
assertFalse(result[0].equals("01"));
|
||||
|
||||
connectable.connect();
|
||||
Thread.currentThread().sleep(500);
|
||||
|
||||
assertTrue(result[0].equals("01"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.observables.ConnectableObservable;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
public class ObservableTest {
|
||||
|
||||
String result = "";
|
||||
|
||||
@Test
|
||||
public void givenString_whenJustAndSubscribe_thenEmitsSingleItem() {
|
||||
Observable<String> observable = Observable.just("Hello");
|
||||
observable.subscribe(s -> result = s);
|
||||
assertTrue(result.equals("Hello"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArray_whenFromAndSubscribe_thenEmitsItems() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
Observable<String> observable = Observable.from(letters);
|
||||
observable.subscribe(
|
||||
//onNext
|
||||
(i) -> {
|
||||
result += i;
|
||||
},
|
||||
//onError
|
||||
(t) -> {
|
||||
t.printStackTrace();
|
||||
},
|
||||
//onCompleted
|
||||
() -> {
|
||||
result += "Complete";
|
||||
}
|
||||
);
|
||||
assertTrue(result.equals("abcdefgComplete"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
Observable<String> observable = Observable.from(letters);
|
||||
String blockingObservable = observable.toBlocking().first();
|
||||
|
||||
observable.subscribe(
|
||||
//onNext
|
||||
(i) -> {
|
||||
result += i;
|
||||
},
|
||||
//onError
|
||||
(t) -> {
|
||||
t.printStackTrace();
|
||||
},
|
||||
//onCompleted
|
||||
() -> {
|
||||
result += "Complete";
|
||||
}
|
||||
);
|
||||
assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArray_whenMapAndSubscribe_thenReturnCapitalLetters() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
|
||||
Observable.from(letters)
|
||||
.map((letter) -> {
|
||||
return letter.toUpperCase();
|
||||
})
|
||||
.subscribe((letter) -> {
|
||||
result += letter;
|
||||
});
|
||||
|
||||
assertTrue(result.equals("ABCDEFG"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
|
||||
Observable.from(letters)
|
||||
.flatMap((letter) -> {
|
||||
String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()};
|
||||
return Observable.from(returnStrings);
|
||||
})
|
||||
.subscribe((letter) -> {
|
||||
result += letter;
|
||||
});
|
||||
|
||||
assertTrue(result.equals("AaBbCcDdEeFfGg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
|
||||
Observable.from(letters)
|
||||
.scan(new StringBuilder(), (buffer, nextLetter) -> {
|
||||
return buffer.append(nextLetter);
|
||||
})
|
||||
.subscribe((total) -> {
|
||||
result = total.toString();
|
||||
});
|
||||
|
||||
assertTrue(result.equals("abcdefg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArrayOfNumbers_whenGroupBy_thenCreateTwoGroupsBasedOnParity() {
|
||||
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
String[] EVEN = {""};
|
||||
String[] ODD = {""};
|
||||
|
||||
Observable.from(numbers)
|
||||
.groupBy((i) -> {
|
||||
return 0 == (i % 2) ? "EVEN" : "ODD";
|
||||
})
|
||||
.subscribe((group) -> {
|
||||
group.subscribe((number) -> {
|
||||
if (group.getKey().toString().equals("EVEN")) {
|
||||
EVEN[0] += number;
|
||||
} else {
|
||||
ODD[0] += number;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
assertTrue(EVEN[0].equals("0246810"));
|
||||
assertTrue(ODD[0].equals("13579"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenArrayOfNumbers_whenFilter_thenGetAllOddNumbers() {
|
||||
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
|
||||
Observable.from(numbers)
|
||||
.filter((i) -> {
|
||||
return (i % 2 == 1);
|
||||
})
|
||||
.subscribe((i) -> {
|
||||
result += i;
|
||||
});
|
||||
|
||||
assertTrue(result.equals("13579"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() {
|
||||
|
||||
Observable.empty()
|
||||
.defaultIfEmpty("Observable is empty")
|
||||
.subscribe((s) -> {
|
||||
result += s;
|
||||
});
|
||||
|
||||
assertTrue(result.equals("Observable is empty"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableFromArray_whenDefaultIfEmptyAndFirst_thenGetFirstLetterFromArray() {
|
||||
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
|
||||
|
||||
Observable.from(letters)
|
||||
.defaultIfEmpty("Observable is empty")
|
||||
.first()
|
||||
.subscribe((s) -> {
|
||||
result += s;
|
||||
});
|
||||
|
||||
assertTrue(result.equals("a"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableFromArray_whenTakeWhile_thenGetSumOfNumbersFromCondition() {
|
||||
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
final Integer[] sum = {0};
|
||||
|
||||
Observable.from(numbers)
|
||||
.takeWhile((i) -> {
|
||||
return i < 5;
|
||||
})
|
||||
.subscribe((s) -> {
|
||||
sum[0] += s;
|
||||
});
|
||||
|
||||
assertTrue(sum[0] == 10);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
public class ResourceManagementTest {
|
||||
|
||||
@Test
|
||||
public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException {
|
||||
|
||||
final String[] result = {""};
|
||||
|
||||
Observable<Character> values = Observable.using(
|
||||
//a factory function that creates a disposable resource
|
||||
() -> {
|
||||
String resource = "MyResource";
|
||||
return resource;
|
||||
},
|
||||
//a factory function that creates an Observable
|
||||
(resource) -> {
|
||||
return Observable.create(o -> {
|
||||
for (Character c : resource.toCharArray())
|
||||
o.onNext(c);
|
||||
o.onCompleted();
|
||||
});
|
||||
},
|
||||
//a function that disposes of the resource
|
||||
(resource) -> System.out.println("Disposed: " + resource)
|
||||
);
|
||||
|
||||
values.subscribe(
|
||||
v -> result[0] += v,
|
||||
e -> result[0] += e
|
||||
);
|
||||
|
||||
assertTrue(result[0].equals("MyResource"));
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.Single;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
public class SingleTest {
|
||||
|
||||
@Test
|
||||
public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException {
|
||||
final String[] result = {""};
|
||||
Single<String> single = Observable.just("Hello").toSingle()
|
||||
.doOnSuccess(
|
||||
(i) -> {
|
||||
result[0] += i;
|
||||
})
|
||||
.doOnError(
|
||||
(error) -> {
|
||||
throw new RuntimeException(error.getMessage());
|
||||
});
|
||||
single.subscribe();
|
||||
|
||||
assertTrue(result[0].equals("Hello"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package com.baeldung.rxjava;
|
||||
|
||||
import com.baelding.rxjava.SubjectImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
public class SubjectTest {
|
||||
|
||||
@Test
|
||||
public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubjectAfterLetterC_thenSecondSubscriberBeginsToPrint() throws InterruptedException {
|
||||
String result = SubjectImpl.subjectMethod();
|
||||
String subscribers = SubjectImpl.subscriber1[0] + SubjectImpl.subscriber2[0];
|
||||
|
||||
assertTrue(subscribers.equals(result));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue