YARN-4740. AM may not receive the container complete msg when it restarts. Contributed by Jun Gong

This commit is contained in:
Jian He 2016-04-08 11:19:36 -07:00
parent ce5b4812f0
commit 9cb0c963d2
2 changed files with 126 additions and 5 deletions

View File

@ -927,6 +927,20 @@ public void transferStateFromAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.justFinishedContainers = attempt.getJustFinishedContainersReference();
this.finishedContainersSentToAM = this.finishedContainersSentToAM =
attempt.getFinishedContainersSentToAMReference(); attempt.getFinishedContainersSentToAMReference();
// container complete msg was moved from justFinishedContainers to
// finishedContainersSentToAM in ApplicationMasterService#allocate,
// if am crashed and not received this response, we should resend
// this msg again after am restart
if (!this.finishedContainersSentToAM.isEmpty()) {
for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) {
List<ContainerStatus> containerStatuses =
this.finishedContainersSentToAM.get(nodeId);
this.justFinishedContainers.putIfAbsent(nodeId,
new ArrayList<ContainerStatus>());
this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
}
this.finishedContainersSentToAM.clear();
}
} }
private void recoverAppAttemptCredentials(Credentials appAttemptTokens, private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@ -1845,13 +1859,13 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
} else { } else {
LOG.warn("No ContainerStatus in containerFinishedEvent"); LOG.warn("No ContainerStatus in containerFinishedEvent");
} }
finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<ContainerStatus>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(
containerFinishedEvent.getContainerStatus());
if (!appAttempt.getSubmissionContext() if (!appAttempt.getSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) { .getKeepContainersAcrossApplicationAttempts()) {
finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<ContainerStatus>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(
containerFinishedEvent.getContainerStatus());
appAttempt.sendFinishedContainersToNM(); appAttempt.sendFinishedContainersToNM();
} else { } else {
appAttempt.sendFinishedAMContainerToNM(nodeId, appAttempt.sendFinishedAMContainerToNM(nodeId,

View File

@ -907,4 +907,111 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
} }
private boolean isContainerIdInContainerStatus(
List<ContainerStatus> containerStatuses, ContainerId containerId) {
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(containerId)) {
return true;
}
}
return false;
}
@Test(timeout = 30000)
public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf);
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "MAPREDUCE", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
allocateContainers(nm1, am1, 1);
nm1.nodeHeartbeat(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// container complete
nm1.nodeHeartbeat(
am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
// make sure allocate() get complete container,
// before this msg pass to AM, AM may crash
while (true) {
AllocateResponse response = am1.allocate(
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
List<ContainerStatus> containerStatuses =
response.getCompletedContainersStatuses();
if (isContainerIdInContainerStatus(
containerStatuses, containerId2) == false) {
Thread.sleep(100);
continue;
}
// is containerId still in justFinishedContainer?
containerStatuses =
app1.getCurrentAppAttempt().getJustFinishedContainers();
if (isContainerIdInContainerStatus(containerStatuses,
containerId2)) {
Assert.fail();
}
break;
}
// fail the AM by sending CONTAINER_FINISHED event without registering.
nm1.nodeHeartbeat(
am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
// wait for app to start a new attempt.
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// assert this is a new AM.
ApplicationAttemptId newAttemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
// launch the new AM
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
am2.registerAppAttempt();
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// whether new AM could get container complete msg
AllocateResponse allocateResponse = am2.allocate(
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
List<ContainerStatus> containerStatuses =
allocateResponse.getCompletedContainersStatuses();
if (isContainerIdInContainerStatus(containerStatuses,
containerId2) == false) {
Assert.fail();
}
containerStatuses = attempt2.getJustFinishedContainers();
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
Assert.fail();
}
// the second allocate should not get container complete msg
allocateResponse = am2.allocate(
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
containerStatuses =
allocateResponse.getCompletedContainersStatuses();
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
Assert.fail();
}
rm1.stop();
}
} }