YARN-4288. Fixed RMProxy to retry on IOException from local host. Contributed by Junping Du

(cherry picked from commit c41699965e)

Conflicts:

	hadoop-yarn-project/CHANGES.txt
This commit is contained in:
Jason Lowe 2016-06-08 17:57:59 +00:00
parent fdea6ee0d1
commit ab0679ed3f
6 changed files with 94 additions and 6 deletions

View File

@ -135,6 +135,16 @@ public class RetryPolicies {
return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap); return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
} }
/**
* A retry policy for exceptions other than RemoteException.
*/
public static final RetryPolicy retryOtherThanRemoteException(
RetryPolicy defaultPolicy,
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
return new OtherThanRemoteExceptionDependentRetry(defaultPolicy,
exceptionToPolicyMap);
}
public static final RetryPolicy failoverOnNetworkException(int maxFailovers) { public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers); return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
} }
@ -484,6 +494,36 @@ public class RetryPolicies {
} }
} }
static class OtherThanRemoteExceptionDependentRetry implements RetryPolicy {
private RetryPolicy defaultPolicy;
private Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap;
public OtherThanRemoteExceptionDependentRetry(RetryPolicy defaultPolicy,
Map<Class<? extends Exception>,
RetryPolicy> exceptionToPolicyMap) {
this.defaultPolicy = defaultPolicy;
this.exceptionToPolicyMap = exceptionToPolicyMap;
}
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
RetryPolicy policy = null;
// ignore Remote Exception
if (e instanceof RemoteException) {
// do nothing
} else {
policy = exceptionToPolicyMap.get(e.getClass());
}
if (policy == null) {
policy = defaultPolicy;
}
return policy.shouldRetry(
e, retries, failovers, isIdempotentOrAtMostOnce);
}
}
static class ExponentialBackoffRetry extends RetryLimited { static class ExponentialBackoffRetry extends RetryLimited {
public ExponentialBackoffRetry( public ExponentialBackoffRetry(

View File

@ -22,12 +22,14 @@ import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL; import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByException; import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException; import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryOtherThanRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry; import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -204,6 +206,27 @@ public class TestRetryProxy {
} }
} }
@Test
public void testRetryOtherThanRemoteException() throws Throwable {
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(
IOException.class, RETRY_FOREVER);
UnreliableInterface unreliable = (UnreliableInterface)
RetryProxy.create(UnreliableInterface.class, unreliableImpl,
retryOtherThanRemoteException(TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap));
// should retry with local IOException.
unreliable.failsOnceWithIOException();
try {
// won't get retry on remote exception
unreliable.failsOnceWithRemoteException();
fail("Should fail");
} catch (RemoteException e) {
// expected
}
}
@Test @Test
public void testRetryInterruptible() throws Throwable { public void testRetryInterruptible() throws Throwable {
final UnreliableInterface unreliable = (UnreliableInterface) final UnreliableInterface unreliable = (UnreliableInterface)

View File

@ -26,6 +26,8 @@ class UnreliableImplementation implements UnreliableInterface {
private int failsOnceInvocationCount, private int failsOnceInvocationCount,
failsOnceWithValueInvocationCount, failsOnceWithValueInvocationCount,
failsOnceIOExceptionInvocationCount,
failsOnceRemoteExceptionInvocationCount,
failsTenTimesInvocationCount, failsTenTimesInvocationCount,
succeedsOnceThenFailsCount, succeedsOnceThenFailsCount,
succeedsOnceThenFailsIdempotentCount, succeedsOnceThenFailsIdempotentCount,
@ -89,6 +91,21 @@ class UnreliableImplementation implements UnreliableInterface {
return true; return true;
} }
@Override
public void failsOnceWithIOException() throws IOException {
if (failsOnceIOExceptionInvocationCount++ == 0) {
throw new IOException("test exception for failsOnceWithIOException");
}
}
@Override
public void failsOnceWithRemoteException() throws RemoteException {
if (failsOnceRemoteExceptionInvocationCount++ == 0) {
throw new RemoteException(IOException.class.getName(),
"test exception for failsOnceWithRemoteException");
}
}
@Override @Override
public void failsTenTimesThenSucceeds() throws UnreliableException { public void failsTenTimesThenSucceeds() throws UnreliableException {
if (failsTenTimesInvocationCount++ < 10) { if (failsTenTimesInvocationCount++ < 10) {

View File

@ -54,6 +54,9 @@ public interface UnreliableInterface {
void alwaysFailsWithFatalException() throws FatalException; void alwaysFailsWithFatalException() throws FatalException;
void alwaysFailsWithRemoteFatalException() throws RemoteException; void alwaysFailsWithRemoteFatalException() throws RemoteException;
void failsOnceWithIOException() throws IOException;
void failsOnceWithRemoteException() throws RemoteException;
void failsOnceThenSucceeds() throws UnreliableException; void failsOnceThenSucceeds() throws UnreliableException;
boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException; boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;

View File

@ -148,6 +148,9 @@ Release 2.7.3 - UNRELEASED
YARN-5206. RegistrySecurity includes id:pass in exception text if YARN-5206. RegistrySecurity includes id:pass in exception text if
considered invalid (Steve Loughran via jlowe) considered invalid (Steve Loughran via jlowe)
YARN-4288. Fixed RMProxy to retry on IOException from local host.
(Junping Du via jianhe)
Release 2.7.2 - 2016-01-25 Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -249,8 +249,10 @@ public class RMProxy<T> {
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy); exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy); exceptionToPolicyMap.put(SocketException.class, retryPolicy);
// YARN-4288: local IOException is also possible.
return RetryPolicies.retryByException( exceptionToPolicyMap.put(IOException.class, retryPolicy);
// Not retry on remote IO exception.
return RetryPolicies.retryOtherThanRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
} }
} }