From 5ff5f67332b527acaca7a69ac421930a02ca55b3 Mon Sep 17 00:00:00 2001
From: Wangda Tan <wangda@apache.org>
Date: Thu, 21 Jan 2016 11:15:04 +0800
Subject: [PATCH] YARN-4557. Fix improper Queues sorting in
 PartitionedQueueComparator when accessible-node-labels=*. (Naganarasimha G R
 via wangda)

---
 hadoop-yarn-project/CHANGES.txt              |  3 +
 .../scheduler/AppSchedulingInfo.java         |  2 +-
 .../capacity/PartitionedQueueComparator.java | 10 ++-
 .../TestNodeLabelContainerAllocation.java    | 77 ++++++++++++++++++-
 4 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 212ffa17a8a..bd467eaeeb2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1289,6 +1289,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4565. Fix a bug that leads to AM resource limit not hornored when
     sizeBasedWeight enabled for FairOrderingPolicy. (wtan via jianhe)
 
+    YARN-4557. Fix improper Queues sorting in PartitionedQueueComparator
+    when accessible-node-labels=*. (Naganarasimha G R via wangda)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 631b4182afd..07f3d8baae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
 
-  private static final Comparator<Priority> COMPARATOR = 
+  private static final Comparator<Priority> COMPARATOR =
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
 
   private static final int EPOCH_BIT_SHIFT = 40;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
index ddcc719ce00..477c615fe53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.Comparator;
 
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+
 public class PartitionedQueueComparator implements Comparator<CSQueue> {
   private String partitionToLookAt = null;
 
@@ -35,15 +37,17 @@ public class PartitionedQueueComparator implements Comparator<CSQueue> {
      * the other not, accessible queue goes first.
      */
     boolean q1Accessible =
-        q1.getAccessibleNodeLabels().contains(partitionToLookAt);
+        q1.getAccessibleNodeLabels().contains(partitionToLookAt)
+            || q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
     boolean q2Accessible =
-        q2.getAccessibleNodeLabels().contains(partitionToLookAt);
+        q2.getAccessibleNodeLabels().contains(partitionToLookAt)
+            || q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
     if (q1Accessible && !q2Accessible) {
       return -1;
     } else if (!q1Accessible && q2Accessible) {
       return 1;
     }
-    
+
     /*
      *
      * 2. When two queue has same accessibility, check who will go first:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index dff82ca5b47..bbf6e43df5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -502,7 +503,6 @@ public class TestNodeLabelContainerAllocation {
     };
     rm1.getRMContext().setNodeLabelManager(mgr);
     rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // no label
     MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = y
     // launch an app to queue b1 (label = y), AM container should be launched in
     // nm2
@@ -1470,9 +1470,11 @@
     csConf.setCapacityByLabel(B, "x", 70);
 
     final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    csConf.setAccessibleNodeLabels(C, Collections.<String> emptySet());
     csConf.setCapacity(C, 25);
 
     final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+    csConf.setAccessibleNodeLabels(D, Collections.<String> emptySet());
     csConf.setCapacity(D, 25);
 
     // set node -> label
@@ -1601,4 +1603,77 @@
         cs.getApplicationAttempt(am4.getApplicationAttemptId()));
   }
+
+  @Test
+  public void testOrderOfAllocationOnPartitionsWhenAccessibilityIsAll()
+      throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *             root
+     *          __________
+     *         /          \
+     *        a (*)      b (x)
+     * </pre>
+     * 
+     * Both queues a/b can access x, we need to verify whether * accessibility
+     * is considered in ordering of queues
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 25);
+    csConf.setAccessibleNodeLabels(A, toSet("*"));
+    csConf.setCapacityByLabel(A, "x", 60);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 75);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 40);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
+
+    // app1 -> a
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // app2 -> b
+    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b", "x");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    // Both a/b has used_capacity(x) = 0, when doing exclusive allocation, a
+    // will go first since a has more capacity(x)
+    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+  }
 }
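
The ordering rule this patch fixes can be summarized with a minimal, self-contained sketch (not part of the patch): QueueInfo and AccessibilityFirstComparator are hypothetical stand-ins for CSQueue and PartitionedQueueComparator, and ANY mirrors RMNodeLabelsManager.ANY, the "*" wildcard.

import java.util.Comparator;
import java.util.Set;

class QueueInfo {
  final String name;
  final Set<String> accessibleNodeLabels;

  QueueInfo(String name, Set<String> accessibleNodeLabels) {
    this.name = name;
    this.accessibleNodeLabels = accessibleNodeLabels;
  }
}

class AccessibilityFirstComparator implements Comparator<QueueInfo> {
  private static final String ANY = "*"; // stands in for RMNodeLabelsManager.ANY
  private final String partition;

  AccessibilityFirstComparator(String partition) {
    this.partition = partition;
  }

  // The fix: a queue whose accessible-node-labels is "*" can access every
  // partition, so it must be treated as accessible rather than sorted last.
  private boolean accessible(QueueInfo q) {
    return q.accessibleNodeLabels.contains(partition)
        || q.accessibleNodeLabels.contains(ANY);
  }

  @Override
  public int compare(QueueInfo q1, QueueInfo q2) {
    boolean a1 = accessible(q1);
    boolean a2 = accessible(q2);
    if (a1 && !a2) {
      return -1; // accessible queue goes first
    } else if (!a1 && a2) {
      return 1;
    }
    // Tie: the real comparator falls through to comparing used capacity
    // on the partition; this sketch stops at the accessibility rule.
    return 0;
  }
}

Before the fix, only the exact-label check ran, so a queue configured with accessible-node-labels=* compared as inaccessible and sorted behind queues that list the partition explicitly; the added RMNodeLabelsManager.ANY check is exactly what the new test exercises.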