YARN-3695. ServerProxy (NMProxy, etc.) shouldn't retry forever for non-network exceptions. Contributed by Raju Bairishetti

This commit is contained in:
Jian He 2015-06-29 13:37:32 -07:00
parent fad291ea6d
commit 62e583c7dc
3 changed files with 70 additions and 35 deletions

View File

@ -568,6 +568,9 @@ Release 2.8.0 - UNRELEASED
YARN-3860. rmadmin -transitionToActive should check the state of non-target node. YARN-3860. rmadmin -transitionToActive should check the state of non-target node.
(Masatake Iwasaki via junping_du) (Masatake Iwasaki via junping_du)
YARN-3695. ServerProxy (NMProxy, etc.) shouldn't retry forever for
non-network exceptions. (Raju Bairishetti via jianhe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -53,19 +53,22 @@ public class ServerProxy {
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime); long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
long retryIntervalMS = long retryIntervalMS =
conf.getLong(connectRetryIntervalStr, defRetryInterval); conf.getLong(connectRetryIntervalStr, defRetryInterval);
if (maxWaitTime == -1) {
// wait forever.
return RetryPolicies.RETRY_FOREVER;
}
Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. " Preconditions.checkArgument((maxWaitTime == -1 || maxWaitTime > 0),
+ maxWaitTimeStr + " should be a positive value."); "Invalid Configuration. " + maxWaitTimeStr + " should be either"
+ " positive value or -1.");
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. " Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
+ connectRetryIntervalStr + "should be a positive value."); + connectRetryIntervalStr + "should be a positive value.");
RetryPolicy retryPolicy = RetryPolicy retryPolicy = null;
if (maxWaitTime == -1) {
// wait forever.
retryPolicy = RetryPolicies.RETRY_FOREVER;
} else {
retryPolicy =
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime, RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
retryIntervalMS, TimeUnit.MILLISECONDS); retryIntervalMS, TimeUnit.MILLISECONDS);
}
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>(); new HashMap<Class<? extends Exception>, RetryPolicy>();

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.retry.UnreliableInterface;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -58,8 +59,8 @@ public class TestNMProxy extends BaseContainerManagerTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000); containerManager.start();
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100); containerManager.setBlockNewContainerRequests(false);
} }
@Override @Override
@ -76,9 +77,15 @@ public class TestNMProxy extends BaseContainerManagerTest {
if (shouldThrowNMNotYetReadyException) { if (shouldThrowNMNotYetReadyException) {
// This causes super to throw an NMNotYetReadyException // This causes super to throw an NMNotYetReadyException
containerManager.setBlockNewContainerRequests(true); containerManager.setBlockNewContainerRequests(true);
} else {
if (isRetryPolicyRetryForEver()) {
// Throw non network exception
throw new IOException(
new UnreliableInterface.UnreliableException());
} else { } else {
throw new java.net.ConnectException("start container exception"); throw new java.net.ConnectException("start container exception");
} }
}
} else { } else {
// This stops super from throwing an NMNotYetReadyException // This stops super from throwing an NMNotYetReadyException
containerManager.setBlockNewContainerRequests(false); containerManager.setBlockNewContainerRequests(false);
@ -86,6 +93,11 @@ public class TestNMProxy extends BaseContainerManagerTest {
return super.startContainers(requests); return super.startContainers(requests);
} }
/**
 * Reports whether the test configuration asks for unbounded retries,
 * i.e. whether {@code CLIENT_NM_CONNECT_MAX_WAIT_MS} is set to -1.
 * When the key is absent the lookup falls back to 1000, so the answer
 * is {@code false}.
 */
private boolean isRetryPolicyRetryForEver() {
  final long configuredMaxWaitMs =
      conf.getLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 1000);
  return configuredMaxWaitMs == -1;
}
@Override @Override
public StopContainersResponse stopContainers( public StopContainersResponse stopContainers(
StopContainersRequest requests) throws YarnException, IOException { StopContainersRequest requests) throws YarnException, IOException {
@ -111,29 +123,12 @@ public class TestNMProxy extends BaseContainerManagerTest {
@Test(timeout = 20000) @Test(timeout = 20000)
public void testNMProxyRetry() throws Exception { public void testNMProxyRetry() throws Exception {
containerManager.start(); conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
containerManager.setBlockNewContainerRequests(false); conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
StartContainersRequest allRequests = StartContainersRequest allRequests =
Records.newRecord(StartContainersRequest.class); Records.newRecord(StartContainersRequest.class);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
org.apache.hadoop.yarn.api.records.Token nmToken = ContainerManagementProtocol proxy = getNMProxy();
context.getNMTokenSecretManager().createNMToken(attemptId,
context.getNodeId(), user);
final InetSocketAddress address =
conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
YarnConfiguration.DEFAULT_NM_PORT);
Token<NMTokenIdentifier> token =
ConverterUtils.convertFromYarn(nmToken,
SecurityUtil.buildTokenService(address));
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.addToken(token);
ContainerManagementProtocol proxy =
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
YarnRPC.create(conf), address);
retryCount = 0; retryCount = 0;
shouldThrowNMNotYetReadyException = false; shouldThrowNMNotYetReadyException = false;
@ -156,4 +151,38 @@ public class TestNMProxy extends BaseContainerManagerTest {
proxy.startContainers(allRequests); proxy.startContainers(allRequests);
Assert.assertEquals(5, retryCount); Assert.assertEquals(5, retryCount);
} }
/**
 * Sets the NM connect max-wait to -1 and issues a start-containers call
 * through a freshly built proxy. The test passes only if an
 * {@link IOException} escapes within the 20 second timeout, which is how
 * it detects that the call was not retried indefinitely.
 */
@Test(timeout = 20000, expected = IOException.class)
public void testShouldNotRetryForeverForNonNetworkExceptionsOnNMConnections()
    throws Exception {
  // Must be set before the proxy is created: getNMProxy() reads conf.
  conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, -1);

  final StartContainersRequest emptyRequest =
      Records.newRecord(StartContainersRequest.class);
  final ContainerManagementProtocol nmProxy = getNMProxy();

  // Reset the shared test-double state before making the call.
  retryCount = 0;
  shouldThrowNMNotYetReadyException = false;

  nmProxy.startContainers(emptyRequest);
}
/**
 * Builds a {@link ContainerManagementProtocol} proxy from the current
 * {@code conf}.
 *
 * An NM token is created for attempt 1 of application (1, 1), converted
 * to a security token bound to the configured NM address, and attached to
 * a remote-user UGI for {@code user}; that UGI is what the proxy is
 * created with.
 *
 * @return the proxy produced by {@code NMProxy.createNMProxy}
 */
private ContainerManagementProtocol getNMProxy() {
  final ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1);
  final org.apache.hadoop.yarn.api.records.Token yarnNmToken =
      context.getNMTokenSecretManager().createNMToken(
          appAttemptId, context.getNodeId(), user);

  final InetSocketAddress nmAddress = conf.getSocketAddr(
      YarnConfiguration.NM_BIND_HOST,
      YarnConfiguration.NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_PORT);

  final Token<NMTokenIdentifier> securityToken =
      ConverterUtils.convertFromYarn(
          yarnNmToken, SecurityUtil.buildTokenService(nmAddress));

  final UserGroupInformation remoteUgi =
      UserGroupInformation.createRemoteUser(user);
  remoteUgi.addToken(securityToken);

  final YarnRPC rpc = YarnRPC.create(conf);
  return NMProxy.createNMProxy(
      conf, ContainerManagementProtocol.class, remoteUgi, rpc, nmAddress);
}
} }