From e30668106dc246f68db36fbd1f2db6ec08cd96f2 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 22 Jan 2016 10:14:46 +0530 Subject: [PATCH] YARN-4584. RM startup failure when AM attempts greater than max-attempts. (Bibin A Chundatt via rohithsharmaks) --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 23 +++++--- .../server/resourcemanager/TestRMRestart.java | 58 +++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f789bcb1b3a..a7a63b1829e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -137,6 +137,9 @@ Release 2.9.0 - UNRELEASED YARN-4611. Fix scheduler load simulator to support multi-layer network location. (Ming Ma via xgong) + YARN-4584. RM startup failure when AM attempts greater than max-attempts. + (Bibin A Chundatt via rohithsharmaks) + Release 2.8.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 6ecc7d321ce..1a390df2180 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -841,7 +841,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); // If interval > 0, some attempts might have been deleted. - if (submissionContext.getAttemptFailuresValidityInterval() > 0) { + if (this.attemptFailuresValidityInterval > 0) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); this.nextAttemptId = firstAttemptIdInStateStore; } @@ -1341,7 +1341,9 @@ public class RMAppImpl implements RMApp, Recoverable { + "is " + numberOfFailure + ". The max attempts is " + app.maxAppAttempts); - removeExcessAttempts(app); + if (app.attemptFailuresValidityInterval > 0) { + removeExcessAttempts(app); + } if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { @@ -1381,15 +1383,22 @@ public class RMAppImpl implements RMApp, Recoverable { } private void removeExcessAttempts(RMAppImpl app) { - while (app.nextAttemptId - app.firstAttemptIdInStateStore - > app.maxAppAttempts) { + while (app.nextAttemptId + - app.firstAttemptIdInStateStore > app.maxAppAttempts) { // attempts' first element is oldest attempt because it is a // LinkedHashMap ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( app.getApplicationId(), app.firstAttemptIdInStateStore); - app.firstAttemptIdInStateStore++; - LOG.info("Remove attempt from state store : " + attemptId); - app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId); + long endTime = app.systemClock.getTime(); + if (rmAppAttempt.getFinishTime() < (endTime + - app.attemptFailuresValidityInterval)) { + app.firstAttemptIdInStateStore++; + LOG.info("Remove attempt from state store : " + attemptId); + app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + } else { + break; + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index bad68f4f9e8..625c3b890f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -42,6 +42,8 @@ import java.util.Map; import java.util.Set; import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; @@ -104,6 +106,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -121,6 +125,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; public class TestRMRestart extends ParameterizedSchedulerTestBase { + private static final Log LOG = LogFactory.getLog(TestRMRestart.class); private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -2321,4 +2326,57 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm2.stop(); } + @Test(timeout = 120000) + public void testRMRestartAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + if (!getSchedulerType().equals(SchedulerType.CAPACITY)) { + return; + } + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + int CONTAINER_MEMORY = 1024; + // create app and launch the AM + RMApp app0 = rm1.submitApp(CONTAINER_MEMORY); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FAILED); + for (int i = 0; i < 4; i++) { + am0 = MockRM.launchAM(app0, rm1, nm1); + am0.registerAppAttempt(); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() + .get(app0.getApplicationId()).getCurrentAppAttempt(); + // kill app0-attempt + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer( + app0.getCurrentAppAttempt().getMasterContainer().getId())); + } + am0 = MockRM.launchAM(app0, rm1, nm1); + am0.registerAppAttempt(); + rm1.killApp(app0.getApplicationId()); + rm1.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.KILLED); + + MockRM rm2 = null; + // start RM2 + try { + rm2 = new MockRM(conf, memStore); + rm2.start(); + Assert.assertTrue("RM start successfully", true); + } catch (Exception e) { + LOG.debug("Exception on start", e); + Assert.fail("RM should start with out any issue"); + } finally { + rm1.stop(); + } + } + }