BAEL-1058 (#2606)
This commit is contained in:
parent
19f39c141e
commit
de07e1f926
@ -19,6 +19,12 @@
|
|||||||
<artifactId>rxjava</artifactId>
|
<artifactId>rxjava</artifactId>
|
||||||
<version>${rx.java.version}</version>
|
<version>${rx.java.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.reactivex.rxjava2</groupId>
|
||||||
|
<artifactId>rxjava</artifactId>
|
||||||
|
<version>2.1.3</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -0,0 +1,141 @@
|
|||||||
|
package com.baeldung.rxjava.onerror;
|
||||||
|
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.exceptions.CompositeException;
|
||||||
|
import io.reactivex.observers.TestObserver;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author aiet
|
||||||
|
*/
|
||||||
|
public class ExceptionHandlingTest {
|
||||||
|
|
||||||
|
private Error UNKNOWN_ERROR = new Error("unknown error");
|
||||||
|
private Exception UNKNOWN_EXCEPTION = new Exception("unknown exception");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenHandleOnErrorReturn_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.onErrorReturn(Throwable::getMessage)
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertValueCount(1);
|
||||||
|
testObserver.assertValue("unknown error");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenHandleOnErrorResume_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.onErrorResumeNext(Observable.just("one", "two"))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertValueCount(2);
|
||||||
|
testObserver.assertValues("one", "two");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenHandleOnErrorResumeItem_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.onErrorReturnItem("singleValue")
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertValueCount(1);
|
||||||
|
testObserver.assertValue("singleValue");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenHandleOnErrorResumeFunc_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.onErrorResumeNext(throwable -> {
|
||||||
|
return Observable.just(throwable.getMessage(), "nextValue");
|
||||||
|
})
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertValueCount(2);
|
||||||
|
testObserver.assertValues("unknown error", "nextValue");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenChangeStateOnError_thenErrorThrown() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
final AtomicBoolean state = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.doOnError(throwable -> state.set(true))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(UNKNOWN_ERROR);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("state should be changed", state.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenExceptionOccurOnError_thenCompositeExceptionThrown() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.doOnError(throwable -> {
|
||||||
|
throw new RuntimeException("unexcepted");
|
||||||
|
})
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(CompositeException.class);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndException_whenHandleOnException_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_EXCEPTION)
|
||||||
|
.onExceptionResumeNext(Observable.just("exceptionResumed"))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertValueCount(1);
|
||||||
|
testObserver.assertValue("exceptionResumed");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenHandleOnException_thenNotResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.onExceptionResumeNext(Observable.just("exceptionResumed"))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(UNKNOWN_ERROR);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,149 @@
|
|||||||
|
package com.baeldung.rxjava.onerror;
|
||||||
|
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.observers.TestObserver;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author aiet
|
||||||
|
*/
|
||||||
|
public class OnErrorRetryTest {
|
||||||
|
|
||||||
|
private Error UNKNOWN_ERROR = new Error("unknown error");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryOnError_thenRetryConfirmed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(() -> {
|
||||||
|
atomicCounter.incrementAndGet();
|
||||||
|
return UNKNOWN_ERROR;
|
||||||
|
})
|
||||||
|
.retry(1)
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(UNKNOWN_ERROR);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("should call twice", atomicCounter.get() == 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryConditionallyOnError_thenRetryConfirmed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
|
||||||
|
AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(() -> {
|
||||||
|
atomicCounter.incrementAndGet();
|
||||||
|
return UNKNOWN_ERROR;
|
||||||
|
})
|
||||||
|
.retry((integer, throwable) -> integer < 4)
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(UNKNOWN_ERROR);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("should call 4 times", atomicCounter.get() == 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryUntilOnError_thenRetryConfirmed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(UNKNOWN_ERROR);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("should call 4 times", atomicCounter.get() == 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryWhenOnError_thenRetryConfirmed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
Exception noretryException = new Exception("don't retry");
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.retryWhen(throwableObservable -> Observable.error(noretryException))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertError(noretryException);
|
||||||
|
testObserver.assertNotComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryWhenOnError_thenCompleted() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(() -> {
|
||||||
|
atomicCounter.incrementAndGet();
|
||||||
|
return UNKNOWN_ERROR;
|
||||||
|
})
|
||||||
|
.retryWhen(throwableObservable -> Observable.empty())
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("should not retry", atomicCounter.get() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryWhenOnError_thenResubscribed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(() -> {
|
||||||
|
atomicCounter.incrementAndGet();
|
||||||
|
return UNKNOWN_ERROR;
|
||||||
|
})
|
||||||
|
.retryWhen(throwableObservable -> Observable.just("anything"))
|
||||||
|
.subscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
assertTrue("should retry once", atomicCounter.get() == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenSubscriberAndError_whenRetryWhenForMultipleTimesOnError_thenResumed() {
|
||||||
|
TestObserver testObserver = new TestObserver();
|
||||||
|
long before = System.currentTimeMillis();
|
||||||
|
|
||||||
|
Observable
|
||||||
|
.error(UNKNOWN_ERROR)
|
||||||
|
.retryWhen(throwableObservable -> throwableObservable
|
||||||
|
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
|
||||||
|
.flatMap(integer -> {
|
||||||
|
System.out.println("retried " + integer + " times");
|
||||||
|
return Observable.timer(integer, TimeUnit.SECONDS);
|
||||||
|
}))
|
||||||
|
.blockingSubscribe(testObserver);
|
||||||
|
|
||||||
|
testObserver.assertNoErrors();
|
||||||
|
testObserver.assertComplete();
|
||||||
|
testObserver.assertNoValues();
|
||||||
|
long secondsElapsed = (System.currentTimeMillis() - before) / 1000;
|
||||||
|
assertTrue("6 seconds should elapse", secondsElapsed == 6);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user