From 0e167fde552d68024e153ed0a7c365be34b3f007 Mon Sep 17 00:00:00 2001
From: Weiwei Yang
Date: Mon, 8 Apr 2019 13:44:33 +0800
Subject: [PATCH] YARN-9413. Queue resource leak after app fail for
 CapacityScheduler. Contributed by Tao Yang.

---
 .../rmapp/attempt/RMAppAttemptImpl.java       |   3 +-
 .../applicationsmanager/TestAMRestart.java    | 104 +++++++++++++++---
 2 files changed, 92 insertions(+), 15 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 5998592986f..82ada61fe49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1453,7 +1453,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
           && !appAttempt.submissionContext.getUnmanagedAM()) {
         int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
             .getNumFailedAppAttempts();
-        if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
+        if (appAttempt.rmApp.getMaxAppAttempts() > 1
+            && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
           keepContainersAcrossAppAttempts = true;
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index c43069bac08..329e676318c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -51,8 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -369,8 +372,10 @@ public class TestAMRestart {
     rm1.stop();
   }
 
-  // AM container preempted, nm disk failure
-  // should not be counted towards AM max retry count.
+  /**
+   * AM container preemption and NM disk failure should not be counted
+   * towards the AM max retry count.
+   */
   @Test(timeout = 100000)
   public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
@@ -396,7 +401,7 @@ public class TestAMRestart {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am1.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     ApplicationStateData appState =
         ((MemoryRMStateStore) rm1.getRMStateStore()).getState()
@@ -416,7 +421,7 @@ public class TestAMRestart {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am2.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     MockAM am3 = rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(),
         3, nm1);
@@ -438,7 +443,7 @@ public class TestAMRestart {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am3.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
     Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
         appState.getAttempt(am3.getApplicationAttemptId())
             .getAMContainerExitStatus());
@@ -527,9 +532,11 @@ public class TestAMRestart {
     rm1.stop();
   }
 
-  // Test RM restarts after AM container is preempted, new RM should not count
-  // AM preemption failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test RM restart after the AM container is preempted; the new RM should
+   * not count the AM preemption failure towards the max-retry count and
+   * should be able to re-launch the AM.
+   */
   @Test(timeout = 60000)
   public void testPreemptedAMRestartOnRMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
@@ -604,9 +611,11 @@ public class TestAMRestart {
     rm2.stop();
   }
 
-  // Test regular RM restart/failover, new RM should not count
-  // AM failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test regular RM restart/failover; the new RM should not count the AM
+   * failure towards the max-retry count and should be able to re-launch
+   * the AM.
+   */
   @Test(timeout = 50000)
   public void testRMRestartOrFailoverNotCountedForAMFailures()
       throws Exception {
@@ -923,9 +932,11 @@ public class TestAMRestart {
     rm1.stop();
   }
 
-  // Test restarting AM launched with the KeepContainers and AM reset window.
-  // after AM reset window, even if AM who was the last is failed,
-  // all containers are launched by previous AM should be kept.
+  /**
+   * Test restarting an AM launched with keep-containers enabled and an AM
+   * reset window. After the AM reset window, even if the last AM attempt
+   * fails, all containers launched by the previous AM should be kept.
+   */
   @Test (timeout = 20000)
   public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
       throws Exception {
@@ -993,4 +1004,69 @@ public class TestAMRestart {
     rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
     rm1.stop();
   }
+
+  /**
+   * Test to verify that there is no queue resource leak after an app fails.
+   *
+   * 1. Submit an app which is configured to keep containers across app
+   * attempts and should fail once its AM finishes (am-max-attempts=1).
+   * 2. The app is started with 2 containers running on the NM1 node.
+   * 3. Preempt the AM of the application, which should not count towards the
+   * max attempt retry, but the app will fail immediately.
+   * 4. Verify that the used resources of the queue are cleaned up normally
+   * after the app fails.
+   */
+  @Test(timeout = 30000)
+  public void testQueueResourceDoesNotLeakForCapacityScheduler()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200, 0, true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    // Launch the 2nd container, to test that running containers are
+    // transferred.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
+        ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Preempt the AM container.
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
+        am1.getApplicationAttemptId());
+
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
+
+    // The AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // After app1 fails, the used resources of this queue should be cleaned
+    // up; otherwise there is a resource leak.
+    LeafQueue queue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+    Assert.assertEquals(0,
+        queue.getQueueResourceUsage().getUsed().getMemorySize());
+    Assert.assertEquals(0,
+        queue.getQueueResourceUsage().getUsed().getVirtualCores());
+
+    rm1.stop();
+  }
 }
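A note for reviewers: the production change above is a single extra guard in RMAppAttemptImpl, so containers are only kept across attempts when another attempt is actually possible; with am-max-attempts=1 a failed app now releases its containers, and the queue's used resources no longer leak. The sketch below restates that condition in isolation; the class, method name, and parameters are hypothetical simplifications for illustration only, not part of RMAppAttemptImpl or any Hadoop API.

// Illustrative sketch only: KeepContainersGuard and shouldKeepContainers are
// hypothetical names, not part of the Hadoop code base.
public final class KeepContainersGuard {

  private KeepContainersGuard() {
  }

  /**
   * Mirrors the guard added by the patch: containers survive an attempt
   * failure only when keep-containers was requested, the AM is managed by
   * YARN, and another attempt is still possible (maxAppAttempts > 1 and the
   * failure count has not reached the limit).
   */
  public static boolean shouldKeepContainers(boolean keepContainersRequested,
      boolean unmanagedAM, int numFailedAttempts, int maxAppAttempts) {
    return keepContainersRequested
        && !unmanagedAM
        && maxAppAttempts > 1
        && numFailedAttempts < maxAppAttempts;
  }

  public static void main(String[] args) {
    // am-max-attempts=1, the leak scenario from the new test: containers are
    // not kept, so the queue's used resources can be released on app failure.
    System.out.println(shouldKeepContainers(true, false, 0, 1)); // false
    // am-max-attempts=2 with one failure so far: containers are kept.
    System.out.println(shouldKeepContainers(true, false, 1, 2)); // true
  }
}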