YARN-3839. Quit throwing NMNotYetReadyException. Contributed by Manikandan R

This commit is contained in:
Jason Lowe 2017-05-08 17:14:37 -05:00
parent cef2815cf4
commit 424887ecb7
14 changed files with 21 additions and 273 deletions

View File

@ -24,10 +24,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
@ -101,9 +100,6 @@ public interface ContainerManagementProtocol {
* a allServicesMetaData map.
* @throws YarnException
* @throws IOException
* @throws NMNotYetReadyException
* This exception is thrown when NM starts from scratch but has not
* yet connected with RM.
*/
@Public
@Stable

View File

@ -80,6 +80,11 @@ public class ServerProxy {
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
/*
* Still keeping this to cover case like newer client talking
* to an older version of server
*/
exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);
return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,

View File

@ -460,8 +460,6 @@ public class NodeManager extends CompositeService
@Override
public void run() {
try {
LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true);
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();

View File

@ -428,8 +428,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
LOG.info(successfullRegistrationMsg);
LOG.info("Notifying ContainerManager to unblock new container-requests");
this.context.getContainerManager().setBlockNewContainerRequests(false);
}
private List<ApplicationId> createKeepAliveApplicationList() {

View File

@ -42,8 +42,6 @@ public interface ContainerManager extends ServiceStateChangeListener,
void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
void setBlockNewContainerRequests(boolean blockNewContainerRequests);
ContainerScheduler getContainerScheduler();
}

View File

@ -166,7 +166,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -204,7 +203,6 @@ public class ContainerManagerImpl extends CompositeService implements
protected final AsyncDispatcher dispatcher;
private final DeletionService deletionService;
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
@ -550,10 +548,6 @@ public class ContainerManagerImpl extends CompositeService implements
refreshServiceAcls(conf, new NMPolicyProvider());
}
LOG.info("Blocking new container-requests as container manager rpc" +
" server is still starting.");
this.setBlockNewContainerRequests(true);
String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
String hostOverride = null;
@ -617,7 +611,6 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void serviceStop() throws Exception {
setBlockNewContainerRequests(true);
this.writeLock.lock();
try {
serviceStopped = true;
@ -852,11 +845,6 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting new containers as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
@ -1113,11 +1101,6 @@ public class ContainerManagerImpl extends CompositeService implements
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting container resource increase as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
@ -1559,17 +1542,6 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
@Override
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
this.blockNewContainerRequests.set(blockNewContainerRequests);
}
@Private
@VisibleForTesting
public boolean getBlockNewContainerRequestsStatus() {
return this.blockNewContainerRequests.get();
}
@Override
public void stateChanged(Service service) {
// TODO Auto-generated method stub

View File

@ -190,11 +190,6 @@ public class DummyContainerManager extends ContainerManagerImpl {
};
}
@Override
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -87,7 +86,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -188,34 +186,6 @@ public class TestNodeManagerResync {
}
}
// This test tests new container requests are blocked when NM starts from
// scratch until it register with RM AND while NM is resyncing with RM
@SuppressWarnings("unchecked")
@Test(timeout=60000)
public void testBlockNewContainerRequestsOnStartAndResync()
throws IOException, InterruptedException, YarnException {
NodeManager nm = new TestNodeManager2();
int port = ServerSocketUtil.getPort(49154, 10);
YarnConfiguration conf = createNMConfig(port);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
nm.init(conf);
nm.start();
// Start the container in running state
ContainerId cId = TestNodeManagerShutdown.createContainerId();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
processStartFile, port);
nm.getNMDispatcher().getEventHandler()
.handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testNMshutdownWhenResyncThrowException() throws IOException,
@ -493,135 +463,6 @@ public class TestNodeManagerResync {
}
}
class TestNodeManager2 extends NodeManager {
Thread launchContainersThread = null;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterImpl2(context, dispatcher,
healthChecker, metrics);
}
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void setBlockNewContainerRequests(
boolean blockNewContainerRequests) {
if (blockNewContainerRequests) {
// start test thread right after blockNewContainerRequests is set
// true
super.setBlockNewContainerRequests(blockNewContainerRequests);
launchContainersThread = new RejectedContainersLauncherThread();
launchContainersThread.start();
} else {
// join the test thread right before blockNewContainerRequests is
// reset
try {
// stop the test thread
((RejectedContainersLauncherThread) launchContainersThread)
.setStopThreadFlag(true);
launchContainersThread.join();
((RejectedContainersLauncherThread) launchContainersThread)
.setStopThreadFlag(false);
super.setBlockNewContainerRequests(blockNewContainerRequests);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
}
class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}
@Override
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
try {
// ensure that containers are empty before restart nodeStatusUpdater
if (!containers.isEmpty()) {
for (Container container: containers.values()) {
Assert.assertEquals(ContainerState.COMPLETE,
container.cloneAndGetContainerStatus().getState());
}
}
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except
// containers from previous RM
// Wait here so as to sync with the main test thread.
syncBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
}
}
}
class RejectedContainersLauncherThread extends Thread {
boolean isStopped = false;
public void setStopThreadFlag(boolean isStopped) {
this.isStopped = isStopped;
}
@Override
public void run() {
int numContainers = 0;
int numContainersRejected = 0;
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
try {
while (!isStopped && numContainers < 10) {
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
null);
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
System.out.println("no. of containers to be launched: "
+ numContainers);
numContainers++;
try {
getContainerManager().startContainers(allRequests);
} catch (YarnException e) {
numContainersRejected++;
Assert.assertTrue(e.getMessage().contains(
"Rejecting new containers as NodeManager has not" +
" yet connected with ResourceManager"));
Assert.assertEquals(NMNotYetReadyException.class.getName(), e
.getClass().getName());
} catch (IOException e) {
e.printStackTrace();
assertionFailedInThread.set(true);
}
}
// no. of containers to be launched should equal to no. of
// containers rejected
Assert.assertEquals(numContainers, numContainersRejected);
} catch (AssertionError ae) {
assertionFailedInThread.set(true);
}
}
}
}
class TestNodeManager3 extends NodeManager {
private int registrationCount = 0;
@ -681,11 +522,6 @@ public class TestNodeManagerResync {
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -28,8 +30,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@ -82,10 +83,9 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerIn
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import static org.mockito.Mockito.spy;
public abstract class BaseContainerManagerTest {
protected static RecordFactory recordFactory = RecordFactoryProvider
@ -214,11 +214,6 @@ public abstract class BaseContainerManagerTest {
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,

View File

@ -127,11 +127,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
createContainerManager(DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {

View File

@ -545,11 +545,6 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
return new ContainerManagerImpl(context, exec, delSrvc,
mock(NodeStatusUpdater.class), metrics, dirsHandler) {
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(
ContainerId containerId, Container container,
boolean stopRequest, NMTokenIdentifier identifier)
@ -756,12 +751,6 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
return launcher;
}
@Override
public void setBlockNewContainerRequests(
boolean blockNewContainerRequests) {
// do nothing
}
@Override
public NMTimelinePublisher
createNMTimelinePublisher(Context context) {

View File

@ -56,12 +56,10 @@ public class TestNMProxy extends BaseContainerManagerTest {
}
int retryCount = 0;
boolean shouldThrowNMNotYetReadyException = false;
@Before
public void setUp() throws Exception {
containerManager.start();
containerManager.setBlockNewContainerRequests(false);
}
@Override
@ -75,21 +73,13 @@ public class TestNMProxy extends BaseContainerManagerTest {
StartContainersRequest requests) throws YarnException, IOException {
if (retryCount < 5) {
retryCount++;
if (shouldThrowNMNotYetReadyException) {
// This causes super to throw an NMNotYetReadyException
containerManager.setBlockNewContainerRequests(true);
if (isRetryPolicyRetryForEver()) {
// Throw non network exception
throw new IOException(
new UnreliableInterface.UnreliableException());
} else {
if (isRetryPolicyRetryForEver()) {
// Throw non network exception
throw new IOException(
new UnreliableInterface.UnreliableException());
} else {
throw new java.net.ConnectException("start container exception");
}
throw new java.net.ConnectException("start container exception");
}
} else {
// This stops super from throwing an NMNotYetReadyException
containerManager.setBlockNewContainerRequests(false);
}
return super.startContainers(requests);
}
@ -131,26 +121,17 @@ public class TestNMProxy extends BaseContainerManagerTest {
ContainerManagementProtocol proxy = getNMProxy(conf);
retryCount = 0;
shouldThrowNMNotYetReadyException = false;
proxy.startContainers(allRequests);
Assert.assertEquals(5, retryCount);
retryCount = 0;
shouldThrowNMNotYetReadyException = false;
proxy.stopContainers(Records.newRecord(StopContainersRequest.class));
Assert.assertEquals(5, retryCount);
retryCount = 0;
shouldThrowNMNotYetReadyException = false;
proxy.getContainerStatuses(Records
.newRecord(GetContainerStatusesRequest.class));
Assert.assertEquals(5, retryCount);
retryCount = 0;
shouldThrowNMNotYetReadyException = true;
proxy.startContainers(allRequests);
Assert.assertEquals(5, retryCount);
}
@Test(timeout = 20000, expected = IOException.class)
@ -162,7 +143,6 @@ public class TestNMProxy extends BaseContainerManagerTest {
ContainerManagementProtocol proxy = getNMProxy(conf);
shouldThrowNMNotYetReadyException = false;
retryCount = 0;
proxy.startContainers(allRequests);
}

View File

@ -77,11 +77,6 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, dirsHandler) {
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -178,7 +177,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
NodeManager nm = yarnCluster.getNodeManager(0);
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm);
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM);
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
@ -412,13 +411,10 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
}
protected void waitForNMToReceiveNMTokenKey(
NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)
NMTokenSecretManagerInNM nmTokenSecretManagerNM)
throws InterruptedException {
int attempt = 60;
ContainerManagerImpl cm =
((ContainerManagerImpl) nm.getNMContext().getContainerManager());
while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM
.getNodeId() == null) && attempt-- > 0) {
while (nmTokenSecretManagerNM.getNodeId() == null && attempt-- > 0) {
Thread.sleep(2000);
}
}
@ -627,7 +623,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM);
NodeId nodeId = nm.getNMContext().getNodeId();
@ -722,7 +718,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM);
NodeId nodeId = nm.getNMContext().getNodeId();