YARN-3387. Previous AM's container completed status couldn't pass to current AM if AM and RM restarted during the same time. Contributed by Sandflee

(cherry picked from commit d03dcb9635)
This commit is contained in:
Jian He 2015-04-24 12:12:28 -07:00
parent 89a15d6074
commit 0583c27fb1
4 changed files with 72 additions and 2 deletions

View File

@ -214,6 +214,9 @@ Release 2.8.0 - UNRELEASED
YARN-3516. killing ContainerLocalizer action doesn't take effect when
private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
YARN-3387. Previous AM's container completed status couldn't pass to current
AM if AM and RM restarted during the same time. (sandflee via jianhe)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1273,7 +1273,7 @@ public class RMAppImpl implements RMApp, Recoverable {
// finished containers so that they can be acked to NM,
// but when pulling finished container we will check this flag again.
((RMAppAttemptImpl) app.currentAttempt)
.transferStateFromPreviousAttempt(oldAttempt);
.transferStateFromAttempt(oldAttempt);
return initialState;
} else {
if (numberOfFailure >= app.maxAppAttempts) {

View File

@ -845,7 +845,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
public void transferStateFromAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainersReference();
this.finishedContainersSentToAM =
attempt.getFinishedContainersSentToAMReference();
@ -1044,6 +1044,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
appAttempt.getAppAttemptId().getApplicationId());
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.submissionContext.getUnmanagedAM()
&& rmApp.getCurrentAppAttempt() != appAttempt) {
appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
}
// We will replay the final attempt only if last attempt is in final
// state but application is not in final state.
if (rmApp.getCurrentAppAttempt() == appAttempt

View File

@ -1037,4 +1037,64 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
}
@Test(timeout = 20000)
public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
// submit app with keepContainersAcrossApplicationAttempts true
RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
null, null, true, true, false, null, 0, null, true);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
// am failed,and relaunch it
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
// rm failover
rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// container launched by first am completed
NMContainerStatus amContainer =
TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1,
ContainerState.RUNNING);
NMContainerStatus completedContainer=
TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
ContainerState.COMPLETE);
NMContainerStatus runningContainer =
TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3,
ContainerState.RUNNING);
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
completedContainer), null);
Thread.sleep(200);
// check whether current am could get containerCompleteMsg
RMApp recoveredApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt();
assertEquals(1,loadedAttempt1.getJustFinishedContainers().size());
}
}