Merge r1606407 from trunk. YARN-614. Changed ResourceManager to not count disk failure, node loss and RM restart towards app failures. Contributed by Xuan Gong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1606408 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-28 23:41:51 +00:00
parent 28b82d46af
commit c63e3f027b
5 changed files with 171 additions and 32 deletions

View File

@ -180,6 +180,9 @@ Release 2.5.0 - UNRELEASED
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
AMs heartbeat in. (Jason Lowe via vinodkv)
YARN-614. Changed ResourceManager to not count disk failure, node loss and
RM restart towards app failures. (Xuan Gong via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -687,9 +687,10 @@ public class RMAppImpl implements RMApp, Recoverable {
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
// The newly created attempt maybe last attempt if (number of
// previously NonPreempted attempts + 1) equal to the max-attempt
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1));
maxAppAttempts == (getNumFailedAppAttempts() + 1));
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
@ -797,7 +798,7 @@ public class RMAppImpl implements RMApp, Recoverable {
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
&& app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) {
&& app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
return RMAppState.ACCEPTED;
}
@ -888,7 +889,7 @@ public class RMAppImpl implements RMApp, Recoverable {
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) {
} else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@ -1105,11 +1106,12 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
private int getNumNonPreemptedAppAttempts() {
private int getNumFailedAppAttempts() {
int completedAttempts = 0;
// Do not count AM preemption as attempt failure.
// Do not count AM preemption, hardware failures or NM resync
// as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
if (!attempt.isPreempted()) {
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
completedAttempts++;
}
}
@ -1129,7 +1131,7 @@ public class RMAppImpl implements RMApp, Recoverable {
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
&& app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) {
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =

View File

@ -197,8 +197,14 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
ApplicationAttemptReport createApplicationAttemptReport();
/**
* Return the flag which indicates whether the attempt is preempted by the
* scheduler.
* Return the flag which indicates whether the attempt failure should be
* counted to attempt retry count.
* <ul>
* There failure types should not be counted to attempt retry count:
* <li>preempted by the scheduler.</li>
* <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
* <li>killed by RM because of RM restart or failover.</li>
* </ul>
*/
boolean isPreempted();
boolean shouldCountTowardsMaxAttemptRetry();
}

View File

@ -149,9 +149,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf;
// Since AM preemption is not counted towards AM failure count,
// even if this flag is true, a new attempt can still be re-created if this
// attempt is eventually preempted. So this flag indicates that this may be
// Since AM preemption, hardware error and NM resync are not counted towards
// AM failure count, even if this flag is true, a new attempt can still be
// re-created if this attempt is eventually failed because of preemption,
// hardware error or NM resync. So this flag indicates that this may be
// last attempt.
private final boolean maybeLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
@ -1087,12 +1088,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.submissionContext.getUnmanagedAM()) {
// See if we should retain containers for non-unmanaged applications
if (appAttempt.isPreempted()) {
// Premption doesn't count towards app-failures and so we should
// retain containers.
if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
// Premption, hardware failures, NM resync doesn't count towards
// app-failures and so we should retain containers.
keepContainersAcrossAppAttempts = true;
} else if (!appAttempt.maybeLastAttempt) {
// Not preemption. Not last-attempt too - keep containers.
// Not preemption, hardware failures or NM resync.
// Not last-attempt too - keep containers.
keepContainersAcrossAppAttempts = true;
}
}
@ -1136,8 +1138,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
@Override
public boolean isPreempted() {
return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED;
public boolean shouldCountTowardsMaxAttemptRetry() {
try {
this.readLock.lock();
int exitStatus = getAMContainerExitStatus();
return !(exitStatus == ContainerExitStatus.PREEMPTED
|| exitStatus == ContainerExitStatus.ABORTED
|| exitStatus == ContainerExitStatus.DISKS_FAILED
|| exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
} finally {
this.readLock.unlock();
}
}
private static final class UnmanagedAMAttemptSavedTransition

View File

@ -19,13 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -347,15 +352,20 @@ public class TestAMRestart {
rm1.stop();
}
// AM container preempted should not be counted towards AM max retry count.
@Test(timeout = 20000)
public void testAMPreemptedNotCountedForAMFailures() throws Exception {
// AM container preempted, nm disk failure
// should not be counted towards AM max retry count.
@Test(timeout = 100000)
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// explicitly set max-am-retry count as 1.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MockRM rm1 = new MockRM(conf);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
@ -371,8 +381,10 @@ public class TestAMRestart {
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt1.isPreempted());
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
ApplicationState appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
@ -384,20 +396,62 @@ public class TestAMRestart {
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt2.isPreempted());
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
// fail the AM normally
nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
// mimic NM disk_failure
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
containerStatus.setContainerId(attempt3.getMasterContainer().getId());
containerStatus.setDiagnostics("mimic NM disk_failure");
containerStatus.setState(ContainerState.COMPLETE);
containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
Map<ApplicationId, List<ContainerStatus>> conts =
new HashMap<ApplicationId, List<ContainerStatus>>();
conts.put(app1.getApplicationId(),
Collections.singletonList(containerStatus));
nm1.nodeHeartbeat(conts, true);
am3.waitForState(RMAppAttemptState.FAILED);
Assert.assertFalse(attempt3.isPreempted());
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
appState.getAttempt(am3.getApplicationAttemptId())
.getAMContainerExitStatus());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
// create second NM, and register to rm1
MockNM nm2 =
new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService());
nm2.registerNode();
// nm1 heartbeats to report unhealthy
// This will mimic ContainerExitStatus.ABORT
nm1.nodeHeartbeat(false);
am4.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.ABORTED,
appState.getAttempt(am4.getApplicationAttemptId())
.getAMContainerExitStatus());
// launch next AM in nm2
nm2.nodeHeartbeat(true);
MockAM am5 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
// fail the AM normally
nm2
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am5.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
// AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
Assert.assertEquals(3, app1.getAppAttempts().size());
Assert.assertEquals(5, app1.getAppAttempts().size());
rm1.stop();
}
@ -433,7 +487,7 @@ public class TestAMRestart {
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(attempt1.isPreempted());
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored.
@ -457,11 +511,74 @@ public class TestAMRestart {
RMAppAttempt attempt2 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt();
Assert.assertFalse(attempt2.isPreempted());
Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
rm1.stop();
rm2.stop();
}
// Test regular RM restart/failover, new RM should not count
// AM failure towards the max-retry-account and should be able to
// re-launch the AM.
@Test(timeout = 50000)
public void testRMRestartOrFailoverNotCountedForAMFailures()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 1.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
// AM should be restarted even though max-am-attempt is 1.
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
// Restart rm.
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
ApplicationState appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
status
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
status.setContainerId(attempt1.getMasterContainer().getId());
status.setContainerState(ContainerState.COMPLETE);
status.setDiagnostics("");
nm1.registerNode(Collections.singletonList(status), null);
rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
appState.getAttempt(am1.getApplicationAttemptId())
.getAMContainerExitStatus());
// Will automatically start a new AppAttempt in rm2
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am2 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
RMAppAttempt attempt3 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt();
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
rm1.stop();
rm2.stop();
}
}