YARN-4288. Fixed RMProxy to retry on IOException from local host. Contributed by Junping Du

(cherry picked from commit c41699965e)
This commit is contained in:
Jian He 2015-10-29 00:00:48 -07:00
parent e3fcb711d6
commit 7d109ef5b5
6 changed files with 94 additions and 6 deletions

View File

@ -140,6 +140,16 @@ public class RetryPolicies {
return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap); return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
} }
/**
* A retry policy for exceptions other than RemoteException.
*/
public static final RetryPolicy retryOtherThanRemoteException(
RetryPolicy defaultPolicy,
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
return new OtherThanRemoteExceptionDependentRetry(defaultPolicy,
exceptionToPolicyMap);
}
public static final RetryPolicy failoverOnNetworkException(int maxFailovers) { public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers); return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
} }
@ -490,6 +500,36 @@ public class RetryPolicies {
} }
} }
static class OtherThanRemoteExceptionDependentRetry implements RetryPolicy {
private RetryPolicy defaultPolicy;
private Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap;
public OtherThanRemoteExceptionDependentRetry(RetryPolicy defaultPolicy,
Map<Class<? extends Exception>,
RetryPolicy> exceptionToPolicyMap) {
this.defaultPolicy = defaultPolicy;
this.exceptionToPolicyMap = exceptionToPolicyMap;
}
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
RetryPolicy policy = null;
// ignore Remote Exception
if (e instanceof RemoteException) {
// do nothing
} else {
policy = exceptionToPolicyMap.get(e.getClass());
}
if (policy == null) {
policy = defaultPolicy;
}
return policy.shouldRetry(
e, retries, failovers, isIdempotentOrAtMostOnce);
}
}
static class ExponentialBackoffRetry extends RetryLimited { static class ExponentialBackoffRetry extends RetryLimited {
public ExponentialBackoffRetry( public ExponentialBackoffRetry(

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL; import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByException; import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException; import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryOtherThanRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
@ -29,6 +30,7 @@ import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSlee
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry; import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -215,6 +217,27 @@ public class TestRetryProxy {
} }
} }
@Test
public void testRetryOtherThanRemoteException() throws Throwable {
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(
IOException.class, RETRY_FOREVER);
UnreliableInterface unreliable = (UnreliableInterface)
RetryProxy.create(UnreliableInterface.class, unreliableImpl,
retryOtherThanRemoteException(TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap));
// should retry with local IOException.
unreliable.failsOnceWithIOException();
try {
// won't get retry on remote exception
unreliable.failsOnceWithRemoteException();
fail("Should fail");
} catch (RemoteException e) {
// expected
}
}
@Test @Test
public void testRetryInterruptible() throws Throwable { public void testRetryInterruptible() throws Throwable {
final UnreliableInterface unreliable = (UnreliableInterface) final UnreliableInterface unreliable = (UnreliableInterface)

View File

@ -26,6 +26,8 @@ class UnreliableImplementation implements UnreliableInterface {
private int failsOnceInvocationCount, private int failsOnceInvocationCount,
failsOnceWithValueInvocationCount, failsOnceWithValueInvocationCount,
failsOnceIOExceptionInvocationCount,
failsOnceRemoteExceptionInvocationCount,
failsTenTimesInvocationCount, failsTenTimesInvocationCount,
succeedsOnceThenFailsCount, succeedsOnceThenFailsCount,
succeedsOnceThenFailsIdempotentCount, succeedsOnceThenFailsIdempotentCount,
@ -89,6 +91,21 @@ class UnreliableImplementation implements UnreliableInterface {
return true; return true;
} }
@Override
public void failsOnceWithIOException() throws IOException {
if (failsOnceIOExceptionInvocationCount++ == 0) {
throw new IOException("test exception for failsOnceWithIOException");
}
}
@Override
public void failsOnceWithRemoteException() throws RemoteException {
if (failsOnceRemoteExceptionInvocationCount++ == 0) {
throw new RemoteException(IOException.class.getName(),
"test exception for failsOnceWithRemoteException");
}
}
@Override @Override
public void failsTenTimesThenSucceeds() throws UnreliableException { public void failsTenTimesThenSucceeds() throws UnreliableException {
if (failsTenTimesInvocationCount++ < 10) { if (failsTenTimesInvocationCount++ < 10) {

View File

@ -54,6 +54,9 @@ public interface UnreliableInterface {
void alwaysFailsWithFatalException() throws FatalException; void alwaysFailsWithFatalException() throws FatalException;
void alwaysFailsWithRemoteFatalException() throws RemoteException; void alwaysFailsWithRemoteFatalException() throws RemoteException;
void failsOnceWithIOException() throws IOException;
void failsOnceWithRemoteException() throws RemoteException;
void failsOnceThenSucceeds() throws UnreliableException; void failsOnceThenSucceeds() throws UnreliableException;
boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException; boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;

View File

@ -982,6 +982,9 @@ Release 2.8.0 - UNRELEASED
YARN-4130. Duplicate declaration of ApplicationId in RMAppManager#submitApplication method. YARN-4130. Duplicate declaration of ApplicationId in RMAppManager#submitApplication method.
(Kai Sasaki via rohithsharmaks) (Kai Sasaki via rohithsharmaks)
YARN-4288. Fixed RMProxy to retry on IOException from local host.
(Junping Du via jianhe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -250,8 +250,10 @@ public class RMProxy<T> {
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy); exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy); exceptionToPolicyMap.put(SocketException.class, retryPolicy);
// YARN-4288: local IOException is also possible.
return RetryPolicies.retryByException( exceptionToPolicyMap.put(IOException.class, retryPolicy);
// Not retry on remote IO exception.
return RetryPolicies.retryOtherThanRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
} }
} }