From ec143cbf678bd65f87fdd464c23022a2d2c54c07 Mon Sep 17 00:00:00 2001
From: Weiwei Yang
Date: Sat, 6 Apr 2019 19:59:36 +0800
Subject: [PATCH] YARN-9413. Queue resource leak after app fail for
 CapacityScheduler. Contributed by Tao Yang.

---
 .../rmapp/attempt/RMAppAttemptImpl.java      |   3 +-
 .../applicationsmanager/TestAMRestart.java   | 145 ++++++++++++++----
 2 files changed, 116 insertions(+), 32 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 73c0b6cacee..bcc52e2cd22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1502,7 +1502,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         && !appAttempt.submissionContext.getUnmanagedAM()) {
       int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
           .getNumFailedAppAttempts();
-      if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
+      if (appAttempt.rmApp.getMaxAppAttempts() > 1
+          && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
         keepContainersAcrossAppAttempts = true;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 9f122cb34bd..0083f40a860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -60,6 +60,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -381,8 +385,10 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // AM container preempted, nm disk failure
-  // should not be counted towards AM max retry count.
+  /**
+   * AM container preemption and NM disk failure
+   * should not be counted towards the AM max retry count.
+   */
   @Test(timeout = 100000)
   public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -408,7 +414,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am1.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     ApplicationStateData appState =
         ((MemoryRMStateStore) rm1.getRMStateStore()).getState()
@@ -428,7 +434,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am2.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     MockAM am3 =
         rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
@@ -450,7 +456,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am3.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
     Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
         appState.getAttempt(am3.getApplicationAttemptId())
             .getAMContainerExitStatus());
@@ -539,9 +545,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test RM restarts after AM container is preempted, new RM should not count
-  // AM preemption failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test RM restart after the AM container is preempted; the new RM should
+   * not count the AM preemption failure towards the max-retry-count and
+   * should be able to re-launch the AM.
+   */
   @Test(timeout = 60000)
   public void testPreemptedAMRestartOnRMRestart() throws Exception {
     getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
@@ -624,9 +632,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm2.stop();
   }
 
-  // Test regular RM restart/failover, new RM should not count
-  // AM failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test regular RM restart/failover; the new RM should not count
+   * the AM failure towards the max-retry-count and should be able to
+   * re-launch the AM.
+   */
   @Test(timeout = 50000)
   public void testRMRestartOrFailoverNotCountedForAMFailures()
       throws Exception {
@@ -944,9 +954,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test restarting AM launched with the KeepContainers and AM reset window.
-  // after AM reset window, even if AM who was the last is failed,
-  // all containers are launched by previous AM should be kept.
+  /**
+   * Test restarting an AM launched with KeepContainers and an AM reset
+   * window. After the AM reset window, even if the most recent AM fails,
+   * all containers launched by the previous AM should be kept.
+   */
   @Test (timeout = 20000)
   public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
       throws Exception {
@@ -1014,23 +1026,25 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test to verify that the containers of previous attempt are returned in
-  // the RM response to the heartbeat of AM if these containers were not
-  // recovered by the time AM registered.
-  //
-  // 1. App is started with 2 containers running on 2 different nodes-
-  // container 2 on the NM1 node and container 3 on the NM2 node.
-  // 2. Fail the AM of the application.
-  // 3. Simulate RM restart.
-  // 4. NM1 connects to the restarted RM immediately. It sends the RM the status
-  // of container 2.
-  // 5. 2nd attempt of the app is launched and the app master registers with RM.
-  // 6. Verify that app master receives container 2 in the RM response to
-  // register request.
-  // 7. NM2 connects to the RM after a delay. It sends the RM the status of
-  // container 3.
-  // 8. Verify that the app master receives container 3 in the RM response to
-  // its heartbeat.
+  /**
+   * Test to verify that the containers of the previous attempt are returned
+   * in the RM response to the AM heartbeat if these containers were not
+   * recovered by the time the AM registered.
+   *
+   * 1. App is started with 2 containers running on 2 different nodes:
+   *    container 2 on the NM1 node and container 3 on the NM2 node.
+   * 2. Fail the AM of the application.
+   * 3. Simulate RM restart.
+   * 4. NM1 connects to the restarted RM immediately. It sends the RM the
+   *    status of container 2.
+   * 5. The 2nd attempt of the app is launched and the AM registers with RM.
+   * 6. Verify that the AM receives container 2 in the RM response to the
+   *    register request.
+   * 7. NM2 connects to the RM after a delay. It sends the RM the status of
+   *    container 3.
+   * 8. Verify that the AM receives container 3 in the RM response to
+   *    its heartbeat.
+   */
   @Test(timeout = 200000)
   public void testContainersFromPreviousAttemptsWithRMRestart()
       throws Exception {
@@ -1167,4 +1181,73 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm2.stop();
     rm1.stop();
   }
+
+  /**
+   * Test to verify that there is no queue resource leak after an app fails.
+   *
+   * 1. Submit an app which is configured to keep containers across app
+   *    attempts and should fail after the AM finishes (am-max-attempts=1).
+   * 2. App is started with 2 containers running on the NM1 node.
+   * 3. Preempt the AM of the application, which should not count towards the
+   *    max attempt retry, but the app will fail immediately.
+   * 4. Verify that the used resources of the queue are cleaned up normally
+   *    after the app fails.
+   */
+  @Test(timeout = 30000)
+  public void testQueueResourceDoesNotLeak() throws Exception {
+    getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    getConf()
+        .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MockRM rm1 = new MockRM(getConf());
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200, 0, true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    // Launch the 2nd container to verify running-container transfer.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
+        ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Preempt the AM container.
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
+        am1.getApplicationAttemptId());
+
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
+
+    // The AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // After app1 fails, the used resources of this queue should be
+    // cleaned up; otherwise a resource leak has occurred.
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      LeafQueue queue =
+          (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getMemorySize());
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getVirtualCores());
+    } else if (getSchedulerType() == SchedulerType.FAIR) {
+      FSLeafQueue queue = ((FairScheduler) scheduler).getQueueManager()
+          .getLeafQueue("root.default", false);
+      Assert.assertEquals(0, queue.getResourceUsage().getMemorySize());
+      Assert.assertEquals(0, queue.getResourceUsage().getVirtualCores());
+    }
+
+    rm1.stop();
+  }
 }
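
Reviewer note: the two added lines in RMAppAttemptImpl's final transition are the
entire fix; the rest of the patch is test coverage. Below is a minimal sketch of
the guarded decision, with illustrative names (shouldKeepContainers,
keepContainersFlag, unmanagedAM, failedAttempts, maxAttempts are stand-ins for
the actual fields and calls on the attempt and its RMApp, not the real
signatures):

    // Sketch: containers may outlive a finished attempt only if a next
    // attempt can actually exist.
    boolean shouldKeepContainers(boolean keepContainersFlag,
        boolean unmanagedAM, int failedAttempts, int maxAttempts) {
      return keepContainersFlag
          && !unmanagedAM
          // The fix: with maxAttempts == 1 there is no retry, so keeping
          // containers would leave their usage charged against the queue
          // after the app fails (YARN-9413).
          && maxAttempts > 1
          && failedAttempts < maxAttempts;
    }

Before this guard, an app submitted with keep-containers-across-attempts and
am-max-attempts=1 whose AM container was preempted kept its running containers
even though no further attempt could ever be scheduled, so the queue's used
resources were never decremented. testQueueResourceDoesNotLeak reproduces that
scenario and asserts that the used memory and vcores of the default queue
return to zero for both the Capacity and Fair schedulers.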