From 0c1fb15b29406c245d41e60ffd09173f96e10e57 Mon Sep 17 00:00:00 2001
From: Wangda Tan
Date: Thu, 12 Nov 2015 11:09:37 -0800
Subject: [PATCH] YARN-4287. Capacity Scheduler: Rack Locality improvement
 (Nathan Roberts via wangda)

(cherry picked from commit 796638d9bc86235b9f3e5d1a3a9a25bbf5c04d1c)
---
 hadoop-yarn-project/CHANGES.txt                   |  2 +
 .../scheduler/AppSchedulingInfo.java              |  4 +-
 .../scheduler/QueueMetrics.java                   | 37 ++++++++++
 .../CapacitySchedulerConfiguration.java           | 14 +++-
 .../scheduler/capacity/LeafQueue.java             |  7 ++
 .../allocator/AbstractContainerAllocator.java     |  2 +-
 .../allocator/RegularContainerAllocator.java      | 17 +++--
 .../scheduler/TestQueueMetrics.java               | 55 +++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java         | 67 +++++++++++++++++--
 9 files changed, 191 insertions(+), 14 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5044b69475b..1ab2be0bc9c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -502,6 +502,8 @@ Release 2.8.0 - UNRELEASED
     YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level
     (asuresh)
 
+    YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 31560867a62..c5f8cd1247b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -595,9 +595,11 @@ public class AppSchedulingInfo {
           + " container=" + container.getId()
           + " host=" + container.getNodeId().toString()
           + " user=" + user
-          + " resource=" + request.getCapability());
+          + " resource=" + request.getCapability()
+          + " type=" + type);
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    metrics.incrNodeTypeAggregations(user, type);
     return resourceRequests;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index d94b6218252..68ae3648109 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -64,6 +64,12 @@ public class QueueMetrics implements MetricsSource {
   @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
   @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
   @Metric("Aggregate # of allocated containers") MutableCounterLong
      aggregateContainersAllocated;
+  @Metric("Aggregate # of allocated node-local containers")
+      MutableCounterLong aggregateNodeLocalContainersAllocated;
+  @Metric("Aggregate # of allocated rack-local containers")
+      MutableCounterLong aggregateRackLocalContainersAllocated;
+  @Metric("Aggregate # of allocated off-switch containers")
+      MutableCounterLong aggregateOffSwitchContainersAllocated;
   @Metric("Aggregate # of released containers") MutableCounterLong
      aggregateContainersReleased;
   @Metric("Available memory in MB") MutableGaugeInt availableMB;
   @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@@ -379,6 +385,25 @@ public class QueueMetrics implements MetricsSource {
     pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
   }
 
+  public void incrNodeTypeAggregations(String user, NodeType type) {
+    if (type == NodeType.NODE_LOCAL) {
+      aggregateNodeLocalContainersAllocated.incr();
+    } else if (type == NodeType.RACK_LOCAL) {
+      aggregateRackLocalContainersAllocated.incr();
+    } else if (type == NodeType.OFF_SWITCH) {
+      aggregateOffSwitchContainersAllocated.incr();
+    } else {
+      return;
+    }
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.incrNodeTypeAggregations(user, type);
+    }
+    if (parent != null) {
+      parent.incrNodeTypeAggregations(user, type);
+    }
+  }
+
   public void allocateResources(String user, int containers, Resource res,
       boolean decrPending) {
     // if #containers = 0, means change container resource
@@ -562,6 +587,18 @@ public class QueueMetrics implements MetricsSource {
     return aggregateContainersAllocated.value();
   }
 
+  public long getAggregateNodeLocalContainersAllocated() {
+    return aggregateNodeLocalContainersAllocated.value();
+  }
+
+  public long getAggregateRackLocalContainersAllocated() {
+    return aggregateRackLocalContainersAllocated.value();
+  }
+
+  public long getAggregateOffSwitchContainersAllocated() {
+    return aggregateOffSwitchContainersAllocated.value();
+  }
+
   public long getAggegatedReleasedContainers() {
     return aggregateContainersReleased.value();
   }
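The QueueMetrics change above rolls a single allocation's locality up the whole hierarchy: incrNodeTypeAggregations bumps the matching counter on this queue, then repeats the bump on the calling user's metrics view and on the parent queue, so the leaf, every ancestor, and the per-user sources all report consistent totals. Below is a minimal standalone sketch of that roll-up pattern; the class and method names (SketchMetrics, bumpLocality) are illustrative, not YARN API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the roll-up used by QueueMetrics.incrNodeTypeAggregations:
// a counter bump on a leaf queue propagates to the per-user view (if one is
// tracked) and recursively to every ancestor queue.
public class SketchMetrics {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  private final SketchMetrics parent;                       // null at the root
  private final Map<String, SketchMetrics> users = new HashMap<>();
  private final long[] counts = new long[NodeType.values().length];

  SketchMetrics(SketchMetrics parent) { this.parent = parent; }

  void bumpLocality(String user, NodeType type) {
    counts[type.ordinal()]++;                 // this queue's own counter
    SketchMetrics userView = users.get(user); // per-user view, if present
    if (userView != null) {
      userView.bumpLocality(user, type);
    }
    if (parent != null) {
      parent.bumpLocality(user, type);        // recurse toward the root
    }
  }

  long get(NodeType type) { return counts[type.ordinal()]; }

  public static void main(String[] args) {
    SketchMetrics root = new SketchMetrics(null);
    SketchMetrics leaf = new SketchMetrics(root);
    leaf.bumpLocality("alice", NodeType.RACK_LOCAL);
    // Both the leaf and the root see the single rack-local allocation.
    System.out.println(leaf.get(NodeType.RACK_LOCAL)); // 1
    System.out.println(root.get(NodeType.RACK_LOCAL)); // 1
  }
}
```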
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index a99122d9905..1e62b44c772 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -184,6 +184,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   @Private
   public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
 
+  @Private
+  public static final String RACK_LOCALITY_FULL_RESET =
+      PREFIX + "rack-locality-full-reset";
+
+  @Private
+  public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
+
   @Private
   public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
       PREFIX + "schedule-asynchronously";
@@ -664,7 +671,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   public int getNodeLocalityDelay() {
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }
-  
+
+  public boolean getRackLocalityFullReset() {
+    return getBoolean(RACK_LOCALITY_FULL_RESET,
+        DEFAULT_RACK_LOCALITY_FULL_RESET);
+  }
+
   public ResourceCalculator getResourceCalculator() {
     return ReflectionUtils.newInstance(
         getClass(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index d92a821deef..ef2bf7e4333 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -86,6 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
   private float maxAMResourcePerQueuePercent;
 
   private volatile int nodeLocalityDelay;
+  private volatile boolean rackLocalityFullReset;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -190,6 +191,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     nodeLocalityDelay = conf.getNodeLocalityDelay();
+    rackLocalityFullReset = conf.getRackLocalityFullReset();
 
     // re-init this since max allocation could have changed
     this.minimumAllocationFactor =
@@ -1002,6 +1004,11 @@ public class LeafQueue extends AbstractCSQueue {
     return nodeLocalityDelay;
   }
 
+  @Lock(NoLock.class)
+  public boolean getRackLocalityFullReset() {
+    return rackLocalityFullReset;
+  }
+
   @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, User user,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index b986b1fbf5d..ee01bd17b30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -89,7 +89,7 @@ public abstract class AbstractContainerAllocator {
         LOG.info("assignedContainer" + " application attempt="
             + application.getApplicationAttemptId() + " container="
             + updatedContainer.getId() + " queue=" + this + " clusterResource="
-            + clusterResource);
+            + clusterResource + " type=" + assignment.getType());
 
         application
             .getCSLeafQueue()
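The configuration change introduces a single boolean knob. Assuming CapacitySchedulerConfiguration.PREFIX is "yarn.scheduler.capacity." (as in branch-2), the property resolves to yarn.scheduler.capacity.rack-locality-full-reset with a default of true, which preserves the pre-patch reset behavior; LeafQueue caches the value whenever the queue is (re)initialized. A hedged sketch of how the flag resolves through a plain Hadoop Configuration:

```java
import org.apache.hadoop.conf.Configuration;

// Rough illustration of how the new flag is resolved; the fully qualified
// property name assumes PREFIX == "yarn.scheduler.capacity.".
public class RackLocalityFlagDemo {
  static final String RACK_LOCALITY_FULL_RESET =
      "yarn.scheduler.capacity.rack-locality-full-reset";
  static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Unset: falls back to the default (true), i.e. the pre-patch behavior
    // of fully resetting scheduling opportunities after every allocation.
    System.out.println(conf.getBoolean(RACK_LOCALITY_FULL_RESET,
        DEFAULT_RACK_LOCALITY_FULL_RESET)); // true

    // An operator opting in to faster back-to-back rack-local scheduling
    // would set the flag to false in capacity-scheduler.xml; equivalently:
    conf.setBoolean(RACK_LOCALITY_FULL_RESET, false);
    System.out.println(conf.getBoolean(RACK_LOCALITY_FULL_RESET,
        DEFAULT_RACK_LOCALITY_FULL_RESET)); // false
  }
}
```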
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index fd99d29e574..6bd4d17982b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -246,8 +246,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       float localityWaitFactor =
           getLocalityWaitFactor(priority, rmContext.getScheduler()
               .getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+      // Cap the delay by the number of nodes in the cluster. Under most conditions
+      // this means we will consider each node in the cluster before
+      // accepting an off-switch assignment.
+      return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+          (requiredContainers * localityWaitFactor)) < missedOpportunities);
     }
 
     // Check if we need containers on this rack
@@ -643,9 +646,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Resetting scheduling opportunities");
         }
-        application.resetSchedulingOpportunities(priority);
+        // Only reset scheduling opportunities for RACK_LOCAL if configured
+        // to do so. Not resetting means we will continue to schedule
+        // RACK_LOCAL without delay.
+        if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
+            || application.getCSLeafQueue().getRackLocalityFullReset()) {
+          application.resetSchedulingOpportunities(priority);
+        }
       }
-      
+
       // Non-exclusive scheduling opportunity is different: we need reset
       // it every time to make sure non-labeled resource request will be
       // most likely allocated on non-labeled nodes first.
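The first RegularContainerAllocator hunk is the heart of the improvement: the off-switch delay used to be requiredContainers * localityWaitFactor with no upper bound, so an application requesting many containers could wait far longer than one scheduling pass over the cluster before accepting an off-switch assignment; the patch caps the delay at the cluster's node count. A self-contained sketch of the capped predicate, with an illustrative helper name:

```java
// Sketch of the capped off-switch delay check. canAssignOffSwitch mirrors
// the patched predicate; before the patch the Math.min cap was absent, so a
// request for many containers could wait proportionally to its size.
public class OffSwitchDelaySketch {
  static boolean canAssignOffSwitch(long missedOpportunities,
      int requiredContainers, float localityWaitFactor, int numClusterNodes) {
    // Delay is capped at the cluster size: roughly one look at each node.
    return Math.min(numClusterNodes,
        requiredContainers * localityWaitFactor) < missedOpportunities;
  }

  public static void main(String[] args) {
    // 100 required containers, wait factor 1.0, 3-node cluster: uncapped,
    // the app would wait for 100 missed opportunities; with the cap, the
    // 4th opportunity (missed = 4 > 3) already permits OFF_SWITCH.
    for (long missed = 1; missed <= 5; missed++) {
      System.out.println("missed=" + missed + " -> "
          + canAssignOffSwitch(missed, 100, 1.0f, 3));
    }
  }
}
```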
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index 8ad71d231bc..864620f1f54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -198,6 +198,53 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
   }
 
+  @Test public void testNodeTypeMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String user = "alice";
+
+    QueueMetrics parentMetrics =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
+    Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
+        from.getMetrics());
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
+    MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
+    MetricsSource queueSource = queueSource(ms, leafQueueName);
+    //AppSchedulingInfo app = mockApp(user);
+
+    metrics.submitApp(user);
+    MetricsSource userSource = userSource(ms, leafQueueName, user);
+    MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
+    checkAggregatedNodeTypes(queueSource,1L,0L,0L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L);
+    checkAggregatedNodeTypes(userSource,1L,0L,0L);
+    checkAggregatedNodeTypes(parentUserSource,1L,0L,0L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
+    checkAggregatedNodeTypes(queueSource,1L,1L,0L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L);
+    checkAggregatedNodeTypes(userSource,1L,1L,0L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,0L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    checkAggregatedNodeTypes(queueSource,1L,1L,1L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L);
+    checkAggregatedNodeTypes(userSource,1L,1L,1L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,1L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    checkAggregatedNodeTypes(queueSource,1L,1L,2L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L);
+    checkAggregatedNodeTypes(userSource,1L,1L,2L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,2L);
+
+  }
+
+
   @Test public void testTwoLevelWithUserMetrics() {
     String parentQueueName = "root";
     String leafQueueName = "root.leaf";
@@ -367,6 +414,14 @@ public class TestQueueMetrics {
     assertGauge("ReservedContainers", reservedCtnrs, rb);
   }
 
+  public static void checkAggregatedNodeTypes(MetricsSource source,
+      long nodeLocal, long rackLocal, long offSwitch) {
+    MetricsRecordBuilder rb = getMetrics(source);
+    assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
+    assertCounter("AggregateRackLocalContainersAllocated", rackLocal, rb);
+    assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
+  }
+
   private static AppSchedulingInfo mockApp(String user) {
     AppSchedulingInfo app = mock(AppSchedulingInfo.class);
     when(app.getUser()).thenReturn(user);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index b85c6972f51..d3365de67a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1506,8 +1506,8 @@ public class TestLeafQueue {
 
   @Test
   public void testLocalityScheduling() throws Exception {
-    // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Manipulate queue 'b'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
 
     // User
     String user_0 = "user_0";
@@ -1614,34 +1614,87 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(host_1, 1*GB, 1,
             true, priority, recordFactory));
     app_0_requests_0.add(
-        TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+        TestUtils.createResourceRequest(rack_1, 1*GB, 3,
             true, priority, recordFactory));
     app_0_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
-    assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(4, app_0.getTotalRequiredResources(priority));
 
     String host_3 = "127.0.0.4"; // on rack_1
     FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
 
     // Rack-delay
+    doReturn(true).when(a).getRackLocalityFullReset();
     doReturn(1).when(a).getNodeLocalityDelay();
 
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
-    assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(4, app_0.getTotalRequiredResources(priority));
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Shouldn't assign RACK_LOCAL because schedulingOpportunities should
+    // have gotten reset.
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Next time we schedule RACK_LOCAL, don't reset
+    doReturn(false).when(a).getRackLocalityFullReset();
+
+    // Should assign RACK_LOCAL now
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
+    assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset
+    assertEquals(2, app_0.getTotalRequiredResources(priority));
+
+    // Another RACK_LOCAL since schedulingOpportunities not reset
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
+    assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    // Add a request larger than cluster size to verify
+    // OFF_SWITCH delay is capped by cluster size
+    app_0.resetSchedulingOpportunities(priority);
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start with off switch. 3 nodes in cluster so shouldn't allocate first 3
+    for (int i = 0; i < numNodes; i++) {
+      assignment =
+          a.assignContainers(clusterResource, node_2, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      verifyNoContainerAllocated(assignment);
+      assertEquals(i+1, app_0.getSchedulingOpportunities(priority));
+    }
+    // delay should be capped at numNodes so next one should allocate
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
+    assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority));
   }
-  
+
   @Test
   public void testApplicationPriorityScheduling() throws Exception {
     // Manipulate queue 'a'
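The TestLeafQueue changes above exercise both behaviors: with rack-locality-full-reset left at its default of true, every allocation resets scheduling opportunities, so each rack-local container pays the node-locality delay again; with it set to false, only node-local allocations reset, and back-to-back rack-local assignments proceed without re-waiting. A compact sketch of that reset rule follows; the helper name is illustrative, not YARN API.

```java
// Compact sketch of the patched reset rule: scheduling opportunities are
// always reset after a NODE_LOCAL assignment, but after a RACK_LOCAL one
// only when rack-locality-full-reset is true.
public class ResetRuleSketch {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  static boolean shouldResetSchedulingOpportunities(
      NodeType allocated, boolean rackLocalityFullReset) {
    return allocated == NodeType.NODE_LOCAL || rackLocalityFullReset;
  }

  public static void main(String[] args) {
    // Default (true): every allocation resets, so each RACK_LOCAL container
    // waits out the node-locality delay again. With false, consecutive
    // RACK_LOCAL allocations keep going without re-waiting.
    System.out.println(shouldResetSchedulingOpportunities(
        NodeType.RACK_LOCAL, true));  // true  -> delay again
    System.out.println(shouldResetSchedulingOpportunities(
        NodeType.RACK_LOCAL, false)); // false -> keep allocating rack-local
    System.out.println(shouldResetSchedulingOpportunities(
        NodeType.NODE_LOCAL, false)); // true  -> node-local always resets
  }
}
```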