From 73563bc9b66e737aec56bb4f3d5579babda8f737 Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Wed, 26 Apr 2017 14:33:27 -0500 Subject: [PATCH] YARN-5617. AMs only intended to run one attempt can be run more than once? Contributed by Jason Lowe. --- .../resourcemanager/rmapp/RMAppImpl.java | 19 +- .../applicationsmanager/TestAMRestart.java | 177 ++++++++++++++---- 2 files changed, 152 insertions(+), 44 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a4feb5f1c5b..197283f11f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1357,11 +1357,20 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { int numberOfFailure = app.getNumFailedAppAttempts(); - LOG.info("The number of failed attempts" - + (app.attemptFailuresValidityInterval > 0 ? " in previous " - + app.attemptFailuresValidityInterval + " milliseconds " : " ") - + "is " + numberOfFailure + ". The max attempts is " - + app.maxAppAttempts); + if (app.maxAppAttempts == 1) { + // If the user explicitly set the attempts to 1 then there are likely + // correctness issues if the AM restarts for any reason. + LOG.info("Max app attempts is 1 for " + app.applicationId + + ", preventing further attempts."); + numberOfFailure = app.maxAppAttempts; + } else { + LOG.info("The number of failed attempts" + + (app.attemptFailuresValidityInterval > 0 ? " in previous " + + app.attemptFailuresValidityInterval + " milliseconds " : " ") + + "is " + numberOfFailure + ". The max attempts is " + + app.maxAppAttempts); + } + if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { if (initialState.equals(RMAppState.KILLING)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index fd478f684ea..c5c286a3ff7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -379,8 +379,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - // explicitly set max-am-retry count as 1. - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -397,7 +396,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { (CapacityScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); - // Preempt the first attempt; + // Preempt the next attempt; scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); @@ -405,11 +404,10 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); - // AM should be restarted even though max-am-attempt is 1. MockAM am2 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + Assert.assertFalse(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); // Preempt the second attempt. ContainerId amContainer2 = @@ -422,7 +420,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am3 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); + Assert.assertFalse(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); // mimic NM disk_failure ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); @@ -446,7 +444,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am4 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + Assert.assertFalse(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); // create second NM, and register to rm1 MockNM nm2 = @@ -456,7 +454,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { // This will mimic ContainerExitStatus.ABORT nm1.nodeHeartbeat(false); am4.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry()); + Assert.assertFalse(attempt4.shouldCountTowardsMaxAttemptRetry()); Assert.assertEquals(ContainerExitStatus.ABORTED, appState.getAttempt(am4.getApplicationAttemptId()) .getAMContainerExitStatus()); @@ -464,23 +462,72 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { MockAM am5 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); + Assert.assertFalse(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); // fail the AM normally nm2 .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am5.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry()); + // launch next AM in nm2 + MockAM am6 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 6, nm2); + RMAppAttempt attempt6 = app1.getCurrentAppAttempt(); + + // fail the AM normally + nm2 + .nodeHeartbeat(am6.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitSchedulerApplicationAttemptStopped(scheduler, + am6.getApplicationAttemptId()); + + Assert.assertTrue(attempt6.shouldCountTowardsMaxAttemptRetry()); + // AM should not be restarted. rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); - Assert.assertEquals(5, app1.getAppAttempts().size()); + Assert.assertEquals(6, app1.getAppAttempts().size()); + rm1.stop(); + } + + @Test(timeout = 100000) + public void testMaxAttemptOneMeansOne() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + // Preempt the attempt; + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitSchedulerApplicationAttemptStopped(scheduler, + am1.getApplicationAttemptId()); + + // AM should not be restarted. + rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); + Assert.assertEquals(1, app1.getAppAttempts().size()); rm1.stop(); } // Test RM restarts after AM container is preempted, new RM should not count // AM preemption failure towards the max-retry-account and should be able to // re-launch the AM. - @Test(timeout = 60000) + @Test//(timeout = 60000) public void testPreemptedAMRestartOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, @@ -489,8 +536,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - // explicitly set max-am-retry count as 1. - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); @@ -507,21 +553,36 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + // fail the AM normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitSchedulerApplicationAttemptStopped(scheduler, + am1.getApplicationAttemptId()); + Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry()); + + // wait for the next AM to start + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am2 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + // Forcibly preempt the am container; + amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); - am1.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); + am2.waitForState(RMAppAttemptState.FAILED); + Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - // state store has 1 attempt stored. + // state store has 2 attempts stored. ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); - Assert.assertEquals(1, appState.getAttemptCount()); + Assert.assertEquals(2, appState.getAttemptCount()); // attempt stored has the preempted container exit status. Assert.assertEquals(ContainerExitStatus.PREEMPTED, - appState.getAttempt(am1.getApplicationAttemptId()) - .getAMContainerExitStatus()); + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); // Restart rm. MockRM rm2 = new MockRM(conf, memStore); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -529,16 +590,16 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { rm2.start(); // Restarted RM should re-launch the am. - MockAM am2 = - rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); - MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); - RMAppAttempt attempt2 = + MockAM am3 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3); + RMAppAttempt attempt3 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()) .getCurrentAppAttempt(); - Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry()); + Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); Assert.assertEquals(ContainerExitStatus.INVALID, - appState.getAttempt(am2.getApplicationAttemptId()) - .getAMContainerExitStatus()); + appState.getAttempt(am3.getApplicationAttemptId()) + .getAMContainerExitStatus()); rm1.stop(); rm2.stop(); } @@ -556,21 +617,36 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - // explicitly set max-am-retry count as 1. - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + // explicitly set max-am-retry count as 2. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); MockRM rm1 = new MockRM(conf, memStore); rm1.start(); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); - // AM should be restarted even though max-am-attempt is 1. MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); + Assert.assertFalse(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); + + // fail the AM normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitSchedulerApplicationAttemptStopped(scheduler, + am1.getApplicationAttemptId()); + Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry()); + + // wait for the next AM to start + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am2 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); // Restart rm. MockRM rm2 = new MockRM(conf, memStore); @@ -582,27 +658,27 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() NMContainerStatus status = Records.newRecord(NMContainerStatus.class); status .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); - status.setContainerId(attempt1.getMasterContainer().getId()); + status.setContainerId(attempt2.getMasterContainer().getId()); status.setContainerState(ContainerState.COMPLETE); status.setDiagnostics(""); nm1.registerNode(Collections.singletonList(status), null); - rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED); + rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED); Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, - appState.getAttempt(am1.getApplicationAttemptId()) - .getAMContainerExitStatus()); + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); // Will automatically start a new AppAttempt in rm2 rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - MockAM am2 = - rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); - MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + MockAM am3 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3); RMAppAttempt attempt3 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()) .getCurrentAppAttempt(); Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); Assert.assertEquals(ContainerExitStatus.INVALID, - appState.getAttempt(am2.getApplicationAttemptId()) - .getAMContainerExitStatus()); + appState.getAttempt(am3.getApplicationAttemptId()) + .getAMContainerExitStatus()); rm1.stop(); rm2.stop(); @@ -837,4 +913,27 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception { rm1.stop(); } + + static void waitSchedulerApplicationAttemptStopped( + AbstractYarnScheduler ys, + ApplicationAttemptId attemptId) throws InterruptedException { + SchedulerApplicationAttempt schedulerApp = + ys.getApplicationAttempt(attemptId); + if (null == schedulerApp) { + return; + } + + // Wait at most 5 secs to make sure SchedulerApplicationAttempt stopped + int tick = 0; + while (tick < 100) { + if (schedulerApp.isStopped()) { + return; + } + tick++; + Thread.sleep(50); + } + + // Only print, don't throw exception + System.err.println("Failed to wait scheduler application attempt stopped."); + } }