diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 600d34f29c4..0c92640c50f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -153,6 +153,9 @@ Release 2.0.5-beta - UNRELEASED YARN-112. Fixed a race condition during localization that fails containers. (Omkar Vinit Joshi via vinodkv) + YARN-534. Change RM restart recovery to also account for AM max-attempts + configuration after the restart. (Jian He via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 70fd2576ab0..7193a5998f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -57,6 +57,7 @@ public class RMAppManager implements EventHandler, private static final Log LOG = LogFactory.getLog(RMAppManager.class); private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + private int globalMaxAppAttempts; private LinkedList completedApps = new LinkedList(); private final RMContext rmContext; @@ -76,6 +77,8 @@ public RMAppManager(RMContext context, setCompletedAppsMax(conf.getInt( YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS)); + globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); } /** @@ -308,6 +311,7 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { + boolean shouldRecover = true; // re-submit the application // this is going to send an app start event but since the async dispatcher // has not started that event will be queued until we have completed re @@ -318,16 +322,39 @@ public void recover(RMState state) throws Exception { // This will need to be changed in work preserving recovery in which // RM will re-connect with the running AM's instead of restarting them LOG.info("Not recovering unmanaged application " + appState.getAppId()); - store.removeApplication(appState); + shouldRecover = false; + } + int individualMaxAppAttempts = appState.getApplicationSubmissionContext() + .getMaxAppAttempts(); + int maxAppAttempts; + if (individualMaxAppAttempts <= 0 || + individualMaxAppAttempts > globalMaxAppAttempts) { + maxAppAttempts = globalMaxAppAttempts; + LOG.warn("The specific max attempts: " + individualMaxAppAttempts + + " for application: " + appState.getAppId() + + " is invalid, because it is out of the range [1, " + + globalMaxAppAttempts + "]. Use the global max attempts instead."); } else { + maxAppAttempts = individualMaxAppAttempts; + } + if(appState.getAttemptCount() >= maxAppAttempts) { + LOG.info("Not recovering application " + appState.getAppId() + + " due to recovering attempt is beyond maxAppAttempt limit"); + shouldRecover = false; + } + + if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), - appState.getSubmitTime()); + appState.getSubmitTime()); // re-populate attempt information in application RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( - appState.getAppId()); + appState.getAppId()); appImpl.recover(state); } + else { + store.removeApplication(appState); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 12391c60978..bfed3a7e06e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -128,21 +128,28 @@ public RMApp submitApp(int masterMemory) throws Exception { // client public RMApp submitApp(int masterMemory, String name, String user) throws Exception { - return submitApp(masterMemory, name, user, null, false, null); + return submitApp(masterMemory, name, user, null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); } public RMApp submitApp(int masterMemory, String name, String user, Map acls) throws Exception { - return submitApp(masterMemory, name, user, acls, false, null); + return submitApp(masterMemory, name, user, acls, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); } public RMApp submitApp(int masterMemory, String name, String user, Map acls, String queue) throws Exception { - return submitApp(masterMemory, name, user, acls, false, queue); - } + return submitApp(masterMemory, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + } public RMApp submitApp(int masterMemory, String name, String user, - Map acls, boolean unmanaged, String queue) throws Exception { + Map acls, boolean unmanaged, String queue, + int maxAppAttempts) throws Exception { ClientRMProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -155,6 +162,7 @@ public RMApp submitApp(int masterMemory, String name, String user, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setUser(user); + sub.setMaxAppAttempts(maxAppAttempts); if(unmanaged) { sub.setUnmanagedAM(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 2057d8ab419..d19879c0a3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +64,7 @@ public void testRMRestart() throws Exception { "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); conf.set(YarnConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); @@ -152,7 +155,9 @@ public void testRMRestart() throws Exception { .getApplicationId()); // create unmanaged app - RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null); + RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, + null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); ApplicationAttemptId unmanagedAttemptId = appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); // assert appUnmanaged info is saved @@ -306,4 +311,74 @@ public void testRMRestart() throws Exception { Assert.assertEquals(0, rmAppState.size()); } + @Test + public void testRMRestartOnMaxAppAttempts() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, + "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app with maxAppAttempts equals to 1 + RMApp app1 = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1); + // submit an app with maxAppAttempts equals to -1 + RMApp app2 = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", -1); + + // assert app1 info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app1.getApplicationSubmissionContext() + .getApplicationId()); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + Assert.assertEquals(1, appState.getAttemptCount()); + ApplicationAttemptState attemptState = + appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + rm1.stop(); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // verify that maxAppAttempts is set to global value + Assert.assertEquals(2, + rm2.getRMContext().getRMApps().get(app2.getApplicationId()) + .getMaxAppAttempts()); + + // verify that app2 exists app1 is removed + Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); + Assert.assertNotNull(rm2.getRMContext().getRMApps() + .get(app2.getApplicationId())); + Assert.assertNull(rm2.getRMContext().getRMApps() + .get(app1.getApplicationId())); + + // stop the RM + rm2.stop(); + } }