YARN-5617. AMs only intended to run one attempt can be run more than once? Contributed by Jason Lowe.

This commit is contained in:
Eric Payne 2017-04-26 14:33:27 -05:00
parent 19e94bfd1b
commit 73563bc9b6
2 changed files with 152 additions and 44 deletions

View File

@ -1357,11 +1357,20 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
int numberOfFailure = app.getNumFailedAppAttempts(); int numberOfFailure = app.getNumFailedAppAttempts();
LOG.info("The number of failed attempts" if (app.maxAppAttempts == 1) {
+ (app.attemptFailuresValidityInterval > 0 ? " in previous " // If the user explicitly set the attempts to 1 then there are likely
+ app.attemptFailuresValidityInterval + " milliseconds " : " ") // correctness issues if the AM restarts for any reason.
+ "is " + numberOfFailure + ". The max attempts is " LOG.info("Max app attempts is 1 for " + app.applicationId
+ app.maxAppAttempts); + ", preventing further attempts.");
numberOfFailure = app.maxAppAttempts;
} else {
LOG.info("The number of failed attempts"
+ (app.attemptFailuresValidityInterval > 0 ? " in previous "
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
+ "is " + numberOfFailure + ". The max attempts is "
+ app.maxAppAttempts);
}
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) { && numberOfFailure < app.maxAppAttempts) {
if (initialState.equals(RMAppState.KILLING)) { if (initialState.equals(RMAppState.KILLING)) {

View File

@ -379,8 +379,7 @@ public class TestAMRestart {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class); ResourceScheduler.class);
// explicitly set max-am-retry count as 1. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
@ -397,7 +396,7 @@ public class TestAMRestart {
(CapacityScheduler) rm1.getResourceScheduler(); (CapacityScheduler) rm1.getResourceScheduler();
ContainerId amContainer = ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt; // Preempt the next attempt;
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED); am1.waitForState(RMAppAttemptState.FAILED);
@ -405,11 +404,10 @@ public class TestAMRestart {
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
ApplicationStateData appState = ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId()); memStore.getState().getApplicationState().get(app1.getApplicationId());
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 = MockAM am2 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); Assert.assertFalse(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
// Preempt the second attempt. // Preempt the second attempt.
ContainerId amContainer2 = ContainerId amContainer2 =
@ -422,7 +420,7 @@ public class TestAMRestart {
MockAM am3 = MockAM am3 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); Assert.assertFalse(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
// mimic NM disk_failure // mimic NM disk_failure
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
@ -446,7 +444,7 @@ public class TestAMRestart {
MockAM am4 = MockAM am4 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); Assert.assertFalse(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
// create second NM, and register to rm1 // create second NM, and register to rm1
MockNM nm2 = MockNM nm2 =
@ -456,7 +454,7 @@ public class TestAMRestart {
// This will mimic ContainerExitStatus.ABORT // This will mimic ContainerExitStatus.ABORT
nm1.nodeHeartbeat(false); nm1.nodeHeartbeat(false);
am4.waitForState(RMAppAttemptState.FAILED); am4.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry()); Assert.assertFalse(attempt4.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.ABORTED, Assert.assertEquals(ContainerExitStatus.ABORTED,
appState.getAttempt(am4.getApplicationAttemptId()) appState.getAttempt(am4.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
@ -464,23 +462,72 @@ public class TestAMRestart {
MockAM am5 = MockAM am5 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); Assert.assertFalse(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
// fail the AM normally // fail the AM normally
nm2 nm2
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am5.waitForState(RMAppAttemptState.FAILED); am5.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
// launch next AM in nm2
MockAM am6 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 6, nm2);
RMAppAttempt attempt6 = app1.getCurrentAppAttempt();
// fail the AM normally
nm2
.nodeHeartbeat(am6.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.FAILED);
waitSchedulerApplicationAttemptStopped(scheduler,
am6.getApplicationAttemptId());
Assert.assertTrue(attempt6.shouldCountTowardsMaxAttemptRetry());
// AM should not be restarted. // AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
Assert.assertEquals(5, app1.getAppAttempts().size()); Assert.assertEquals(6, app1.getAppAttempts().size());
rm1.stop();
}
@Test(timeout = 100000)
public void testMaxAttemptOneMeansOne() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
CapacityScheduler scheduler =
(CapacityScheduler) rm1.getResourceScheduler();
ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the attempt;
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
waitSchedulerApplicationAttemptStopped(scheduler,
am1.getApplicationAttemptId());
// AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
Assert.assertEquals(1, app1.getAppAttempts().size());
rm1.stop(); rm1.stop();
} }
// Test RM restarts after AM container is preempted, new RM should not count // Test RM restarts after AM container is preempted, new RM should not count
// AM preemption failure towards the max-retry-account and should be able to // AM preemption failure towards the max-retry-account and should be able to
// re-launch the AM. // re-launch the AM.
@Test(timeout = 60000) @Test//(timeout = 60000)
public void testPreemptedAMRestartOnRMRestart() throws Exception { public void testPreemptedAMRestartOnRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@ -489,8 +536,7 @@ public class TestAMRestart {
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 1. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf); memStore.init(conf);
@ -507,21 +553,36 @@ public class TestAMRestart {
ContainerId amContainer = ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// fail the AM normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
waitSchedulerApplicationAttemptStopped(scheduler,
am1.getApplicationAttemptId());
Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
// wait for the next AM to start
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am2 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
// Forcibly preempt the am container; // Forcibly preempt the am container;
amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED); am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored. // state store has 2 attempts stored.
ApplicationStateData appState = ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId()); memStore.getState().getApplicationState().get(app1.getApplicationId());
Assert.assertEquals(1, appState.getAttemptCount()); Assert.assertEquals(2, appState.getAttemptCount());
// attempt stored has the preempted container exit status. // attempt stored has the preempted container exit status.
Assert.assertEquals(ContainerExitStatus.PREEMPTED, Assert.assertEquals(ContainerExitStatus.PREEMPTED,
appState.getAttempt(am1.getApplicationAttemptId()) appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
// Restart rm. // Restart rm.
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
@ -529,16 +590,16 @@ public class TestAMRestart {
rm2.start(); rm2.start();
// Restarted RM should re-launch the am. // Restarted RM should re-launch the am.
MockAM am2 = MockAM am3 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3);
RMAppAttempt attempt2 = RMAppAttempt attempt3 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId()) rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt(); .getCurrentAppAttempt();
Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.INVALID, Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId()) appState.getAttempt(am3.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
} }
@ -556,21 +617,36 @@ public class TestAMRestart {
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 1. // explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf); memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore); MockRM rm1 = new MockRM(conf, memStore);
rm1.start(); rm1.start();
CapacityScheduler scheduler =
(CapacityScheduler) rm1.getResourceScheduler();
MockNM nm1 = MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
RMApp app1 = rm1.submitApp(200); RMApp app1 = rm1.submitApp(200);
// AM should be restarted even though max-am-attempt is 1.
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); Assert.assertFalse(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
// fail the AM normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
waitSchedulerApplicationAttemptStopped(scheduler,
am1.getApplicationAttemptId());
Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
// wait for the next AM to start
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am2 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
// Restart rm. // Restart rm.
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(conf, memStore);
@ -582,27 +658,27 @@ public class TestAMRestart {
NMContainerStatus status = Records.newRecord(NMContainerStatus.class); NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
status status
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
status.setContainerId(attempt1.getMasterContainer().getId()); status.setContainerId(attempt2.getMasterContainer().getId());
status.setContainerState(ContainerState.COMPLETE); status.setContainerState(ContainerState.COMPLETE);
status.setDiagnostics(""); status.setDiagnostics("");
nm1.registerNode(Collections.singletonList(status), null); nm1.registerNode(Collections.singletonList(status), null);
rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
appState.getAttempt(am1.getApplicationAttemptId()) appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
// Will automatically start a new AppAttempt in rm2 // Will automatically start a new AppAttempt in rm2
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am2 = MockAM am3 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3);
RMAppAttempt attempt3 = RMAppAttempt attempt3 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId()) rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt(); .getCurrentAppAttempt();
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.INVALID, Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId()) appState.getAttempt(am3.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
@ -837,4 +913,27 @@ public class TestAMRestart {
rm1.stop(); rm1.stop();
} }
static void waitSchedulerApplicationAttemptStopped(
AbstractYarnScheduler ys,
ApplicationAttemptId attemptId) throws InterruptedException {
SchedulerApplicationAttempt schedulerApp =
ys.getApplicationAttempt(attemptId);
if (null == schedulerApp) {
return;
}
// Wait at most 5 secs to make sure SchedulerApplicationAttempt stopped
int tick = 0;
while (tick < 100) {
if (schedulerApp.isStopped()) {
return;
}
tick++;
Thread.sleep(50);
}
// Only print, don't throw exception
System.err.println("Failed to wait scheduler application attempt stopped.");
}
} }