YARN-3695. ServerProxy (NMProxy, etc.) shouldn't retry forever for non network exception. Contributed by Raju Bairishetti
(cherry picked from commit 62e583c7dc
)
This commit is contained in:
parent
5ff14f2342
commit
0100995c5e
|
@ -520,6 +520,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3860. rmadmin -transitionToActive should check the state of non-target node.
|
YARN-3860. rmadmin -transitionToActive should check the state of non-target node.
|
||||||
(Masatake Iwasaki via junping_du)
|
(Masatake Iwasaki via junping_du)
|
||||||
|
|
||||||
|
YARN-3695. ServerProxy (NMProxy, etc.) shouldn't retry forever for non
|
||||||
|
network exception. (Raju Bairishetti via jianhe)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -53,19 +53,22 @@ public class ServerProxy {
|
||||||
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
|
long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
|
||||||
long retryIntervalMS =
|
long retryIntervalMS =
|
||||||
conf.getLong(connectRetryIntervalStr, defRetryInterval);
|
conf.getLong(connectRetryIntervalStr, defRetryInterval);
|
||||||
if (maxWaitTime == -1) {
|
|
||||||
// wait forever.
|
|
||||||
return RetryPolicies.RETRY_FOREVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
|
Preconditions.checkArgument((maxWaitTime == -1 || maxWaitTime > 0),
|
||||||
+ maxWaitTimeStr + " should be a positive value.");
|
"Invalid Configuration. " + maxWaitTimeStr + " should be either"
|
||||||
|
+ " positive value or -1.");
|
||||||
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
|
Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
|
||||||
+ connectRetryIntervalStr + "should be a positive value.");
|
+ connectRetryIntervalStr + "should be a positive value.");
|
||||||
|
|
||||||
RetryPolicy retryPolicy =
|
RetryPolicy retryPolicy = null;
|
||||||
|
if (maxWaitTime == -1) {
|
||||||
|
// wait forever.
|
||||||
|
retryPolicy = RetryPolicies.RETRY_FOREVER;
|
||||||
|
} else {
|
||||||
|
retryPolicy =
|
||||||
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
|
RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
|
||||||
retryIntervalMS, TimeUnit.MILLISECONDS);
|
retryIntervalMS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.io.retry.UnreliableInterface;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -58,8 +59,8 @@ public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
|
containerManager.start();
|
||||||
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
|
containerManager.setBlockNewContainerRequests(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -76,9 +77,15 @@ public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
if (shouldThrowNMNotYetReadyException) {
|
if (shouldThrowNMNotYetReadyException) {
|
||||||
// This causes super to throw an NMNotYetReadyException
|
// This causes super to throw an NMNotYetReadyException
|
||||||
containerManager.setBlockNewContainerRequests(true);
|
containerManager.setBlockNewContainerRequests(true);
|
||||||
|
} else {
|
||||||
|
if (isRetryPolicyRetryForEver()) {
|
||||||
|
// Throw non network exception
|
||||||
|
throw new IOException(
|
||||||
|
new UnreliableInterface.UnreliableException());
|
||||||
} else {
|
} else {
|
||||||
throw new java.net.ConnectException("start container exception");
|
throw new java.net.ConnectException("start container exception");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// This stops super from throwing an NMNotYetReadyException
|
// This stops super from throwing an NMNotYetReadyException
|
||||||
containerManager.setBlockNewContainerRequests(false);
|
containerManager.setBlockNewContainerRequests(false);
|
||||||
|
@ -86,6 +93,11 @@ public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
return super.startContainers(requests);
|
return super.startContainers(requests);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isRetryPolicyRetryForEver() {
|
||||||
|
return conf.getLong(
|
||||||
|
YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 1000) == -1;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StopContainersResponse stopContainers(
|
public StopContainersResponse stopContainers(
|
||||||
StopContainersRequest requests) throws YarnException, IOException {
|
StopContainersRequest requests) throws YarnException, IOException {
|
||||||
|
@ -111,29 +123,12 @@ public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
|
|
||||||
@Test(timeout = 20000)
|
@Test(timeout = 20000)
|
||||||
public void testNMProxyRetry() throws Exception {
|
public void testNMProxyRetry() throws Exception {
|
||||||
containerManager.start();
|
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
|
||||||
containerManager.setBlockNewContainerRequests(false);
|
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
|
||||||
StartContainersRequest allRequests =
|
StartContainersRequest allRequests =
|
||||||
Records.newRecord(StartContainersRequest.class);
|
Records.newRecord(StartContainersRequest.class);
|
||||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
||||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
|
||||||
|
|
||||||
org.apache.hadoop.yarn.api.records.Token nmToken =
|
ContainerManagementProtocol proxy = getNMProxy();
|
||||||
context.getNMTokenSecretManager().createNMToken(attemptId,
|
|
||||||
context.getNodeId(), user);
|
|
||||||
final InetSocketAddress address =
|
|
||||||
conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
|
|
||||||
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_PORT);
|
|
||||||
Token<NMTokenIdentifier> token =
|
|
||||||
ConverterUtils.convertFromYarn(nmToken,
|
|
||||||
SecurityUtil.buildTokenService(address));
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
|
||||||
ugi.addToken(token);
|
|
||||||
|
|
||||||
ContainerManagementProtocol proxy =
|
|
||||||
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
|
||||||
YarnRPC.create(conf), address);
|
|
||||||
|
|
||||||
retryCount = 0;
|
retryCount = 0;
|
||||||
shouldThrowNMNotYetReadyException = false;
|
shouldThrowNMNotYetReadyException = false;
|
||||||
|
@ -156,4 +151,38 @@ public class TestNMProxy extends BaseContainerManagerTest {
|
||||||
proxy.startContainers(allRequests);
|
proxy.startContainers(allRequests);
|
||||||
Assert.assertEquals(5, retryCount);
|
Assert.assertEquals(5, retryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000, expected = IOException.class)
|
||||||
|
public void testShouldNotRetryForeverForNonNetworkExceptionsOnNMConnections()
|
||||||
|
throws Exception {
|
||||||
|
conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, -1);
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
Records.newRecord(StartContainersRequest.class);
|
||||||
|
|
||||||
|
ContainerManagementProtocol proxy = getNMProxy();
|
||||||
|
|
||||||
|
shouldThrowNMNotYetReadyException = false;
|
||||||
|
retryCount = 0;
|
||||||
|
proxy.startContainers(allRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerManagementProtocol getNMProxy() {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
|
||||||
|
org.apache.hadoop.yarn.api.records.Token nmToken =
|
||||||
|
context.getNMTokenSecretManager().createNMToken(attemptId,
|
||||||
|
context.getNodeId(), user);
|
||||||
|
final InetSocketAddress address =
|
||||||
|
conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
|
||||||
|
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_PORT);
|
||||||
|
Token<NMTokenIdentifier> token =
|
||||||
|
ConverterUtils.convertFromYarn(nmToken,
|
||||||
|
SecurityUtil.buildTokenService(address));
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
||||||
|
ugi.addToken(token);
|
||||||
|
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
|
||||||
|
YarnRPC.create(conf), address);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue