diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3311a2e2dde..19e3e27cbbb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -262,6 +262,9 @@ Release 2.8.0 - UNRELEASED YARN-3516. killing ContainerLocalizer action doesn't take effect when private localizer receives FETCH_FAILURE status.(zhihai xu via xgong) + YARN-3387. Previous AM's container completed status couldn't pass to current + AM if AM and RM restarted during the same time. (sandflee via jianhe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index b4e4965c3b9..8abc47802f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1273,7 +1273,7 @@ public class RMAppImpl implements RMApp, Recoverable { // finished containers so that they can be acked to NM, // but when pulling finished container we will check this flag again. ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromPreviousAttempt(oldAttempt); + .transferStateFromAttempt(oldAttempt); return initialState; } else { if (numberOfFailure >= app.maxAppAttempts) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 913d06bd15f..8abc65adb6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -845,7 +845,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { attemptState.getMemorySeconds(),attemptState.getVcoreSeconds()); } - public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { + public void transferStateFromAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.finishedContainersSentToAM = attempt.getFinishedContainersSentToAMReference(); @@ -1044,6 +1044,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.progress = 1.0f; RMApp rmApp =appAttempt.rmContext.getRMApps().get( appAttempt.getAppAttemptId().getApplicationId()); + + if (appAttempt.submissionContext + .getKeepContainersAcrossApplicationAttempts() + && !appAttempt.submissionContext.getUnmanagedAM() + && rmApp.getCurrentAppAttempt() != appAttempt) { + appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt()); + } // We will replay the final attempt only if last attempt is in final // state but application is not in final state. if (rmApp.getCurrentAppAttempt() == appAttempt diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 0566f3d5f62..c6fe37128a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1037,4 +1037,64 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase nm1.setResourceTrackerService(rm2.getResourceTrackerService()); rm2.start(); } + + @Test(timeout = 20000) + public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit app with keepContainersAcrossApplicationAttempts true + RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, + null, null, true, true, false, null, 0, null, true); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + am0.allocate("127.0.0.1", 1000, 2, new ArrayList()); + nm1.nodeHeartbeat(true); + List conts = am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + + // am failed,and relaunch it + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + // rm failover + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // container launched by first am completed + NMContainerStatus amContainer = + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus completedContainer= + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2, + ContainerState.COMPLETE); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer), null); + Thread.sleep(200); + + // check whether current am could get containerCompleteMsg + RMApp recoveredApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt(); + assertEquals(1,loadedAttempt1.getJustFinishedContainers().size()); + } + }