YARN-6153. KeepContainer does not work when AM retry window is set. Contributed by kyungwan nam

(cherry picked from commit 235203dffd)
This commit is contained in:
Jian He 2017-02-28 13:23:36 -08:00
parent 95bd3c3d55
commit c7ddf95d4a
7 changed files with 112 additions and 66 deletions

View File

@ -926,13 +926,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
}
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
// The newly created attempt maybe last attempt if (number of
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
currentAMBlacklistManager);
submissionContext, conf, amReq, this, currentAMBlacklistManager);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
@ -1413,18 +1407,13 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
private int getNumFailedAppAttempts() {
public int getNumFailedAppAttempts() {
int completedAttempts = 0;
long endTime = this.systemClock.getTime();
// Do not count AM preemption, hardware failures or NM resync
// as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
if (this.attemptFailuresValidityInterval <= 0
|| (attempt.getFinishTime() > endTime
- this.attemptFailuresValidityInterval)) {
completedAttempts++;
}
completedAttempts++;
}
}
return completedAttempts;

View File

@ -143,6 +143,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final EventHandler eventHandler;
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final RMApp rmApp;
private final ReadLock readLock;
private final WriteLock writeLock;
@ -179,12 +180,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf;
// Since AM preemption, hardware error and NM resync are not counted towards
// AM failure count, even if this flag is true, a new attempt can still be
// re-created if this attempt is eventually failed because of preemption,
// hardware error or NM resync. So this flag indicates that this may be
// last attempt.
private final boolean maybeLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
private static final AttemptFailedTransition FAILED_TRANSITION =
@ -490,16 +485,16 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
Configuration conf, ResourceRequest amReq, RMApp rmApp) {
this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
conf, maybeLastAttempt, amReq, new DisabledBlacklistManager());
conf, amReq, rmApp, new DisabledBlacklistManager());
}
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
Configuration conf, ResourceRequest amReq, RMApp rmApp,
BlacklistManager amBlacklistManager) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
@ -514,7 +509,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.writeLock = lock.writeLock();
this.proxiedTrackingUrl = generateProxyUriWithScheme();
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
this.attemptMetrics =
@ -531,6 +525,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
}
this.diagnostics = new BoundedAppender(diagnosticsLimitKC * 1024);
this.rmApp = rmApp;
}
private int getDiagnosticsLimitKCOrThrow(final Configuration configuration) {
@ -1217,8 +1212,7 @@ private static class AttemptRecoveredTransition
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMApp rmApp = appAttempt.rmContext.getRMApps().get(
appAttempt.getAppAttemptId().getApplicationId());
RMApp rmApp = appAttempt.rmApp;
/*
* If last attempt recovered final state is null .. it means attempt was
@ -1464,14 +1458,9 @@ public void transition(RMAppAttemptImpl appAttempt,
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.submissionContext.getUnmanagedAM()) {
// See if we should retain containers for non-unmanaged applications
if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
// Premption, hardware failures, NM resync doesn't count towards
// app-failures and so we should retain containers.
keepContainersAcrossAppAttempts = true;
} else if (!appAttempt.maybeLastAttempt) {
// Not preemption, hardware failures or NM resync.
// Not last-attempt too - keep containers.
int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
.getNumFailedAppAttempts();
if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
keepContainersAcrossAppAttempts = true;
}
}
@ -1498,9 +1487,7 @@ public void transition(RMAppAttemptImpl appAttempt,
.applicationAttemptFinished(appAttempt, finalAttemptState);
appAttempt.rmContext.getSystemMetricsPublisher()
.appAttemptFinished(appAttempt, finalAttemptState,
appAttempt.rmContext.getRMApps().get(
appAttempt.applicationAttemptId.getApplicationId()),
System.currentTimeMillis());
appAttempt.rmApp, System.currentTimeMillis());
}
}
@ -1547,6 +1534,14 @@ public void transition(RMAppAttemptImpl appAttempt,
@Override
public boolean shouldCountTowardsMaxAttemptRetry() {
long attemptFailuresValidityInterval = this.submissionContext
.getAttemptFailuresValidityInterval();
long end = System.currentTimeMillis();
if (attemptFailuresValidityInterval > 0
&& this.getFinishTime() > 0
&& this.getFinishTime() < (end - attemptFailuresValidityInterval)) {
return false;
}
try {
this.readLock.lock();
int exitStatus = getAMContainerExitStatus();
@ -2225,11 +2220,6 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
return attemptReport;
}
// for testing
public boolean mayBeLastAttempt() {
return maybeLastAttempt;
}
@Override
public RMAppAttemptMetrics getRMAppAttemptMetrics() {
// didn't use read/write lock here because RMAppAttemptMetrics has its own

View File

@ -618,15 +618,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
false, null, 0, null, true, Priority.newInstance(0));
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception {
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval,
boolean keepContainers) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, keepContainers,
false, null, attemptFailuresValidityInterval, null, true, priority);
}

View File

@ -390,7 +390,7 @@ public void handle(Event event) {
mock(ApplicationSubmissionContext.class);
YarnConfiguration config = new YarnConfiguration();
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
rmContext, yarnScheduler, null, asContext, config, false, null);
rmContext, yarnScheduler, null, asContext, config, null, null);
ApplicationResourceUsageReport report = rmAppAttemptImpl
.getApplicationResourceUsageReport();
assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
@ -1059,7 +1059,7 @@ public ApplicationReport createAndGetApplicationReport(
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
rmContext, yarnScheduler, null, asContext, config, false, null));
rmContext, yarnScheduler, null, asContext, config, null, app));
Container container = Container.newInstance(
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,

View File

@ -411,7 +411,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
MockAM am2 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
// Preempt the second attempt.
ContainerId amContainer2 =
@ -427,7 +426,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
MockAM am3 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
// mimic NM disk_failure
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
@ -454,7 +452,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
MockAM am4 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
// create second NM, and register to rm1
MockNM nm2 =
@ -475,7 +472,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
MockAM am5 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
// fail the AM normally
nm2
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
@ -584,7 +580,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
// AM should be restarted even though max-am-attempt is 1.
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
// Restart rm.
MockRM rm2 = new MockRM(conf, memStore);
@ -645,7 +640,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
// set window size to a larger number : 60s
// we will verify the app should be failed if
// two continuous attempts failed in 60s.
RMApp app = rm1.submitApp(200, 60000);
RMApp app = rm1.submitApp(200, 60000, false);
MockAM am = MockRM.launchAM(app, rm1, nm1);
// Fail current attempt normally
@ -655,8 +650,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
// launch the second attempt
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app.getAppAttempts().size());
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
.mayBeLastAttempt());
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
@ -667,7 +661,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
ControlledClock clock = new ControlledClock();
// set window size to 10s
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
app1.setSystemClock(clock);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
@ -684,7 +678,6 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
Assert.assertEquals(2, app1.getAppAttempts().size());
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@ -863,4 +856,75 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
rm1.stop();
}
// Test restarting AM launched with the KeepContainers and AM reset window.
// after AM reset window, even if AM who was the last is failed,
// all containers are launched by previous AM should be kept.
@Test (timeout = 20000)
public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
// set window size to 10s and enable keepContainers
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, true);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 2;
allocateContainers(nm1, am1, NUM_CONTAINERS);
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// Fail attempt1 normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
// launch the second attempt
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app1.getAppAttempts().size());
// It will be the last attempt.
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
// wait for 10 seconds to reset AM failure count
Thread.sleep(10 * 1000);
// Fail attempt2 normally
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
// can launch the third attempt successfully
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(3, app1.getAppAttempts().size());
MockAM am3 = rm1.launchAM(app1, rm1, nm1);
RegisterApplicationMasterResponse registerResponse =
am3.registerAppAttempt();
// keepContainers is applied, even if attempt2 was the last attempt.
Assert.assertEquals(1, registerResponse.getContainersFromPreviousAttempts()
.size());
boolean containerId2Exists = false;
Container container = registerResponse.getContainersFromPreviousAttempts().get(0);
if (container.getId().equals(containerId2)) {
containerId2Exists = true;
}
Assert.assertTrue(containerId2Exists);
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
rm1.stop();
}
}

View File

@ -106,6 +106,6 @@ private RMAppAttemptImpl createRMAppAttemptImpl(
when(mockRMContext.getDispatcher()).thenReturn(mockDispatcher);
return new RMAppAttemptImpl(mockApplicationAttemptId, mockRMContext, null,
null, null, configuration, false, null);
null, null, configuration, null, null);
}
}

View File

@ -327,10 +327,10 @@ public void setUp() throws Exception {
application = mock(RMAppImpl.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
masterService, submissionContext, new Configuration(), false,
masterService, submissionContext, new Configuration(),
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
submissionContext.getResource(), 1), application);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@ -1107,10 +1107,10 @@ public void testLaunchedFailWhileAHSEnabled() {
RMAppAttempt myApplicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
spyRMContext, scheduler,masterService,
submissionContext, myConf, false,
submissionContext, myConf,
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
submissionContext.getResource(), 1), application);
//submit, schedule and allocate app attempt
myApplicationAttempt.handle(
@ -1536,6 +1536,9 @@ public void testFailedToFailed() {
// create a failed attempt.
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(application.getMaxAppAttempts()).thenReturn(2);
when(application.getNumFailedAppAttempts()).thenReturn(1);
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
@ -1581,9 +1584,9 @@ public void testContainersCleanupForLastAttempt() {
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
scheduler, masterService, submissionContext, new Configuration(),
true, BuilderUtils.newResourceRequest(
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
submissionContext.getResource(), 1), application);
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
@ -1642,9 +1645,9 @@ public Allocation answer(InvocationOnMock invocation)
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
spyRMContext, scheduler, masterService, submissionContext,
new Configuration(), true, ResourceRequest.newInstance(
new Configuration(), ResourceRequest.newInstance(
Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3,
false, "label-expression"));
false, "label-expression"), application);
new RMAppAttemptImpl.ScheduleTransition().transition(
(RMAppAttemptImpl) applicationAttempt, null);
}