parent
5ba675be35
commit
43ce43cc36
|
@ -3,9 +3,13 @@ package com.baelding.rxjava.operator;
|
||||||
import rx.Observable.Operator;
|
import rx.Observable.Operator;
|
||||||
import rx.Subscriber;
|
import rx.Subscriber;
|
||||||
|
|
||||||
public class CleanString implements Operator<String, String> {
|
public class ToCleanString implements Operator<String, String> {
|
||||||
|
|
||||||
public CleanString() {
|
public static ToCleanString toCleanString() {
|
||||||
|
return new ToCleanString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ToCleanString() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,14 @@ package com.baelding.rxjava.operator;
|
||||||
|
|
||||||
import rx.Observable;
|
import rx.Observable;
|
||||||
import rx.Observable.Transformer;
|
import rx.Observable.Transformer;
|
||||||
import rx.functions.Func1;
|
|
||||||
|
|
||||||
public class ToLength implements Transformer<String, Integer> {
|
public class ToLength implements Transformer<String, Integer> {
|
||||||
public ToLength() {
|
|
||||||
|
public static ToLength toLength() {
|
||||||
|
return new ToLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ToLength() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class RxJavaBackpressureLongRunningUnitTest {
|
||||||
public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() {
|
public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() {
|
||||||
// given
|
// given
|
||||||
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
|
||||||
PublishSubject<Integer> source = PublishSubject.<Integer> create();
|
PublishSubject<Integer> source = PublishSubject.create();
|
||||||
|
|
||||||
source.observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
source.observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ public class RxJavaBackpressureLongRunningUnitTest {
|
||||||
public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() {
|
public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() {
|
||||||
// given
|
// given
|
||||||
TestSubscriber<Observable<Integer>> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<Observable<Integer>> testSubscriber = new TestSubscriber<>();
|
||||||
PublishSubject<Integer> source = PublishSubject.<Integer> create();
|
PublishSubject<Integer> source = PublishSubject.create();
|
||||||
|
|
||||||
// when
|
// when
|
||||||
source.window(500).observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
source.window(500).observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
||||||
|
@ -67,7 +67,7 @@ public class RxJavaBackpressureLongRunningUnitTest {
|
||||||
public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() {
|
public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() {
|
||||||
// given
|
// given
|
||||||
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<>();
|
||||||
PublishSubject<Integer> source = PublishSubject.<Integer> create();
|
PublishSubject<Integer> source = PublishSubject.create();
|
||||||
|
|
||||||
// when
|
// when
|
||||||
source.buffer(1024).observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
source.buffer(1024).observeOn(Schedulers.computation()).subscribe(testSubscriber);
|
||||||
|
@ -84,7 +84,7 @@ public class RxJavaBackpressureLongRunningUnitTest {
|
||||||
public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() {
|
public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() {
|
||||||
// given
|
// given
|
||||||
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
|
||||||
PublishSubject<Integer> source = PublishSubject.<Integer> create();
|
PublishSubject<Integer> source = PublishSubject.create();
|
||||||
|
|
||||||
// when
|
// when
|
||||||
source.sample(100, TimeUnit.MILLISECONDS)
|
source.sample(100, TimeUnit.MILLISECONDS)
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package com.baeldung.rxjava;
|
package com.baeldung.rxjava;
|
||||||
|
|
||||||
|
import static com.baelding.rxjava.operator.ToCleanString.toCleanString;
|
||||||
|
import static com.baelding.rxjava.operator.ToLength.toLength;
|
||||||
import static org.hamcrest.Matchers.hasItems;
|
import static org.hamcrest.Matchers.hasItems;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
@ -16,7 +18,7 @@ import rx.Observable.Operator;
|
||||||
import rx.Observable.Transformer;
|
import rx.Observable.Transformer;
|
||||||
import rx.Subscriber;
|
import rx.Subscriber;
|
||||||
|
|
||||||
import com.baelding.rxjava.operator.CleanString;
|
import com.baelding.rxjava.operator.ToCleanString;
|
||||||
import com.baelding.rxjava.operator.ToLength;
|
import com.baelding.rxjava.operator.ToLength;
|
||||||
|
|
||||||
public class RxJavaCustomOperatorUnitTest {
|
public class RxJavaCustomOperatorUnitTest {
|
||||||
|
@ -24,10 +26,10 @@ public class RxJavaCustomOperatorUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void whenUseCleanStringOperator_thenSuccess() {
|
public void whenUseCleanStringOperator_thenSuccess() {
|
||||||
final List<String> list = Arrays.asList("john_1", "tom-3");
|
final List<String> list = Arrays.asList("john_1", "tom-3");
|
||||||
final List<String> results = new ArrayList<String>();
|
final List<String> results = new ArrayList<>();
|
||||||
|
|
||||||
final Observable<String> observable = Observable.from(list)
|
final Observable<String> observable = Observable.from(list)
|
||||||
.lift(new CleanString());
|
.lift(toCleanString());
|
||||||
|
|
||||||
// when
|
// when
|
||||||
observable.subscribe(results::add);
|
observable.subscribe(results::add);
|
||||||
|
@ -41,10 +43,10 @@ public class RxJavaCustomOperatorUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void whenUseToLengthOperator_thenSuccess() {
|
public void whenUseToLengthOperator_thenSuccess() {
|
||||||
final List<String> list = Arrays.asList("john", "tom");
|
final List<String> list = Arrays.asList("john", "tom");
|
||||||
final List<Integer> results = new ArrayList<Integer>();
|
final List<Integer> results = new ArrayList<>();
|
||||||
|
|
||||||
final Observable<Integer> observable = Observable.from(list)
|
final Observable<Integer> observable = Observable.from(list)
|
||||||
.compose(new ToLength());
|
.compose(toLength());
|
||||||
|
|
||||||
// when
|
// when
|
||||||
observable.subscribe(results::add);
|
observable.subscribe(results::add);
|
||||||
|
@ -57,33 +59,31 @@ public class RxJavaCustomOperatorUnitTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void whenUseFunctionOperator_thenSuccess() {
|
public void whenUseFunctionOperator_thenSuccess() {
|
||||||
final Operator<String, String> cleanStringFn = subscriber -> {
|
final Operator<String, String> cleanStringFn = subscriber -> new Subscriber<String>(subscriber) {
|
||||||
return new Subscriber<String>(subscriber) {
|
@Override
|
||||||
@Override
|
public void onCompleted() {
|
||||||
public void onCompleted() {
|
if (!subscriber.isUnsubscribed()) {
|
||||||
if (!subscriber.isUnsubscribed()) {
|
subscriber.onCompleted();
|
||||||
subscriber.onCompleted();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable t) {
|
public void onError(Throwable t) {
|
||||||
if (!subscriber.isUnsubscribed()) {
|
if (!subscriber.isUnsubscribed()) {
|
||||||
subscriber.onError(t);
|
subscriber.onError(t);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNext(String str) {
|
public void onNext(String str) {
|
||||||
if (!subscriber.isUnsubscribed()) {
|
if (!subscriber.isUnsubscribed()) {
|
||||||
final String result = str.replaceAll("[^A-Za-z0-9]", "");
|
final String result = str.replaceAll("[^A-Za-z0-9]", "");
|
||||||
subscriber.onNext(result);
|
subscriber.onNext(result);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final List<String> results = new ArrayList<String>();
|
final List<String> results = new ArrayList<>();
|
||||||
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
|
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
|
||||||
.lift(cleanStringFn)
|
.lift(cleanStringFn)
|
||||||
.subscribe(results::add);
|
.subscribe(results::add);
|
||||||
|
@ -97,7 +97,7 @@ public class RxJavaCustomOperatorUnitTest {
|
||||||
public void whenUseFunctionTransformer_thenSuccess() {
|
public void whenUseFunctionTransformer_thenSuccess() {
|
||||||
final Transformer<String, Integer> toLengthFn = source -> source.map(String::length);
|
final Transformer<String, Integer> toLengthFn = source -> source.map(String::length);
|
||||||
|
|
||||||
final List<Integer> results = new ArrayList<Integer>();
|
final List<Integer> results = new ArrayList<>();
|
||||||
Observable.from(Arrays.asList("apple", "orange"))
|
Observable.from(Arrays.asList("apple", "orange"))
|
||||||
.compose(toLengthFn)
|
.compose(toLengthFn)
|
||||||
.subscribe(results::add);
|
.subscribe(results::add);
|
||||||
|
|
Loading…
Reference in New Issue