From c63e3f027bfe01e421fb692f6a86bcf1a4cecc5f Mon Sep 17 00:00:00 2001 From: Jian He Date: Sat, 28 Jun 2014 23:41:51 +0000 Subject: [PATCH] Merge r1606407 from trunk. YARN-614. Changed ResourceManager to not count disk failure, node loss and RM restart towards app failures. Contributed by Xuan Gong git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1606408 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 18 ++- .../rmapp/attempt/RMAppAttempt.java | 12 +- .../rmapp/attempt/RMAppAttemptImpl.java | 29 ++-- .../applicationsmanager/TestAMRestart.java | 141 ++++++++++++++++-- 5 files changed, 171 insertions(+), 32 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5fa27454265..670355e9480 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -180,6 +180,9 @@ Release 2.5.0 - UNRELEASED YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. (Jason Lowe via vinodkv) + YARN-614. Changed ResourceManager to not count disk failure, node loss and + RM restart towards app failures. (Xuan Gong via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index b6ca684a739..bff41cfc219 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -687,9 +687,10 @@ public class RMAppImpl implements RMApp, Recoverable { new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, // The newly created attempt maybe last attempt if (number of - // previously NonPreempted attempts + 1) equal to the max-attempt + // previously failed attempts(which should not include Preempted, + // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1)); + maxAppAttempts == (getNumFailedAppAttempts() + 1)); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -797,7 +798,7 @@ public class RMAppImpl implements RMApp, Recoverable { && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) { + && app.getNumFailedAppAttempts() == app.maxAppAttempts))) { return RMAppState.ACCEPTED; } @@ -888,7 +889,7 @@ public class RMAppImpl implements RMApp, Recoverable { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) { + } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1105,11 +1106,12 @@ public class RMAppImpl implements RMApp, Recoverable { }; } - private int getNumNonPreemptedAppAttempts() { + private int getNumFailedAppAttempts() { int completedAttempts = 0; - // Do not count AM preemption as attempt failure. + // Do not count AM preemption, hardware failures or NM resync + // as attempt failure. for (RMAppAttempt attempt : attempts.values()) { - if (!attempt.isPreempted()) { + if (attempt.shouldCountTowardsMaxAttemptRetry()) { completedAttempts++; } } @@ -1129,7 +1131,7 @@ public class RMAppImpl implements RMApp, Recoverable { public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (!app.submissionContext.getUnmanagedAM() - && app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) { + && app.getNumFailedAppAttempts() < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 42c37a93aba..68a068b9811 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -197,8 +197,14 @@ public interface RMAppAttempt extends EventHandler { ApplicationAttemptReport createApplicationAttemptReport(); /** - * Return the flag which indicates whether the attempt is preempted by the - * scheduler. + * Return the flag which indicates whether the attempt failure should be + * counted to attempt retry count. + * */ - boolean isPreempted(); + boolean shouldCountTowardsMaxAttemptRetry(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1e7693f5b71..ef572d42daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -149,9 +149,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; - // Since AM preemption is not counted towards AM failure count, - // even if this flag is true, a new attempt can still be re-created if this - // attempt is eventually preempted. So this flag indicates that this may be + // Since AM preemption, hardware error and NM resync are not counted towards + // AM failure count, even if this flag is true, a new attempt can still be + // re-created if this attempt is eventually failed because of preemption, + // hardware error or NM resync. So this flag indicates that this may be // last attempt. private final boolean maybeLastAttempt; private static final ExpiredTransition EXPIRED_TRANSITION = @@ -1087,12 +1088,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { .getKeepContainersAcrossApplicationAttempts() && !appAttempt.submissionContext.getUnmanagedAM()) { // See if we should retain containers for non-unmanaged applications - if (appAttempt.isPreempted()) { - // Premption doesn't count towards app-failures and so we should - // retain containers. + if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) { + // Premption, hardware failures, NM resync doesn't count towards + // app-failures and so we should retain containers. keepContainersAcrossAppAttempts = true; } else if (!appAttempt.maybeLastAttempt) { - // Not preemption. Not last-attempt too - keep containers. + // Not preemption, hardware failures or NM resync. + // Not last-attempt too - keep containers. keepContainersAcrossAppAttempts = true; } } @@ -1136,8 +1138,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } @Override - public boolean isPreempted() { - return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED; + public boolean shouldCountTowardsMaxAttemptRetry() { + try { + this.readLock.lock(); + int exitStatus = getAMContainerExitStatus(); + return !(exitStatus == ContainerExitStatus.PREEMPTED + || exitStatus == ContainerExitStatus.ABORTED + || exitStatus == ContainerExitStatus.DISKS_FAILED + || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + } finally { + this.readLock.unlock(); + } } private static final class UnmanagedAMAttemptSavedTransition diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 4145b6411d8..de0987808e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -19,13 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -347,15 +352,20 @@ public class TestAMRestart { rm1.stop(); } - // AM container preempted should not be counted towards AM max retry count. - @Test(timeout = 20000) - public void testAMPreemptedNotCountedForAMFailures() throws Exception { + // AM container preempted, nm disk failure + // should not be counted towards AM max retry count. + @Test(timeout = 100000) + public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); // explicitly set max-am-retry count as 1. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); - MockRM rm1 = new MockRM(conf); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); @@ -371,8 +381,10 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt1.isPreempted()); + Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); // AM should be restarted even though max-am-attempt is 1. MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); @@ -384,20 +396,62 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt2.isPreempted()); + Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); - // fail the AM normally - nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + // mimic NM disk_failure + ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); + containerStatus.setContainerId(attempt3.getMasterContainer().getId()); + containerStatus.setDiagnostics("mimic NM disk_failure"); + containerStatus.setState(ContainerState.COMPLETE); + containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED); + Map> conts = + new HashMap>(); + conts.put(app1.getApplicationId(), + Collections.singletonList(containerStatus)); + nm1.nodeHeartbeat(conts, true); + am3.waitForState(RMAppAttemptState.FAILED); - Assert.assertFalse(attempt3.isPreempted()); + Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.DISKS_FAILED, + appState.getAttempt(am3.getApplicationAttemptId()) + .getAMContainerExitStatus()); + + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + + // create second NM, and register to rm1 + MockNM nm2 = + new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + // nm1 heartbeats to report unhealthy + // This will mimic ContainerExitStatus.ABORT + nm1.nodeHeartbeat(false); + am4.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + appState.getAttempt(am4.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // launch next AM in nm2 + nm2.nodeHeartbeat(true); + MockAM am5 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); + RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); + // fail the AM normally + nm2 + .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am5.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry()); // AM should not be restarted. rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); - Assert.assertEquals(3, app1.getAppAttempts().size()); + Assert.assertEquals(5, app1.getAppAttempts().size()); rm1.stop(); } @@ -433,7 +487,7 @@ public class TestAMRestart { scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); - Assert.assertTrue(attempt1.isPreempted()); + Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); // state store has 1 attempt stored. @@ -457,11 +511,74 @@ public class TestAMRestart { RMAppAttempt attempt2 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()) .getCurrentAppAttempt(); - Assert.assertFalse(attempt2.isPreempted()); + Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry()); Assert.assertEquals(ContainerExitStatus.INVALID, appState.getAttempt(am2.getApplicationAttemptId()) .getAMContainerExitStatus()); rm1.stop(); rm2.stop(); } + + // Test regular RM restart/failover, new RM should not count + // AM failure towards the max-retry-account and should be able to + // re-launch the AM. + @Test(timeout = 50000) + public void testRMRestartOrFailoverNotCountedForAMFailures() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + // AM should be restarted even though max-am-attempt is 1. + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); + + // Restart rm. + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt1.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + appState.getAttempt(am1.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // Will automatically start a new AppAttempt in rm2 + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am2 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + RMAppAttempt attempt3 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.INVALID, + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); + + rm1.stop(); + rm2.stop(); + } }