YARN-4740. AM may not receive the container complete msg when it restarts. Contributed by Jun Gong
This commit is contained in:
parent
bc148530d4
commit
4cf0b1932f
|
@ -924,6 +924,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
this.justFinishedContainers = attempt.getJustFinishedContainersReference();
|
this.justFinishedContainers = attempt.getJustFinishedContainersReference();
|
||||||
this.finishedContainersSentToAM =
|
this.finishedContainersSentToAM =
|
||||||
attempt.getFinishedContainersSentToAMReference();
|
attempt.getFinishedContainersSentToAMReference();
|
||||||
|
// container complete msg was moved from justFinishedContainers to
|
||||||
|
// finishedContainersSentToAM in ApplicationMasterService#allocate,
|
||||||
|
// if am crashed and not received this response, we should resend
|
||||||
|
// this msg again after am restart
|
||||||
|
if (!this.finishedContainersSentToAM.isEmpty()) {
|
||||||
|
for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) {
|
||||||
|
List<ContainerStatus> containerStatuses =
|
||||||
|
this.finishedContainersSentToAM.get(nodeId);
|
||||||
|
this.justFinishedContainers.putIfAbsent(nodeId,
|
||||||
|
new ArrayList<ContainerStatus>());
|
||||||
|
this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
|
||||||
|
}
|
||||||
|
this.finishedContainersSentToAM.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
|
private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
|
||||||
|
@ -1842,13 +1856,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("No ContainerStatus in containerFinishedEvent");
|
LOG.warn("No ContainerStatus in containerFinishedEvent");
|
||||||
}
|
}
|
||||||
finishedContainersSentToAM.putIfAbsent(nodeId,
|
|
||||||
new ArrayList<ContainerStatus>());
|
|
||||||
appAttempt.finishedContainersSentToAM.get(nodeId).add(
|
|
||||||
containerFinishedEvent.getContainerStatus());
|
|
||||||
|
|
||||||
if (!appAttempt.getSubmissionContext()
|
if (!appAttempt.getSubmissionContext()
|
||||||
.getKeepContainersAcrossApplicationAttempts()) {
|
.getKeepContainersAcrossApplicationAttempts()) {
|
||||||
|
finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||||
|
new ArrayList<ContainerStatus>());
|
||||||
|
appAttempt.finishedContainersSentToAM.get(nodeId).add(
|
||||||
|
containerFinishedEvent.getContainerStatus());
|
||||||
appAttempt.sendFinishedContainersToNM();
|
appAttempt.sendFinishedContainersToNM();
|
||||||
} else {
|
} else {
|
||||||
appAttempt.sendFinishedAMContainerToNM(nodeId,
|
appAttempt.sendFinishedAMContainerToNM(nodeId,
|
||||||
|
|
|
@ -900,4 +900,111 @@ public class TestAMRestart {
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isContainerIdInContainerStatus(
|
||||||
|
List<ContainerStatus> containerStatuses, ContainerId containerId) {
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
if (status.getContainerId().equals(containerId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
RMApp app1 =
|
||||||
|
rm1.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||||
|
null, "MAPREDUCE", false, true);
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
allocateContainers(nm1, am1, 1);
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
|
||||||
|
// container complete
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
|
||||||
|
|
||||||
|
// make sure allocate() get complete container,
|
||||||
|
// before this msg pass to AM, AM may crash
|
||||||
|
while (true) {
|
||||||
|
AllocateResponse response = am1.allocate(
|
||||||
|
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
||||||
|
List<ContainerStatus> containerStatuses =
|
||||||
|
response.getCompletedContainersStatuses();
|
||||||
|
if (isContainerIdInContainerStatus(
|
||||||
|
containerStatuses, containerId2) == false) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// is containerId still in justFinishedContainer?
|
||||||
|
containerStatuses =
|
||||||
|
app1.getCurrentAppAttempt().getJustFinishedContainers();
|
||||||
|
if (isContainerIdInContainerStatus(containerStatuses,
|
||||||
|
containerId2)) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
am1.waitForState(RMAppAttemptState.FAILED);
|
||||||
|
|
||||||
|
// wait for app to start a new attempt.
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
// assert this is a new AM.
|
||||||
|
ApplicationAttemptId newAttemptId =
|
||||||
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
||||||
|
Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
||||||
|
|
||||||
|
// launch the new AM
|
||||||
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
|
||||||
|
am2.registerAppAttempt();
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||||
|
|
||||||
|
// whether new AM could get container complete msg
|
||||||
|
AllocateResponse allocateResponse = am2.allocate(
|
||||||
|
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
||||||
|
List<ContainerStatus> containerStatuses =
|
||||||
|
allocateResponse.getCompletedContainersStatuses();
|
||||||
|
if (isContainerIdInContainerStatus(containerStatuses,
|
||||||
|
containerId2) == false) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
containerStatuses = attempt2.getJustFinishedContainers();
|
||||||
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
// the second allocate should not get container complete msg
|
||||||
|
allocateResponse = am2.allocate(
|
||||||
|
new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
||||||
|
containerStatuses =
|
||||||
|
allocateResponse.getCompletedContainersStatuses();
|
||||||
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue