YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers launched by AMs running on the same machine as the AM are correctly propagated. Contributed by Jian He.

svn merge --ignore-ancestry -c 1578631 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578632 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-17 22:22:58 +00:00
parent 597c7d054f
commit 80f3726024
7 changed files with 93 additions and 21 deletions

View File

@ -490,6 +490,10 @@ Release 2.4.0 - UNRELEASED
manner and thus fix failure of TestResourceTrackerService. (Tsuyoshi Ozawa
via vinodkv)
YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers
launched by AMs running on the same machine as the AM are correctly
propagated. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -294,13 +294,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString();
if (getNMTokenCache().containsToken(nodeId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Replacing token for : " + nodeId);
}
LOG.info("Replacing token for : " + nodeId);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Received new token for : " + nodeId);
}
LOG.info("Received new token for : " + nodeId);
}
getNMTokenCache().setToken(nodeId, token.getToken());
}

View File

@ -831,9 +831,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED;
}
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates
// this AM container was marked as the NMToken already sent. Thus,
// clear this node set so that the following allocate requests from AM are
// able to retrieve the corresponding NMToken.
appAttempt.rmContext.getNMTokenSecretManager()
.clearNodeSetForAttempt(appAttempt.applicationAttemptId);
appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt();

View File

@ -902,7 +902,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
}
@Lock(Lock.NoLock.class)
FiCaSchedulerApp getApplicationAttempt(
@VisibleForTesting
public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId());

View File

@ -138,6 +138,19 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
}
public void clearNodeSetForAttempt(ApplicationAttemptId attemptId) {
super.writeLock.lock();
try {
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(attemptId);
if (nodeSet != null) {
LOG.info("Clear node set for " + attemptId);
nodeSet.clear();
}
} finally {
super.writeLock.unlock();
}
}
private void clearApplicationNMTokenKeys() {
// We should clear all node entries from this set.
// TODO : Once we have per node master key then it will change to only
@ -184,22 +197,13 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
NMToken nmToken = null;
if (nodeSet != null) {
if (!nodeSet.contains(container.getNodeId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending NMToken for nodeId : "
+ container.getNodeId().toString()
+ " for application attempt : " + appAttemptId.toString());
}
LOG.info("Sending NMToken for nodeId : " + container.getNodeId()
+ " for container : " + container.getId());
Token token =
createNMToken(container.getId().getApplicationAttemptId(),
container.getNodeId(), applicationSubmitter);
nmToken = NMToken.newInstance(container.getNodeId(), token);
// The node set here is used for differentiating whether the NMToken
// has been issued for this node from the client's perspective. If
// this is an AM container, the NMToken is issued only for RM and so
// we should not update the node set.
if (container.getId().getId() != 1) {
nodeSet.add(container.getNodeId());
}
nodeSet.add(container.getNodeId());
}
}
return nmToken;

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -171,7 +172,60 @@ public class TestRM {
rm.stop();
}
// Test even if AM container is allocated with containerId not equal to 1, the
// following allocate requests from AM should be able to retrieve the
// corresponding NM Token.
@Test (timeout = 20000)
public void testNMTokenSentForNormalContainer() throws Exception {
MockRM rm = new MockRM();
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
RMApp app = rm.submitApp(2000);
RMAppAttempt attempt = app.getCurrentAppAttempt();
// Call getNewContainerId to increase container Id so that the AM container
// Id doesn't equal to one.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
cs.getApplicationAttempt(attempt.getAppAttemptId()).getNewContainerId();
// kick the scheduling
nm1.nodeHeartbeat(true);
MockAM am = MockRM.launchAM(app, rm, nm1);
// am container Id not equal to 1.
Assert.assertTrue(attempt.getMasterContainer().getId().getId() != 1);
// NMSecretManager doesn't record the node on which the am is allocated.
Assert.assertFalse(rm.getRMNMTokenSecretManager()
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm1.getNodeId()));
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
// am1 allocate 1 container on nm1.
while (true) {
AllocateResponse response =
am.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens());
if (containers.size() == NUM_CONTAINERS) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
NodeId nodeId = expectedNMTokens.get(0).getNodeId();
// NMToken is sent for the allocated container.
Assert.assertEquals(nm1.getNodeId(), nodeId);
}
@Test (timeout = 40000)
public void testNMToken() throws Exception {
MockRM rm = new MockRM();

View File

@ -133,6 +133,8 @@ public class TestRMAppAttemptTransitions {
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
private NMTokenSecretManagerInRM nmTokenManager =
spy(new NMTokenSecretManagerInRM(conf));
private boolean transferStateFromPreviousAttempt = false;
private final class TestApplicationAttemptEventDispatcher implements
@ -224,7 +226,7 @@ public class TestRMAppAttemptTransitions {
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, amRMTokenManager,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
nmTokenManager,
clientToAMTokenManager,
writer);
@ -443,6 +445,8 @@ public class TestRMAppAttemptTransitions {
any(
ApplicationAttemptId.class), any(List.class), any(List.class),
any(List.class), any(List.class));
verify(nmTokenManager).clearNodeSetForAttempt(
applicationAttempt.getAppAttemptId());
}
/**