YARN-9413. Queue resource leak after app fail for CapacityScheduler. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-04-08 13:44:33 +08:00
parent f824f4dccb
commit 0e167fde55
2 changed files with 92 additions and 15 deletions

View File

@ -1453,7 +1453,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
&& !appAttempt.submissionContext.getUnmanagedAM()) { && !appAttempt.submissionContext.getUnmanagedAM()) {
int numberOfFailure = ((RMAppImpl)appAttempt.rmApp) int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
.getNumFailedAppAttempts(); .getNumFailedAppAttempts();
if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) { if (appAttempt.rmApp.getMaxAppAttempts() > 1
&& numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
keepContainersAcrossAppAttempts = true; keepContainersAcrossAppAttempts = true;
} }
} }

View File

@ -51,8 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
@ -369,8 +372,10 @@ public class TestAMRestart {
rm1.stop(); rm1.stop();
} }
// AM container preempted, nm disk failure /**
// should not be counted towards AM max retry count. * AM container preempted, nm disk failure
* should not be counted towards AM max retry count.
*/
@Test(timeout = 100000) @Test(timeout = 100000)
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
@ -396,7 +401,7 @@ public class TestAMRestart {
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
am1.getApplicationAttemptId()); am1.getApplicationAttemptId());
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
ApplicationStateData appState = ApplicationStateData appState =
((MemoryRMStateStore) rm1.getRMStateStore()).getState() ((MemoryRMStateStore) rm1.getRMStateStore()).getState()
@ -416,7 +421,7 @@ public class TestAMRestart {
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
am2.getApplicationAttemptId()); am2.getApplicationAttemptId());
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am3 = MockAM am3 =
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
@ -438,7 +443,7 @@ public class TestAMRestart {
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
am3.getApplicationAttemptId()); am3.getApplicationAttemptId());
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry()); Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED, Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
appState.getAttempt(am3.getApplicationAttemptId()) appState.getAttempt(am3.getApplicationAttemptId())
.getAMContainerExitStatus()); .getAMContainerExitStatus());
@ -527,9 +532,11 @@ public class TestAMRestart {
rm1.stop(); rm1.stop();
} }
// Test RM restarts after AM container is preempted, new RM should not count /**
// AM preemption failure towards the max-retry-account and should be able to * Test RM restarts after AM container is preempted, new RM should not count
// re-launch the AM. * AM preemption failure towards the max-retry-account and should be able to
* re-launch the AM.
*/
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPreemptedAMRestartOnRMRestart() throws Exception { public void testPreemptedAMRestartOnRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
@ -604,9 +611,11 @@ public class TestAMRestart {
rm2.stop(); rm2.stop();
} }
// Test regular RM restart/failover, new RM should not count /**
// AM failure towards the max-retry-account and should be able to * Test regular RM restart/failover, new RM should not count
// re-launch the AM. * AM failure towards the max-retry-account and should be able to
* re-launch the AM.
*/
@Test(timeout = 50000) @Test(timeout = 50000)
public void testRMRestartOrFailoverNotCountedForAMFailures() public void testRMRestartOrFailoverNotCountedForAMFailures()
throws Exception { throws Exception {
@ -923,9 +932,11 @@ public class TestAMRestart {
rm1.stop(); rm1.stop();
} }
// Test restarting AM launched with the KeepContainers and AM reset window. /**
// after AM reset window, even if AM who was the last is failed, * Test restarting AM launched with the KeepContainers and AM reset window.
// all containers are launched by previous AM should be kept. * after AM reset window, even if AM who was the last is failed,
* all containers are launched by previous AM should be kept.
*/
@Test (timeout = 20000) @Test (timeout = 20000)
public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
throws Exception { throws Exception {
@ -993,4 +1004,69 @@ public class TestAMRestart {
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
rm1.stop(); rm1.stop();
} }
/**
 * Test to verify that there is no queue resource leak after an app fails
 * when the CapacityScheduler is in use.
 *
 * 1. Submit an app which is configured to keep containers across app
 *    attempts and should fail after AM finished (am-max-attempts=1).
 * 2. App is started with 2 containers running on NM1 node.
 * 3. Preempt the AM of the application which should not count towards max
 *    attempt retry but app will fail immediately.
 * 4. Verify that the used resource of queue should be cleaned up normally
 *    after app fail.
 *
 * The RM is stopped in a finally block so that a failed assertion or wait
 * does not leave the started MockRM running.
 */
@Test(timeout = 30000)
public void testQueueResourceDoesNotLeakForCapacityScheduler()
    throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  // Only one AM attempt is allowed, so losing the AM fails the app.
  conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
  conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  try {
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
    nm1.registerNode();
    RMApp app1 = rm1.submitApp(200, 0, true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    allocateContainers(nm1, am1, 1);

    // launch the 2nd container, for testing running container transferred.
    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
        ContainerState.RUNNING);
    ContainerId containerId2 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);

    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm1.getResourceScheduler();
    ContainerId amContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    // Preempt AM container
    scheduler.killContainer(scheduler.getRMContainer(amContainer));

    rm1.waitForState(am1.getApplicationAttemptId(),
        RMAppAttemptState.FAILED);
    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
        am1.getApplicationAttemptId());

    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());

    // AM should not be restarted.
    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);

    // After app1 failed, used resource of this queue should
    // be cleaned up, otherwise resource leak happened.
    LeafQueue queue =
        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
    Assert.assertEquals(0,
        queue.getQueueResourceUsage().getUsed().getMemorySize());
    Assert.assertEquals(0,
        queue.getQueueResourceUsage().getUsed().getVirtualCores());
  } finally {
    // Stop the RM even when an assertion or wait above fails.
    rm1.stop();
  }
}
} }