From 536254f5e746eeb2238d2859f8533dc9336ff11c Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 13 Oct 2014 14:08:38 -0700 Subject: [PATCH] YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li (cherry picked from commit f9680d9a160ee527c8f2c1494584abf1a1f70f82) --- .../scheduler/capacity/CapacityScheduler.java | 13 ++++ .../yarn/server/resourcemanager/MockRM.java | 9 +++ .../TestWorkPreservingRMRestart.java | 78 +++++++++++++++++++ 3 files changed, 100 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 02f27b8cdca..ed5518c00cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -648,6 +648,19 @@ public class CapacityScheduler extends // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { + //During a restart, this indicates a queue was removed, which is + //not presently supported + if (isAppRecovering) { + //throwing RuntimeException because some other exceptions are caught + //(including YarnRuntimeException) and we want this to force an exit + String queueErrorMsg = "Queue named " + queueName + + " missing during application recovery." + + " Queue removal during recovery is not presently supported by the" + + " capacity scheduler, please restart with all queues configured" + + " which were present before shutdown/restart."; + LOG.fatal(queueErrorMsg); + throw new RuntimeException(queueErrorMsg); + } String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5d37d48fc68..cfac585997d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -249,6 +249,15 @@ public class MockRM extends ResourceManager { super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, String queue, + boolean waitForAccepted) throws Exception { + return submitApp(masterMemory, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + waitForAccepted); + } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 5f00f3180b5..aadfbba0396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -336,6 +336,8 @@ public class TestWorkPreservingRMRestart { private static final String R = "Default"; private static final String A = "QueueA"; private static final String B = "QueueB"; + //don't ever create the below queue ;-) + private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue"; private static final String USER_1 = "user1"; private static final String USER_2 = "user2"; @@ -351,6 +353,18 @@ public class TestWorkPreservingRMRestart { conf.setDouble(CapacitySchedulerConfiguration .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); } + + private void setupQueueConfigurationOnlyA( + CapacitySchedulerConfiguration conf) { + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); + final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; + conf.setCapacity(Q_R, 100); + final String Q_A = Q_R + "." + A; + conf.setQueues(Q_R, new String[] {A}); + conf.setCapacity(Q_A, 100); + conf.setDouble(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f); + } // Test CS recovery with multi-level queues and multi-users: // 1. setup 2 NMs each with 8GB memory; @@ -470,6 +484,70 @@ public class TestWorkPreservingRMRestart { totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), totalUsedResource.getVirtualCores()); } + + //Test that we receive a meaningful exit-causing exception if a queue + //is removed during recovery + //1. Add some apps to two queues, attempt to add an app to a non-existant + // queue to verify that the new logic is not in effect during normal app + // submission + //2. Remove one of the queues, restart the RM + //3. Verify that the expected exception was thrown + @Test (timeout = 30000) + public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { + if (!schedulerClass.equals(CapacityScheduler.class)) { + return; + } + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfiguration(csConf); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(csConf); + rm1 = new MockRM(csConf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A); + MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); + RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A); + MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2); + + RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + //Submit an app with a non existant queue to make sure it does not + //cause a fatal failure in the non-recovery case + RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null, + QUEUE_DOESNT_EXIST, false); + + // clear queue metrics + rm1.clearQueueMetrics(app1_1); + rm1.clearQueueMetrics(app1_2); + rm1.clearQueueMetrics(app2); + + // Re-start RM + csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationOnlyA(csConf); + rm2 = new MockRM(csConf, memStore); + boolean runtimeThrown = false; + try { + rm2.start(); + } catch (RuntimeException e) { + //we're catching it because we want to verify the message + //and we don't want to set it as an expected exception for the + //test because we only want it to happen here + assertTrue(e.getMessage().contains(B + " missing")); + runtimeThrown = true; + } + assertTrue(runtimeThrown); + } private void checkParentQueue(ParentQueue parentQueue, int numContainers, Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {