From 3b620d1eca91fdb8bdca9e01b3d1fd9bd6baab47 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 12 Sep 2011 07:13:46 +0000 Subject: [PATCH] Merge -r 1169620:1169621 from trunk to branch-0.23 to fix MAPREDUCE-2749. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1169623 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../yarn/server/nodemanager/NodeManager.java | 34 ++-- .../nodemanager/NodeStatusUpdaterImpl.java | 16 +- .../ContainerManagerImpl.java | 18 +- .../nodemanager/DummyContainerManager.java | 5 +- .../server/nodemanager/TestEventFlow.java | 6 +- .../nodemanager/TestNodeStatusUpdater.java | 158 +++++++++++++++--- .../BaseContainerManagerTest.java | 6 +- .../TestContainerManager.java | 5 +- .../hadoop/yarn/server/MiniYARNCluster.java | 6 +- .../TestContainerTokenSecretManager.java | 10 +- 11 files changed, 204 insertions(+), 63 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d84b29f2c72..7db5191df84 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1258,6 +1258,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources and related tests. (Siddharth Seth via vinodkv) + MAPREDUCE-2749. Ensure NM registers with RM after starting all its services + correctly. (Thomas Graves via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f9381e00fcc..b5249440c75 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; @@ -44,21 +45,24 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; public class NodeManager extends CompositeService { private static final Log LOG = LogFactory.getLog(NodeManager.class); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); + protected ContainerTokenSecretManager containerTokenSecretManager; public NodeManager() { super(NodeManager.class.getName()); } protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + metrics, containerTokenSecretManager); } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -67,9 +71,10 @@ public class NodeManager extends CompositeService { protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, - NodeStatusUpdater nodeStatusUpdater) { + NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager + containerTokenSecretManager) { return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, - metrics); + metrics, containerTokenSecretManager); } protected WebServer createWebServer(Context nmContext, @@ -87,6 +92,13 @@ public class NodeManager extends CompositeService { Context context = new NMContext(); + // Create the secretManager if need be. + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Security is enabled on NodeManager. " + + "Creating ContainerTokenSecretManager"); + this.containerTokenSecretManager = new ContainerTokenSecretManager(); + } + ContainerExecutor exec = ReflectionUtils.newInstance( conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class, ContainerExecutor.class), conf); @@ -102,18 +114,16 @@ public class NodeManager extends CompositeService { addService(healthChecker); } - // StatusUpdater should be added first so that it can start first. Once it - // contacts RM, does registration and gets tokens, then only - // ContainerManager can start. NodeStatusUpdater nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, healthChecker); - addService(nodeStatusUpdater); + createNodeStatusUpdater(context, dispatcher, healthChecker, + this.containerTokenSecretManager); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); ContainerManagerImpl containerManager = - createContainerManager(context, exec, del, nodeStatusUpdater); + createContainerManager(context, exec, del, nodeStatusUpdater, + this.containerTokenSecretManager); addService(containerManager); Service webServer = @@ -132,6 +142,10 @@ public class NodeManager extends CompositeService { DefaultMetricsSystem.initialize("NodeManager"); + // StatusUpdater should be added last so that it get started last + // so that we make sure everything is up before registering with RM. + addService(nodeStatusUpdater); + super.init(conf); // TODO add local dirs to del } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 40e0ee87a6d..641e74b8018 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.Records; @@ -68,6 +69,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final Context context; private final Dispatcher dispatcher; + private ContainerTokenSecretManager containerTokenSecretManager; private long heartBeatInterval; private ResourceTracker resourceTracker; private String rmAddress; @@ -85,12 +87,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeManagerMetrics metrics; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + ContainerTokenSecretManager containerTokenSecretManager) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.containerTokenSecretManager = containerTokenSecretManager; } @Override @@ -173,8 +177,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.secretKeyBytes = regResponse.getSecretKey().array(); } + // do this now so that its set before we start heartbeating to RM + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Security enabled - updating secret keys now"); + // It is expected that status updater is started by this point and + // RM gives the shared secret in registration during StatusUpdater#start(). + this.containerTokenSecretManager.setSecretKey( + this.getContainerManagerBindAddress(), + this.getRMNMSharedSecret()); + } LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress + " with total resource of " + this.totalResource); + } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e019ed3ffc7..f2d3a7cff51 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -116,7 +116,8 @@ public class ContainerManagerImpl extends CompositeService implements public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, - NodeManagerMetrics metrics) { + NodeManagerMetrics metrics, ContainerTokenSecretManager + containerTokenSecretManager) { super(ContainerManagerImpl.class.getName()); this.context = context; dispatcher = new AsyncDispatcher(); @@ -131,12 +132,7 @@ public class ContainerManagerImpl extends CompositeService implements addService(containersLauncher); this.nodeStatusUpdater = nodeStatusUpdater; - // Create the secretManager if need be. - if (UserGroupInformation.isSecurityEnabled()) { - LOG.info("Security is enabled on NodeManager. " - + "Creating ContainerTokenSecretManager"); - this.containerTokenSecretManager = new ContainerTokenSecretManager(); - } + this.containerTokenSecretManager = containerTokenSecretManager; // Start configurable services auxiluaryServices = new AuxServices(); @@ -196,14 +192,6 @@ public class ContainerManagerImpl extends CompositeService implements // Enqueue user dirs in deletion context YarnRPC rpc = YarnRPC.create(getConfig()); - if (UserGroupInformation.isSecurityEnabled()) { - // This is fine as status updater is started before ContainerManager and - // RM gives the shared secret in registration during StatusUpdter#start() - // itself. - this.containerTokenSecretManager.setSecretKey( - this.nodeStatusUpdater.getContainerManagerBindAddress(), - this.nodeStatusUpdater.getRMNMSharedSecret()); - } Configuration cmConf = new Configuration(getConfig()); cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, ContainerManagerSecurityInfo.class, SecurityInfo.class); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 292dba61a42..c4db3b6af93 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -59,8 +60,8 @@ public class DummyContainerManager extends ContainerManagerImpl { public DummyContainerManager(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, - NodeManagerMetrics metrics) { - super(context, exec, deletionContext, nodeStatusUpdater, metrics); + NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) { + super(context, exec, deletionContext, nodeStatusUpdater, metrics, containerTokenSecretManager); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 1332c0ee3fe..63848f5980d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; @@ -84,8 +85,9 @@ public class TestEventFlow { Dispatcher dispatcher = new AsyncDispatcher(); NodeHealthCheckerService healthChecker = null; NodeManagerMetrics metrics = NodeManagerMetrics.create(); + ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager(); NodeStatusUpdater nodeStatusUpdater = - new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { + new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, containerTokenSecretManager) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); @@ -98,7 +100,7 @@ public class TestEventFlow { }; DummyContainerManager containerManager = - new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics); + new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics, containerTokenSecretManager); containerManager.init(conf); containerManager.start(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index a3042e611ff..9ff888e6142 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -52,9 +54,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; +import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service.STATE; import org.junit.After; import org.junit.Assert; @@ -63,24 +68,38 @@ import org.junit.Test; public class TestNodeStatusUpdater { + // temp fix until metrics system can auto-detect itself running in unit test: + static { + DefaultMetricsSystem.setMiniClusterMode(true); + } + static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class); static final Path basedir = new Path("target", TestNodeStatusUpdater.class.getName()); - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); int heartBeatID = 0; volatile Error nmStartError = null; + private final List registeredNodes = new ArrayList(); + + @After + public void tearDown() { + this.registeredNodes.clear(); + DefaultMetricsSystem.shutdown(); + } private class MyResourceTracker implements ResourceTracker { - private Context context; + private final Context context; public MyResourceTracker(Context context) { this.context = context; } @Override - public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException { + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { NodeId nodeId = request.getNodeId(); Resource resource = request.getResource(); LOG.info("Registering " + nodeId.toString()); @@ -91,17 +110,24 @@ public class TestNodeStatusUpdater { Assert.fail(e.getMessage()); } Assert.assertEquals(5 * 1024, resource.getMemory()); - RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class); - - RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + registeredNodes.add(nodeId); + RegistrationResponse regResponse = recordFactory + .newRecordInstance(RegistrationResponse.class); + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); response.setRegistrationResponse(regResponse); return response; } - ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class); - ApplicationAttemptId appAttemptID = recordFactory.newRecordInstance(ApplicationAttemptId.class); - ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class); - ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class); + ApplicationId applicationID = recordFactory + .newRecordInstance(ApplicationId.class); + ApplicationAttemptId appAttemptID = recordFactory + .newRecordInstance(ApplicationAttemptId.class); + ContainerId firstContainerID = recordFactory + .newRecordInstance(ContainerId.class); + ContainerId secondContainerID = recordFactory + .newRecordInstance(ContainerId.class); private Map> getAppToContainerStatusMap( List containers) { @@ -118,8 +144,10 @@ public class TestNodeStatusUpdater { } return map; } + @Override - public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { NodeStatus nodeStatus = request.getNodeStatus(); LOG.info("Got heartbeat number " + heartBeatID); nodeStatus.setResponseId(heartBeatID++); @@ -134,7 +162,8 @@ public class TestNodeStatusUpdater { firstContainerID.setAppId(applicationID); firstContainerID.setAppAttemptId(appAttemptID); firstContainerID.setId(heartBeatID); - ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); + ContainerLaunchContext launchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); launchContext.setContainerId(firstContainerID); launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.getResource().setMemory(2); @@ -158,7 +187,8 @@ public class TestNodeStatusUpdater { secondContainerID.setAppId(applicationID); secondContainerID.setAppAttemptId(appAttemptID); secondContainerID.setId(heartBeatID); - ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); + ContainerLaunchContext launchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); launchContext.setContainerId(secondContainerID); launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.getResource().setMemory(3); @@ -176,10 +206,12 @@ public class TestNodeStatusUpdater { this.context.getContainers(); Assert.assertEquals(2, activeContainers.size()); } - HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class); + HeartbeatResponse response = recordFactory + .newRecordInstance(HeartbeatResponse.class); response.setResponseId(heartBeatID); - NodeHeartbeatResponse nhResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class); + NodeHeartbeatResponse nhResponse = recordFactory + .newRecordInstance(NodeHeartbeatResponse.class); nhResponse.setHeartbeatResponse(response); return nhResponse; } @@ -189,8 +221,10 @@ public class TestNodeStatusUpdater { private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + ContainerTokenSecretManager containerTokenSecretManager) { + super(context, dispatcher, healthChecker, metrics, + containerTokenSecretManager); this.context = context; } @@ -216,21 +250,23 @@ public class TestNodeStatusUpdater { final NodeManager nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics, containerTokenSecretManager); } }; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB - conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); - conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); - conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri().getPath()); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, "remotelogs") - .toUri().getPath()); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0").toUri().getPath()); + YarnConfiguration conf = createNMConfig(); nm.init(conf); + + // verify that the last service is the nodeStatusUpdater (ie registration + // with RM) + Object[] services = nm.getServices().toArray(); + Object lastService = services[services.length-1]; + Assert.assertTrue("last service is NOT the node status updater", + lastService instanceof NodeStatusUpdater); + new Thread() { public void run() { try { @@ -260,7 +296,75 @@ public class TestNodeStatusUpdater { while (heartBeatID <= 3) { Thread.sleep(500); } + Assert.assertEquals("Number of registered NMs is wrong!!", 1, + this.registeredNodes.size()); nm.stop(); } + + /** + * Verifies that if for some reason NM fails to start ContainerManager RPC + * server, RM is oblivious to NM's presence. The behaviour is like this + * because otherwise, NM will report to RM even if all its servers are not + * started properly, RM will think that the NM is alive and will retire the NM + * only after NM_EXPIRY interval. See MAPREDUCE-2749. + */ + @Test + public void testNoRegistrationWhenNMServicesFail() { + + final NodeManager nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { + return new MyNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, containerTokenSecretManager); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ContainerTokenSecretManager containerTokenSecretManager) { + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, containerTokenSecretManager) { + @Override + public void start() { + // Simulating failure of starting RPC server + throw new YarnException("Starting of RPC Server failed"); + } + }; + } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + try { + nm.start(); + Assert.fail("NM should have failed to start. Didn't get exception!!"); + } catch (Exception e) { + Assert.assertEquals("Starting of RPC Server failed", e.getCause() + .getMessage()); + } + + Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm + .getServiceState()); + + Assert.assertEquals("Number of registered nodes is wrong!", 0, + this.registeredNodes.size()); + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB + conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); + conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri() + .getPath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, + "remotelogs").toUri().getPath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0") + .toUri().getPath()); + return conf; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index d77512c89aa..2a366ec2fc2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -67,6 +68,7 @@ public abstract class BaseContainerManagerTest { protected static File localLogDir; protected static File remoteLogDir; protected static File tmpDir; + protected ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); @@ -94,7 +96,7 @@ public abstract class BaseContainerManagerTest { protected String user = "nobody"; protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { + context, new AsyncDispatcher(), null, metrics, this.containerTokenSecretManager) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); @@ -147,7 +149,7 @@ public abstract class BaseContainerManagerTest { exec = createContainerExecutor(); containerManager = new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics); + metrics, this.containerTokenSecretManager); containerManager.init(conf); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 702100386b7..97890819c8b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -281,8 +282,10 @@ public class TestContainerManager extends BaseContainerManagerTest { // Real del service delSrvc = new DeletionService(exec); delSrvc.init(conf); + ContainerTokenSecretManager containerTokenSecretManager = new + ContainerTokenSecretManager(); containerManager = new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics); + nodeStatusUpdater, metrics, containerTokenSecretManager); containerManager.init(conf); containerManager.start(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 0dcc5629b7c..ead8675fe39 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; @@ -177,9 +178,10 @@ public class MiniYARNCluster extends CompositeService { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { return new NodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics, containerTokenSecretManager) { @Override protected ResourceTracker getRMClient() { final ResourceTrackerService rt = resourceManager diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java index fdc1c7bd880..66a582435ab 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; +import org.junit.AfterClass; import org.junit.Test; public class TestContainerTokenSecretManager { @@ -94,6 +95,7 @@ public class TestContainerTokenSecretManager { private static final File localDir = new File("target", TestContainerTokenSecretManager.class.getName() + "-localDir") .getAbsoluteFile(); + private static MiniYARNCluster yarnCluster; @BeforeClass public static void setup() throws AccessControlException, @@ -103,6 +105,12 @@ public class TestContainerTokenSecretManager { localDir.mkdir(); } + @AfterClass + public static void teardown() { + yarnCluster.stop(); + } + + @Test public void test() throws IOException, InterruptedException { @@ -116,7 +124,7 @@ public class TestContainerTokenSecretManager { // Set AM expiry interval to be very long. conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L); UserGroupInformation.setConfiguration(conf); - MiniYARNCluster yarnCluster = + yarnCluster = new MiniYARNCluster(TestContainerTokenSecretManager.class.getName()); yarnCluster.init(conf); yarnCluster.start();