diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 3e41da7aa44..541ebb25c4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1336,6 +1336,14 @@ public abstract class AbstractYarnScheduler return -1; } + /** + * Kill a RMContainer. This is meant to be called in tests only to simulate + * AM container failures. + * @param container the container to kill + */ + @VisibleForTesting + public abstract void killContainer(RMContainer container); + /** * Update internal state of the scheduler. This can be useful for scheduler * implementations that maintain some state that needs to be periodically diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 7b554db4705..041a762e9fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -99,6 +99,19 @@ public class SchedulerUtils { ContainerExitStatus.ABORTED, diagnostics); } + + /** + * Utility to create a {@link ContainerStatus} for killed containers. + * @param containerId {@link ContainerId} of the killed container. + * @param diagnostics diagnostic message + * @return ContainerStatus for a killed container + */ + public static ContainerStatus createKilledContainerStatus( + ContainerId containerId, String diagnostics) { + return createAbnormalContainerStatus(containerId, + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics); + } + /** * Utility to create a {@link ContainerStatus} during exceptional * circumstances. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d472d35bedc..eb421dda89c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1760,6 +1760,12 @@ public class CapacityScheduler extends } } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + markContainerForKillable(container); + } + public void markContainerForKillable( RMContainer killableContainer) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 31594fad478..a14b2ec87cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -793,6 +793,16 @@ public class FairScheduler extends incrAllocation); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + ContainerStatus status = SchedulerUtils.createKilledContainerStatus( + container.getContainerId(), + "Killed by RM to simulate an AM container failure"); + LOG.info("Killing container " + container); + completedContainer(container, status, RMContainerEventType.KILL); + } + @Override public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 01ab6bfae89..ac1d005555b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -984,6 +984,16 @@ public class FifoScheduler extends updateAvailableResourcesMetrics(); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + ContainerStatus status = SchedulerUtils.createKilledContainerStatus( + container.getContainerId(), + "Killed by RM to simulate an AM container failure"); + LOG.info("Killing container " + container); + completedContainer(container, status, RMContainerEventType.KILL); + } + @Override public synchronized void recoverContainersOnNode( List containerReports, RMNode nm) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 9d0d87979cb..c43069bac08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -51,10 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -376,8 +374,6 @@ public class TestAMRestart { @Test(timeout = 100000) public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -389,12 +385,12 @@ public class TestAMRestart { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the next attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -414,7 +410,7 @@ public class TestAMRestart { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -503,8 +499,6 @@ public class TestAMRestart { @Test(timeout = 100000) public void testMaxAttemptOneMeansOne() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -516,12 +510,12 @@ public class TestAMRestart { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -539,8 +533,6 @@ public class TestAMRestart { @Test(timeout = 60000) public void testPreemptedAMRestartOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -556,8 +548,8 @@ public class TestAMRestart { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); @@ -577,7 +569,7 @@ public class TestAMRestart { // Forcibly preempt the am container; amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -619,8 +611,6 @@ public class TestAMRestart { public void testRMRestartOrFailoverNotCountedForAMFailures() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -631,8 +621,8 @@ public class TestAMRestart { MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -694,8 +684,6 @@ public class TestAMRestart { @Test (timeout = 120000) public void testRMAppAttemptFailuresValidityInterval() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);