+ * Update Queue Statistics: + *
+ * + *+ * When nodePartition is null, all partition of + * used-capacity/absolute-used-capacity will be updated. + *
+ */ + @Lock(CSQueue.class) + public static void updateQueueStatistics( + final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation, + final CSQueue childQueue, final RMNodeLabelsManager nlm, + final String nodePartition) { + QueueCapacities queueCapacities = childQueue.getQueueCapacities(); + ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union( + queueCapacities.getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), + minimumAllocation, queueResourceUsage, queueCapacities, partition); + } + } else { + updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), + minimumAllocation, queueResourceUsage, queueCapacities, nodePartition); + } + + // Now in QueueMetrics, we only store available-resource-to-queue for + // default partition. + if (nodePartition == null + || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + childQueue.getMetrics().setAvailableResourcesToQueue( + getNonPartitionedMaxAvailableResourceToQueue(rc, + nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster), + childQueue)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cfeee37d1e6..5d58b153953 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -136,7 +136,8 @@ public class CapacityScheduler extends // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; - static final Comparator+ * root + * / \ + * a b + * / \ (x) + * a1 a2 + * (x) (x) + *+ * + * Both a/b can access x, we need to verify when + *
+ * 1) container allocated/released in both partitioned/non-partitioned node, + * 2) clusterResource updates + * 3) queue guaranteed resource changed + *+ * + * used capacity / absolute used capacity of queues are correctly updated. + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + /** + * Initially, we set A/B's resource 50:50 + */ + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 50); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 50); + + csConf.setQueues(A, new String[] { "a1", "a2" }); + + final String A1 = A + ".a1"; + csConf.setCapacity(A1, 50); + csConf.setAccessibleNodeLabels(A1, toSet("x")); + csConf.setCapacityByLabel(A1, "x", 50); + + final String A2 = A + ".a2"; + csConf.setCapacity(A2, 50); + csConf.setAccessibleNodeLabels(A2, toSet("x")); + csConf.setCapacityByLabel(A2, "x", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 50); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 50); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + /* + * Before we adding any node to the cluster, used-capacity/abs-used-capacity + * should be 0 + */ + checkQueueUsedCapacity("a", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a", cs, "", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "", 0f, 0f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0f, 0f); + checkQueueUsedCapacity("root", cs, "", 0f, 0f); + + MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label =
+ * root + * ________________ + * / | \ \ + * a (x) b (x) c d + *+ * + * Both a/b can access x, we need to verify when + *
+ * When doing allocation on partitioned nodes, + * - Queue has accessibility to the node will go first + * - When accessibility is same + * - Queue has less used_capacity on given partition will go first + * - When used_capacity is same + * - Queue has more abs_capacity will go first + *+ * + * used capacity / absolute used capacity of queues are correctly updated. + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b", "c", "d" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 25); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 30); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 25); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 70); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + csConf.setCapacity(C, 25); + + final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + csConf.setCapacity(D, 25); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label =