diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 74e5c74f17a..e56a45cc79a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -505,6 +505,10 @@ Release 2.4.0 - UNRELEASED manner and thus fix failure of TestResourceTrackerService. (Tsuyoshi Ozawa via vinodkv) + YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers + launched by AMs running on the same machine as the AM are correctly + propagated. (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 8c90d398389..1eebaaca713 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -294,13 +294,9 @@ public class AMRMClientImpl extends AMRMClient { for (NMToken token : nmTokens) { String nodeId = token.getNodeId().toString(); if (getNMTokenCache().containsToken(nodeId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Replacing token for : " + nodeId); - } + LOG.info("Replacing token for : " + nodeId); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Received new token for : " + nodeId); - } + LOG.info("Received new token for : " + nodeId); } getNMTokenCache().setToken(nodeId, token.getToken()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 3c3f2bb734b..697a18099b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -831,9 +831,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.retryFetchingAMContainer(appAttempt); return RMAppAttemptState.SCHEDULED; } + // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers() .get(0)); + // The node set in NMTokenSecrentManager is used for marking whether the + // NMToken has been issued for this node to the AM. + // When AM container was allocated to RM itself, the node which allocates + // this AM container was marked as the NMToken already sent. Thus, + // clear this node set so that the following allocate requests from AM are + // able to retrieve the corresponding NMToken. + appAttempt.rmContext.getNMTokenSecretManager() + .clearNodeSetForAttempt(appAttempt.applicationAttemptId); appAttempt.getSubmissionContext().setResource( appAttempt.getMasterContainer().getResource()); appAttempt.storeAttempt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6c392b5a728..30b2fd6fb30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -902,7 +902,8 @@ public class CapacityScheduler extends AbstractYarnScheduler } @Lock(Lock.NoLock.class) - FiCaSchedulerApp getApplicationAttempt( + @VisibleForTesting + public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { SchedulerApplication app = applications.get(applicationAttemptId.getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index 9ec7b690b57..b068a60e0e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -138,6 +138,19 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager { } } + public void clearNodeSetForAttempt(ApplicationAttemptId attemptId) { + super.writeLock.lock(); + try { + HashSet nodeSet = this.appAttemptToNodeKeyMap.get(attemptId); + if (nodeSet != null) { + LOG.info("Clear node set for " + attemptId); + nodeSet.clear(); + } + } finally { + super.writeLock.unlock(); + } + } + private void clearApplicationNMTokenKeys() { // We should clear all node entries from this set. // TODO : Once we have per node master key then it will change to only @@ -184,22 +197,13 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager { NMToken nmToken = null; if (nodeSet != null) { if (!nodeSet.contains(container.getNodeId())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending NMToken for nodeId : " - + container.getNodeId().toString() - + " for application attempt : " + appAttemptId.toString()); - } + LOG.info("Sending NMToken for nodeId : " + container.getNodeId() + + " for container : " + container.getId()); Token token = createNMToken(container.getId().getApplicationAttemptId(), container.getNodeId(), applicationSubmitter); nmToken = NMToken.newInstance(container.getNodeId(), token); - // The node set here is used for differentiating whether the NMToken - // has been issued for this node from the client's perspective. If - // this is an AM container, the NMToken is issued only for RM and so - // we should not update the node set. - if (container.getId().getId() != 1) { - nodeSet.add(container.getNodeId()); - } + nodeSet.add(container.getNodeId()); } } return nmToken; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index ec63c43fee4..2e9c57e203f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -171,7 +172,60 @@ public class TestRM { rm.stop(); } - + + // Test even if AM container is allocated with containerId not equal to 1, the + // following allocate requests from AM should be able to retrieve the + // corresponding NM Token. + @Test (timeout = 20000) + public void testNMTokenSentForNormalContainer() throws Exception { + + MockRM rm = new MockRM(); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 5120); + RMApp app = rm.submitApp(2000); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + + // Call getNewContainerId to increase container Id so that the AM container + // Id doesn't equal to one. + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + cs.getApplicationAttempt(attempt.getAppAttemptId()).getNewContainerId(); + + // kick the scheduling + nm1.nodeHeartbeat(true); + MockAM am = MockRM.launchAM(app, rm, nm1); + // am container Id not equal to 1. + Assert.assertTrue(attempt.getMasterContainer().getId().getId() != 1); + // NMSecretManager doesn't record the node on which the am is allocated. + Assert.assertFalse(rm.getRMNMTokenSecretManager() + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm1.getNodeId())); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + + int NUM_CONTAINERS = 1; + List containers = new ArrayList(); + // nmTokens keeps track of all the nmTokens issued in the allocate call. + List expectedNMTokens = new ArrayList(); + + // am1 allocate 1 container on nm1. + while (true) { + AllocateResponse response = + am.allocate("127.0.0.1", 2000, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + containers.addAll(response.getAllocatedContainers()); + expectedNMTokens.addAll(response.getNMTokens()); + if (containers.size() == NUM_CONTAINERS) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + NodeId nodeId = expectedNMTokens.get(0).getNodeId(); + // NMToken is sent for the allocated container. + Assert.assertEquals(nm1.getNodeId(), nodeId); + } + @Test (timeout = 40000) public void testNMToken() throws Exception { MockRM rm = new MockRM(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index dd57cf4ace0..7868fa04db1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -133,6 +133,8 @@ public class TestRMAppAttemptTransitions { private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf)); private ClientToAMTokenSecretManagerInRM clientToAMTokenManager = spy(new ClientToAMTokenSecretManagerInRM()); + private NMTokenSecretManagerInRM nmTokenManager = + spy(new NMTokenSecretManagerInRM(conf)); private boolean transferStateFromPreviousAttempt = false; private final class TestApplicationAttemptEventDispatcher implements @@ -224,7 +226,7 @@ public class TestRMAppAttemptTransitions { containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, amRMTokenManager, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), + nmTokenManager, clientToAMTokenManager, writer); @@ -443,6 +445,8 @@ public class TestRMAppAttemptTransitions { any( ApplicationAttemptId.class), any(List.class), any(List.class), any(List.class), any(List.class)); + verify(nmTokenManager).clearNodeSetForAttempt( + applicationAttempt.getAppAttemptId()); } /**