Merge r1595116 from trunk. YARN-2053. Fixed a bug in AMS to not add null NMToken into NMTokens list from previous attempts for work-preserving AM restart. Contributed by Wangda Tan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1595117 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-05-16 06:24:10 +00:00
parent c8ba7039b7
commit f442377539
3 changed files with 22 additions and 11 deletions

View File

@ -70,6 +70,9 @@ Release 2.5.0 - UNRELEASED
YARN-1981. Nodemanager version is not updated when a node reconnects (Jason YARN-1981. Nodemanager version is not updated when a node reconnects (Jason
Lowe via jeagles) Lowe via jeagles)
YARN-2053. Fixed a bug in AMS to not add null NMToken into NMTokens list from
previous attempts for work-preserving AM restart. (Wangda Tan via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -298,9 +298,12 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
List<NMToken> nmTokens = new ArrayList<NMToken>(); List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) { for (Container container : transferredContainers) {
try { try {
nmTokens.add(rmContext.getNMTokenSecretManager() NMToken token = rmContext.getNMTokenSecretManager()
.createAndGetNMToken(app.getUser(), applicationAttemptId, .createAndGetNMToken(app.getUser(), applicationAttemptId,
container)); container);
if (null != token) {
nmTokens.add(token);
}
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknowHostException directly and that // if it's a DNS issue, throw UnknowHostException directly and that
// will be automatically retried by RMProxy in RPC layer. // will be automatically retried by RMProxy in RPC layer.

View File

@ -264,30 +264,35 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
nm2.registerNode(); nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>(); List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call. // nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>(); List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
// am1 allocate 1 container on nm1. // am1 allocate 2 container on nm1.
// first container
while (true) { while (true) {
AllocateResponse response = AllocateResponse response =
am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS, am1.allocate("127.0.0.1", 2000, 2,
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers()); containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens()); expectedNMTokens.addAll(response.getNMTokens());
if (containers.size() == NUM_CONTAINERS) { if (containers.size() == 2) {
break; break;
} }
Thread.sleep(200); Thread.sleep(200);
System.out.println("Waiting for container to be allocated."); System.out.println("Waiting for container to be allocated.");
} }
// launch the container // launch the container-2
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 = ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2); ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// launch the container-3
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
ContainerId containerId3 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
// fail am1 // fail am1
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
@ -308,12 +313,12 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
containers = new ArrayList<Container>(); containers = new ArrayList<Container>();
while (true) { while (true) {
AllocateResponse allocateResponse = AllocateResponse allocateResponse =
am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS, am2.allocate("127.1.1.1", 4000, 1,
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true); nm2.nodeHeartbeat(true);
containers.addAll(allocateResponse.getAllocatedContainers()); containers.addAll(allocateResponse.getAllocatedContainers());
expectedNMTokens.addAll(allocateResponse.getNMTokens()); expectedNMTokens.addAll(allocateResponse.getNMTokens());
if (containers.size() == NUM_CONTAINERS) { if (containers.size() == 1) {
break; break;
} }
Thread.sleep(200); Thread.sleep(200);