YARN-5617. AMs only intended to run one attempt can be run more than once? Contributed by Jason Lowe.
This commit is contained in:
parent
561718e05d
commit
52adf71914
|
@ -1517,14 +1517,22 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||||
int numberOfFailure = app.getNumFailedAppAttempts();
|
int numberOfFailure = app.getNumFailedAppAttempts();
|
||||||
LOG.info("The number of failed attempts"
|
if (app.maxAppAttempts == 1) {
|
||||||
+ (app.attemptFailuresValidityInterval > 0 ? " in previous "
|
// If the user explicitly set the attempts to 1 then there are likely
|
||||||
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
|
// correctness issues if the AM restarts for any reason.
|
||||||
+ "is " + numberOfFailure + ". The max attempts is "
|
LOG.info("Max app attempts is 1 for " + app.applicationId
|
||||||
+ app.maxAppAttempts);
|
+ ", preventing further attempts.");
|
||||||
|
numberOfFailure = app.maxAppAttempts;
|
||||||
|
} else {
|
||||||
|
LOG.info("The number of failed attempts"
|
||||||
|
+ (app.attemptFailuresValidityInterval > 0 ? " in previous "
|
||||||
|
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
|
||||||
|
+ "is " + numberOfFailure + ". The max attempts is "
|
||||||
|
+ app.maxAppAttempts);
|
||||||
|
|
||||||
if (app.attemptFailuresValidityInterval > 0) {
|
if (app.attemptFailuresValidityInterval > 0) {
|
||||||
removeExcessAttempts(app);
|
removeExcessAttempts(app);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!app.submissionContext.getUnmanagedAM()
|
if (!app.submissionContext.getUnmanagedAM()
|
||||||
|
|
|
@ -378,8 +378,7 @@ public class TestAMRestart {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
ResourceScheduler.class);
|
ResourceScheduler.class);
|
||||||
// explicitly set max-am-retry count as 1.
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
@ -396,7 +395,7 @@ public class TestAMRestart {
|
||||||
(CapacityScheduler) rm1.getResourceScheduler();
|
(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
ContainerId amContainer =
|
ContainerId amContainer =
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
// Preempt the first attempt;
|
// Preempt the next attempt;
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
@ -464,7 +463,7 @@ public class TestAMRestart {
|
||||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
am4.getApplicationAttemptId());
|
am4.getApplicationAttemptId());
|
||||||
|
|
||||||
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertFalse(attempt4.shouldCountTowardsMaxAttemptRetry());
|
||||||
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
||||||
appState.getAttempt(am4.getApplicationAttemptId())
|
appState.getAttempt(am4.getApplicationAttemptId())
|
||||||
.getAMContainerExitStatus());
|
.getAMContainerExitStatus());
|
||||||
|
@ -481,9 +480,58 @@ public class TestAMRestart {
|
||||||
|
|
||||||
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
||||||
|
|
||||||
|
// launch next AM in nm2
|
||||||
|
MockAM am6 =
|
||||||
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 6, nm2);
|
||||||
|
RMAppAttempt attempt6 = app1.getCurrentAppAttempt();
|
||||||
|
|
||||||
|
// fail the AM normally
|
||||||
|
nm2
|
||||||
|
.nodeHeartbeat(am6.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am6.getApplicationAttemptId());
|
||||||
|
|
||||||
|
Assert.assertTrue(attempt6.shouldCountTowardsMaxAttemptRetry());
|
||||||
|
|
||||||
// AM should not be restarted.
|
// AM should not be restarted.
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
Assert.assertEquals(5, app1.getAppAttempts().size());
|
Assert.assertEquals(6, app1.getAppAttempts().size());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 100000)
|
||||||
|
public void testMaxAttemptOneMeansOne() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
CapacityScheduler scheduler =
|
||||||
|
(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
ContainerId amContainer =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
|
// Preempt the attempt;
|
||||||
|
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// AM should not be restarted.
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
|
Assert.assertEquals(1, app1.getAppAttempts().size());
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,8 +547,7 @@ public class TestAMRestart {
|
||||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||||
|
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
// explicitly set max-am-retry count as 1.
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
|
@ -517,21 +564,36 @@ public class TestAMRestart {
|
||||||
ContainerId amContainer =
|
ContainerId amContainer =
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
|
|
||||||
|
// fail the AM normally
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
|
||||||
|
ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||||
|
|
||||||
|
// wait for the next AM to start
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
MockAM am2 =
|
||||||
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||||
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
|
|
||||||
// Forcibly preempt the am container;
|
// Forcibly preempt the am container;
|
||||||
|
amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
|
||||||
// state store has 1 attempt stored.
|
// state store has 2 attempts stored.
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||||
Assert.assertEquals(1, appState.getAttemptCount());
|
Assert.assertEquals(2, appState.getAttemptCount());
|
||||||
// attempt stored has the preempted container exit status.
|
// attempt stored has the preempted container exit status.
|
||||||
Assert.assertEquals(ContainerExitStatus.PREEMPTED,
|
Assert.assertEquals(ContainerExitStatus.PREEMPTED,
|
||||||
appState.getAttempt(am1.getApplicationAttemptId())
|
appState.getAttempt(am2.getApplicationAttemptId())
|
||||||
.getAMContainerExitStatus());
|
.getAMContainerExitStatus());
|
||||||
// Restart rm.
|
// Restart rm.
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
@ -539,16 +601,16 @@ public class TestAMRestart {
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
// Restarted RM should re-launch the am.
|
// Restarted RM should re-launch the am.
|
||||||
MockAM am2 =
|
MockAM am3 =
|
||||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||||
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
|
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3);
|
||||||
RMAppAttempt attempt2 =
|
RMAppAttempt attempt3 =
|
||||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
||||||
.getCurrentAppAttempt();
|
.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||||
Assert.assertEquals(ContainerExitStatus.INVALID,
|
Assert.assertEquals(ContainerExitStatus.INVALID,
|
||||||
appState.getAttempt(am2.getApplicationAttemptId())
|
appState.getAttempt(am3.getApplicationAttemptId())
|
||||||
.getAMContainerExitStatus());
|
.getAMContainerExitStatus());
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
@ -566,13 +628,15 @@ public class TestAMRestart {
|
||||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||||
|
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
// explicitly set max-am-retry count as 1.
|
// explicitly set max-am-retry count as 2.
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
CapacityScheduler scheduler =
|
||||||
|
(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
@ -581,6 +645,20 @@ public class TestAMRestart {
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
|
||||||
|
// fail the AM normally
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
|
||||||
|
ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||||
|
|
||||||
|
// wait for the next AM to start
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
MockAM am2 =
|
||||||
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||||
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
|
|
||||||
// Restart rm.
|
// Restart rm.
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
@ -591,27 +669,27 @@ public class TestAMRestart {
|
||||||
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
|
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
|
||||||
status
|
status
|
||||||
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
|
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
|
||||||
status.setContainerId(attempt1.getMasterContainer().getId());
|
status.setContainerId(attempt2.getMasterContainer().getId());
|
||||||
status.setContainerState(ContainerState.COMPLETE);
|
status.setContainerState(ContainerState.COMPLETE);
|
||||||
status.setDiagnostics("");
|
status.setDiagnostics("");
|
||||||
nm1.registerNode(Collections.singletonList(status), null);
|
nm1.registerNode(Collections.singletonList(status), null);
|
||||||
|
|
||||||
rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
|
||||||
Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||||
appState.getAttempt(am1.getApplicationAttemptId())
|
appState.getAttempt(am2.getApplicationAttemptId())
|
||||||
.getAMContainerExitStatus());
|
.getAMContainerExitStatus());
|
||||||
// Will automatically start a new AppAttempt in rm2
|
// Will automatically start a new AppAttempt in rm2
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
MockAM am2 =
|
MockAM am3 =
|
||||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
|
||||||
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
|
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3);
|
||||||
RMAppAttempt attempt3 =
|
RMAppAttempt attempt3 =
|
||||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
||||||
.getCurrentAppAttempt();
|
.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||||
Assert.assertEquals(ContainerExitStatus.INVALID,
|
Assert.assertEquals(ContainerExitStatus.INVALID,
|
||||||
appState.getAttempt(am2.getApplicationAttemptId())
|
appState.getAttempt(am3.getApplicationAttemptId())
|
||||||
.getAMContainerExitStatus());
|
.getAMContainerExitStatus());
|
||||||
|
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
|
|
Loading…
Reference in New Issue