From 026027166e1cf95f333055a32a300c6a2e43bd64 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Fri, 26 Aug 2016 20:19:11 +0530 Subject: [PATCH] YARN-3940. Application moveToQueue should check NodeLabel permission. Contributed by Bibin A Chundatt (cherry picked from commit 46e02ab719d06f2708c0a61e8011b8f261235193) --- .../scheduler/capacity/CapacityScheduler.java | 35 ++++++ .../TestCapacitySchedulerNodeLabelUpdate.java | 116 +++++++++++++++++- 2 files changed, 150 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bedf45570c4..def56eeb4d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -1920,6 +1921,7 @@ public synchronized String moveApplication(ApplicationId appId, LeafQueue dest = 
getAndCheckLeafQueue(destQueueName); // Validation check - ACLs, submission limits for user & queue String user = app.getUser(); + checkQueuePartition(app, dest); try { dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { @@ -1944,6 +1946,39 @@ public synchronized String moveApplication(ApplicationId appId, return targetQueueName; } + /** + * Check that the application can be moved to a queue when node labels are + * enabled. Every label requested over the application's lifetime is checked. + * + * @param app application being moved + * @param dest target leaf queue + * @throws YarnException if the target queue cannot access a requested label + */ + private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) + throws YarnException { + if (!YarnConfiguration.areNodeLabelsEnabled(conf)) { + return; + } + Set targetqueuelabels = dest.getAccessibleNodeLabels(); + AppSchedulingInfo schedulingInfo = app.getAppSchedulingInfo(); + Set appLabelexpressions = schedulingInfo.getRequestedPartitions(); + // default partition access always available remove empty label + appLabelexpressions.remove(RMNodeLabelsManager.NO_LABEL); + Set nonAccessiblelabels = new HashSet(); + for (String label : appLabelexpressions) { + if (!SchedulerUtils.checkQueueLabelExpression(targetqueuelabels, label, + null)) { + nonAccessiblelabels.add(label); + } + } + if (nonAccessiblelabels.size() > 0) { + throw new YarnException( + "Specified queue=" + dest.getQueueName() + " can't satisfy following " + + "apps label expressions =" + nonAccessiblelabels + + " accessible node labels =" + targetqueuelabels); + } + } + /** * Check that the String provided in input is the name of an existing, * LeafQueue, if successful returns the queue. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 9aef77c8d69..0ae77f24390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.fail; + import java.util.ArrayList; import java.util.HashSet; import java.util.Map; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -86,7 +89,53 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) { return conf; } - + + private Configuration getConfigurationWithSubQueueLabels( + Configuration config) { + CapacitySchedulerConfiguration conf2 = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf2.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); 
+ conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String a = CapacitySchedulerConfiguration.ROOT + ".a"; + final String b = CapacitySchedulerConfiguration.ROOT + ".b"; + final String aa1 = a + ".a1"; + final String aa2 = a + ".a2"; + final String aa3 = a + ".a3"; + final String aa4 = a + ".a4"; + conf2.setQueues(a, new String[] {"a1", "a2", "a3", "a4"}); + conf2.setCapacity(a, 50); + conf2.setCapacity(b, 50); + conf2.setCapacity(aa1, 40); + conf2.setCapacity(aa2, 20); + conf2.setCapacity(aa3, 20); + conf2.setCapacity(aa4, 20); + conf2.setAccessibleNodeLabels(a, ImmutableSet.of("x", "y", "z")); + conf2.setAccessibleNodeLabels(aa1, ImmutableSet.of("x", "y")); + conf2.setAccessibleNodeLabels(aa2, ImmutableSet.of("y")); + conf2.setAccessibleNodeLabels(aa3, ImmutableSet.of("x", "y", "z")); + conf2.setAccessibleNodeLabels(aa4, ImmutableSet.of("x", "y")); + conf2.setCapacityByLabel(a, "x", 50); + conf2.setCapacityByLabel(a, "y", 50); + conf2.setCapacityByLabel(a, "z", 50); + conf2.setCapacityByLabel(b, "x", 50); + conf2.setCapacityByLabel(b, "y", 50); + conf2.setCapacityByLabel(b, "z", 50); + conf2.setCapacityByLabel(aa1, "x", 50); + conf2.setCapacityByLabel(aa3, "x", 25); + conf2.setCapacityByLabel(aa4, "x", 25); + conf2.setCapacityByLabel(aa1, "y", 25); + conf2.setCapacityByLabel(aa2, "y", 25); + conf2.setCapacityByLabel(aa4, "y", 50); + conf2.setCapacityByLabel(aa3, "z", 50); + conf2.setCapacityByLabel(aa4, "z", 50); + return conf2; + } + private Set toSet(String... 
elements) { Set set = Sets.newHashSet(elements); return set; @@ -364,6 +413,71 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } + @Test(timeout = 3000000) + public void testMoveApplicationWithLabel() throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("z"))); + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithSubQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 4096 * 2); + MockNM nm2 = rm.registerNode("h2:1234", 4096 * 2); + MockNM nm3 = rm.registerNode("h3:1234", 4096 * 2); + MockNM nm4 = rm.registerNode("h4:1234", 4096 * 2); + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3); + am1.allocate("*", GB, 1, new ArrayList(), "x"); + ContainerId container1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm.waitForState(nm1, container1, RMContainerState.ALLOCATED, 10 * 1000); + am1.allocate("*", GB, 1, new ArrayList(), "y"); + ContainerId container2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + rm.waitForState(nm2, container2, RMContainerState.ALLOCATED, 10 * 1000); + CapacityScheduler scheduler = + ((CapacityScheduler) rm.getResourceScheduler()); + try { + 
scheduler.moveApplication(app1.getApplicationId(), "a2"); + fail("Should throw exception since target queue doesnt have " + + "required labels"); + } catch (Exception e) { + Assert.assertTrue("Yarn Exception should be thrown", + e instanceof YarnException); + Assert.assertEquals("Specified queue=a2 can't satisfy " + + "following apps label expressions =[x] accessible " + + "node labels =[y]", e.getMessage()); + } + try { + scheduler.moveApplication(app1.getApplicationId(), "a3"); + scheduler.moveApplication(app1.getApplicationId(), "a4"); + // Check move to queue with accessible label ANY + scheduler.moveApplication(app1.getApplicationId(), "b"); + } catch (Exception e) { + fail("Should not throw exception since target queue has " + + "required labels"); + } + rm.stop(); + } @Test (timeout = 60000) public void testComplexResourceUsageWhenNodeUpdatesPartition()