configs = new HashMap<>();
+
+ String subStr = queueExpr.substring(left + 1, right);
+ for (String kv : subStr.split(",")) {
+ if (kv.contains("=")) {
+ String key = kv.substring(0, kv.indexOf("="));
+ String value = kv.substring(kv.indexOf("=") + 1);
+ configs.put(key, value);
+ }
+ }
+
+ return configs;
+ }
+ }
+
+ return Collections.emptyMap();
+ }
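+ // Example (from the mock framework's queue-config syntax used below): for
+ // "-a(=[10 100 35 50]){priority=1,disable_preemption=true}", the text between
+ // '{' and '}' parses to {priority=1, disable_preemption=true}; a queue
+ // expression without a "{...}" section yields an empty map.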
+
/**
* Level of a queue is the number of "-" characters at the beginning; root's level is 0
*/
@@ -740,6 +801,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
}
+ public void checkPriority(CSQueue queue, int expectedPriority) {
+ Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
+ }
+
public void checkReservedResource(CSQueue queue, String partition, int reserved) {
ResourceUsage ru = queue.getQueueResourceUsage();
Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
new file mode 100644
index 00000000000..2b54d77d301
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestPreemptionForQueueWithPriorities
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
+ @Before
+ public void setup() {
+ super.setup();
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+ }
+
+ @Test
+ public void testPreemptionForHighestPriorityUnderutilizedQueue()
+ throws IOException {
+ /**
+ * The simplest test of queues with priorities. Queue structure is:
+ *
+ *
+ * root
+ * / | \
+ * a b c
+ *
+ *
+ * Queue priorities:
+ * - a=1
+ * - b/c=2
+ *
+ * So c will preempt from a first, until a is down to its guaranteed
+ * resource, and take the remainder from b.
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 40 50]){priority=1};" + // a
+ "-b(=[30 100 59 50]){priority=2};" + // b
+ "-c(=[40 100 1 25]){priority=2}"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,40,false);" + // app1 in a
+ "b\t(1,1,n1,,59,false);" + // app2 in b
+ "c\t(1,1,n1,,1,false);"; // app3 in c
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+ // from app3
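+ // (c has 25 pending and is under its guarantee of 40; a gives up 10 to
+ // fall back to its guarantee of 30, and the remaining 15 comes from the
+ // higher-priority b)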
+ verify(mDisp, times(10)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testPreemptionForLowestPriorityUnderutilizedQueue()
+ throws IOException {
+ /**
+ * Similar to the above test: make sure the less-utilized queue can still
+ * get resources first, regardless of priority.
+ *
+ * Queue structure is:
+ *
+ *
+ * root
+ * / | \
+ * a b c
+ *
+ *
+ * Queue priorities:
+ * - a=1
+ * - b=2
+ * - c=0
+ *
+ * So c will preempt from a first, until a is down to its guaranteed
+ * resource, and take the remainder from b.
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 40 50]){priority=1};" + // a
+ "-b(=[30 100 59 50]){priority=2};" + // b
+ "-c(=[40 100 1 25]){priority=0}"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,40,false);" + // app1 in a
+ "b\t(1,1,n1,,59,false);" + // app2 in b
+ "c\t(1,1,n1,,1,false);"; // app3 in c
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+ // from app3
+ verify(mDisp, times(10)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testPreemptionWontHappenBetweenSatisfiedQueues()
+ throws IOException {
+ /**
+ * No preemption happens if a queue is already satisfied, regardless of
+ * priority.
+ *
+ * Queue structure is:
+ *
+ *
+ * root
+ * / | \
+ * a b c
+ *
+ *
+ * Queue priorities:
+ * - a=1
+ * - b=1
+ * - c=2
+ *
+ * Since c is satisfied, it will not preempt any resources from other queues
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 0 0]){priority=1};" + // a
+ "-b(=[30 100 40 50]){priority=1};" + // b
+ "-c(=[40 100 60 25]){priority=2}"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "b\t(1,1,n1,,40,false);" + // app1 in b
+ "c\t(1,1,n1,,60,false)"; // app2 in c
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Nothing preempted
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets()
+ throws IOException {
+ /**
+ * When a cluster has multiple priorities and each priority level has
+ * multiple queues, the preemption policy should balance resources between
+ * queues of the same priority in proportion to their capacities
+ *
+ * Queue structure is:
+ *
+ *
+ * root
+ * - a (capacity=10), p=1
+ * - b (capacity=15), p=1
+ * - c (capacity=20), p=2
+ * - d (capacity=25), p=2
+ * - e (capacity=30), p=2
+ *
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[10 100 35 50]){priority=1};" + // a
+ "-b(=[15 100 25 50]){priority=1};" + // b
+ "-c(=[20 100 39 50]){priority=2};" + // c
+ "-d(=[25 100 0 0]){priority=2};" + // d
+ "-e(=[30 100 1 99]){priority=2}"; // e
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,35,false);" + // app1 in a
+ "b\t(1,1,n1,,25,false);" + // app2 in b
+ "c\t(1,1,n1,,39,false);" + // app3 in c
+ "e\t(1,1,n1,,1,false)"; // app4 in e
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // 23 preempted from app1, 6 preempted from app2, and nothing preempted
+ // from app3/app4
+ // (After preemption, a has 35 - 23 = 12 and b has 25 - 6 = 19, so b:a is
+ // 19/12 = 1.58, close to the capacity ratio 15/10 = 1.50)
+ verify(mDisp, times(23)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(6)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ }
+
+ @Test
+ public void testPreemptionForPriorityAndDisablePreemption()
+ throws IOException {
+ /**
+ * When a cluster has multiple priorities and each priority level has
+ * multiple queues, the preemption policy should balance resources between
+ * queues of the same priority in proportion to their capacities.
+ *
+ * But we also need to make sure that disabled preemption is honored
+ * regardless of priority.
+ *
+ * Queue structure is:
+ *
+ *
+ * root
+ * - a (capacity=10), p=1
+ * - b (capacity=15), p=1
+ * - c (capacity=20), p=2
+ * - d (capacity=25), p=2
+ * - e (capacity=30), p=2
+ *
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a
+ "-b(=[15 100 25 50]){priority=1};" + // b
+ "-c(=[20 100 39 50]){priority=2};" + // c
+ "-d(=[25 100 0 0]){priority=2};" + // d
+ "-e(=[30 100 1 99]){priority=2}"; // e
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,35,false);" + // app1 in a
+ "b\t(1,1,n1,,25,false);" + // app2 in b
+ "c\t(1,1,n1,,39,false);" + // app3 in c
+ "e\t(1,1,n1,,1,false)"; // app4 in e
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // We would normally preempt some resources from a, but since queue a
+ // disables preemption, we preempt from b and from c instead, even though
+ // c has higher priority than a
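+ // (e is far below its guarantee of 30; b gives up 9, dropping close to
+ // its guarantee of 15, and c gives up 19, dropping to its guarantee of 20)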
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(9)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, times(19)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ }
+
+ @Test
+ public void testPriorityPreemptionForHierarchicalOfQueues()
+ throws IOException {
+ /**
+ * When queues form a multi-level hierarchy with different priorities:
+ *
+ *
+ * root
+ * - a (capacity=30), p=1
+ * -- a1 (capacity=40), p=1
+ * -- a2 (capacity=60), p=1
+ * - b (capacity=30), p=1
+ * -- b1 (capacity=50), p=1
+ * -- b2 (capacity=50), p=2
+ * - c (capacity=40), p=1
+ *
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 40 50]){priority=1};" + // a
+ "--a1(=[12 100 20 50]){priority=1};" + // a1
+ "--a2(=[18 100 20 50]){priority=1};" + // a2
+ "-b(=[30 100 59 50]){priority=1};" + // b
+ "--b1(=[15 100 30 50]){priority=1};" + // b1
+ "--b2(=[15 100 29 50]){priority=2};" + // b2
+ "-c(=[40 100 1 30]){priority=1}"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a1\t(1,1,n1,,20,false);" + // app1 in a1
+ "a2\t(1,1,n1,,20,false);" + // app2 in a2
+ "b1\t(1,1,n1,,30,false);" + // app3 in b1
+ "b2\t(1,1,n1,,29,false);" + // app4 in b2
+ "c\t(1,1,n1,,29,false)"; // app5 in c
+
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Preemption should first rebalance capacity between a and b; b2 should
+ // get less preemption than b1 (because b2 has higher priority)
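+ // (app1 in a1 gives up 5: 20 -> 15; app3 in b1 gives up 15: 30 -> 15,
+ // its guarantee; app4 in b2 gives up 9: 29 -> 20; c can then grow by 29)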
+ verify(mDisp, times(5)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ verify(mDisp, times(9)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 937000716c6..c777433d476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -219,7 +220,9 @@ public class TestProportionalCapacityPreemptionPolicy {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
- verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+ // A will only preempt up to (guaranteed - allocated), i.e. 10 containers.
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@@ -587,8 +590,8 @@ public class TestProportionalCapacityPreemptionPolicy {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
- // correct imbalance between over-capacity queues
- verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ // Will not preempt for over capacity queues
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@@ -701,7 +704,7 @@ public class TestProportionalCapacityPreemptionPolicy {
public void testZeroGuarOverCap() {
int[][] qData = new int[][] {
// / A B C D E F
- { 200, 100, 0, 99, 0, 100, 100 }, // abs
+ { 200, 100, 0, 100, 0, 100, 100 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 170, 60, 20, 90, 0, 0 }, // used
{ 85, 50, 30, 10, 10, 20, 20 }, // pending
@@ -712,14 +715,14 @@ public class TestProportionalCapacityPreemptionPolicy {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
- // we verify both that C has priority on B and D (has it has >0 guarantees)
- // and that B and D are force to share their over capacity fairly (as they
- // are both zero-guarantees) hence D sees some of its containers preempted
- verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ // No preemption should happen because zero-guarantee queues are treated
+ // as always satisfied; they should not preempt from each other.
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
}
-
-
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
@@ -1231,6 +1234,13 @@ public class TestProportionalCapacityPreemptionPolicy {
when(pq.getChildQueues()).thenReturn(cqs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(pq.getReadLock()).thenReturn(lock.readLock());
+
+ // Ordering policy
+ QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+ when(policy.getConfigName()).thenReturn(
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+ when(pq.getQueueOrderingPolicy()).thenReturn(policy);
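+ // Stub the default priority (0) so priority-based ordering does not
+ // change the behavior of the pre-existing tests in this class.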
+ when(pq.getPriority()).thenReturn(Priority.newInstance(0));
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
@@ -1301,6 +1311,7 @@ public class TestProportionalCapacityPreemptionPolicy {
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(lq.getReadLock()).thenReturn(lock.readLock());
+ when(lq.getPriority()).thenReturn(Priority.newInstance(0));
p.getChildQueues().add(lq);
return lq;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e31a889c345..1fd455a607a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -95,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
}
@Test
- public void testNodePartitionPreemptionRespectMaximumCapacity()
+ public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues()
throws IOException {
/**
* Queue structure is:
@@ -114,8 +114,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
* 2 apps in cluster.
* app1 in b and app2 in c.
*
- * app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x
- * from app1 because of max capacity.
+ * app1 uses 90x, and app2 uses 10x. We don't expect preemption to happen
+ * between them because both queues are satisfied
*/
String labelsConfig =
"=100,true;" + // default partition
@@ -139,9 +139,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- // 30 preempted from app1, 30 preempted from app4, and nothing preempted
- // from app2/app3
- verify(mDisp, times(20)).handle(
+ // No preemption happens
+ verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 07d1eefa2f6..964a23085dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -46,8 +46,8 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
"root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
"-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
"--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
- "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
- "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}";
String appsConfig=
//queueName\t(priority,resource,host,expression,#repeat,reserved)
// app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
@@ -75,6 +75,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
checkPendingResource(cs.getQueue("root"), "red", 100);
checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("root"), "blue", 200);
+ checkPriority(cs.getQueue("root"), 0); // default
// a
checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
@@ -83,6 +84,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
checkPendingResource(cs.getQueue("a"), "red", 0);
checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("a"), "blue", 200);
+ checkPriority(cs.getQueue("a"), 0); // default
// a1
checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
@@ -91,6 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
checkPendingResource(cs.getQueue("a1"), "red", 0);
checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
checkPendingResource(cs.getQueue("a1"), "blue", 0);
+ checkPriority(cs.getQueue("a1"), 0); // default
// a2
checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
@@ -99,14 +102,18 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
checkPendingResource(cs.getQueue("a2"), "red", 0);
checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
checkPendingResource(cs.getQueue("a2"), "blue", 200);
+ checkPriority(cs.getQueue("a2"), 2);
+ Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled());
- // b1
+ // b
checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
checkPendingResource(cs.getQueue("b"), "", 0);
checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("b"), "red", 100);
checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
checkPendingResource(cs.getQueue("b"), "blue", 0);
+ checkPriority(cs.getQueue("b"), 1);
+ Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled());
// Check ignored partitioned containers in queue
Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index bd9f6155b9f..943b7d2107e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -46,7 +46,7 @@ public class CapacitySchedulerPreemptionTestBase {
final int GB = 1024;
- Configuration conf;
+ CapacitySchedulerConfiguration conf;
RMNodeLabelsManager mgr;
@@ -54,13 +54,15 @@ public class CapacitySchedulerPreemptionTestBase {
@Before
void setUp() throws Exception {
- conf = new YarnConfiguration();
+ conf = new CapacitySchedulerConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
- conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+ conf = (CapacitySchedulerConfiguration) TestUtils
+ .getConfigurationWithMultipleQueues(this.conf);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB);
// Set preemption related configurations
conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
@@ -146,4 +148,18 @@ public class CapacitySchedulerPreemptionTestBase {
Assert.fail();
}
+
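+ // Counts how many of the containers selected for preemption in the
+ // current editSchedule pass belong to the given application attempt.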
+ public void checkNumberOfPreemptionCandidateFromApp(
+ ProportionalCapacityPreemptionPolicy policy, int expected,
+ ApplicationAttemptId attemptId) {
+ int total = 0;
+
+ for (RMContainer rmContainer : policy.getToPreemptContainers().keySet()) {
+ if (rmContainer.getApplicationAttemptId().equals(attemptId)) {
+ ++ total;
+ }
+ }
+
+ Assert.assertEquals(expected, total);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 0ec0260471f..2884f6708aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -110,9 +110,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -276,9 +273,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -581,9 +575,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 1f87c533ff3..2fa06e84788 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -594,9 +594,6 @@ public class TestApplicationLimitsByPartition {
.thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability())
.thenReturn(Resources.createResource(16 * GB));
- when(csContext.getNonPartitionedQueueComparator())
- .thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext rmContext = TestUtils.getMockRMContext();
RMContext spyRMContext = spy(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index db6115c4ab6..e64c4df2758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -22,22 +22,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Set;
public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
@@ -167,8 +171,7 @@ public class TestCapacitySchedulerSurgicalPreemption
*
* 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
*
- * 2) app1 submit to queue-a first, it asked 38 * 1G containers
- * We will allocate 20 on n1 and 19 on n2.
+ * 2) app1 is submitted to queue-b and asks for 5 * 1G containers
*
* 3) app2 submit to queue-c, ask for one 4G container (for AM)
*
@@ -243,4 +246,569 @@ public class TestCapacitySchedulerSurgicalPreemption
rm1.close();
}
+
+ @Test(timeout = 60000)
+ public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
+ throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ *
+ * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+ *
+ * 2) app1 is submitted to queue-b first and asks for 6 * 1G containers.
+ * We will allocate 4 on n1 (including the AM) and 3 on n2.
+ *
+ * 3) app2 is submitted to queue-c and asks for one 18G container (its AM)
+ *
+ * After preemption, we should expect:
+ * 2 containers preempted from app1, and app2's AM successfully allocated.
+ */
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+ conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+ // Queue c has higher priority than a/b
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-b, AM container should be launched on nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, so the abs-used-cap of b is
+ // 7 / 40 = 17.5% < 20% (guaranteed)
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ // 4 from n1 and 3 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 4);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 3);
+
+ // Submit app2 to queue-c, asking for an 18G container for its AM
+ RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Thread.sleep(10);
+ }
+
+ // Call editSchedule immediately: containers are not selected
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Sleep past the timeout interval; we should then see containers selected
+ Thread.sleep(1000);
+ editPolicy.editSchedule();
+ Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
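+ // (n1 is a 20G node with 4 * 1G used by app1; preempting 2 of those
+ // containers leaves 18G free, enough for app2's AM)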
+
+ // Call editSchedule again: selected containers are killed, and new AM
+ // container launched
+ editPolicy.editSchedule();
+
+ // Do allocation till reserved container allocated
+ while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Thread.sleep(10);
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+ rm1.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testPriorityPreemptionRequiresMoveReservation()
+ throws Exception {
+ /**
+ * Test case: Submit two application (app1/app2) to different queues, queue
+ * structure:
+ *
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ *
+ * 1) 3 nodes in the cluster, 10G for each
+ *
+ * 2) app1 is submitted to queue-b first, asking for 2 * 2G containers;
+ * it gets a 2G AM on n1 and 2 * 2G containers on n2
+ *
+ * 3) app2 is submitted to queue-c with a 2G AM container (allocated on n3);
+ * app2 then requests 9G, which will be reserved on n3
+ *
+ * We expect the reservation to move off n3 and eventually be allocated on
+ * n2 after preemption
+ */
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+ conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
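+ // (With move-reservation enabled, the policy may move an existing
+ // reservation to another node where preemption can free enough space,
+ // rather than preempting on the originally reserved node; the assertions
+ // below verify exactly this.)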
+
+ // Queue c has higher priority than a/b
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+ MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+ RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 2 * GB, 2, new ArrayList());
+
+ // Do allocation for node2 over several heartbeats to place the 2 requested containers
+ for (int i = 0; i < 2; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+
+ // 1 from n1 and 2 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 1);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 2);
+
+ // Submit app2 to queue-c, asking for a 2G AM container on n3
+ RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ // Asks 1 * 9G container
+ am2.allocate("*", 9 * GB, 1, new ArrayList());
+
+ // Do allocation for node3 once
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+
+ // Make sure container reserved on node3
+ Assert.assertNotNull(
+ cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+ // Call editSchedule immediately: nothing happens
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+ editPolicy.editSchedule();
+ Assert.assertNotNull(
+ cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+ // Sleep past the timeout interval; we should then see the reserved
+ // container moved to n2 (n1 is occupied by app1's AM)
+ Thread.sleep(1000);
+ editPolicy.editSchedule();
+ Assert.assertNull(
+ cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+ Assert.assertNotNull(
+ cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+ Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
+ rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
+
+ // Do it again, we should see containers marked to be preempt
+ editPolicy.editSchedule();
+ Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+
+ // Do allocation till reserved container allocated
+ while (schedulerApp2.getLiveContainers().size() < 2) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ Thread.sleep(200);
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+ rm1.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
+ throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ *
+ * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
+ *
+ * 2) app1 is submitted to queue-b first and asks for 8 * 1G containers.
+ * We will allocate 1 container on each of n0-n8 (the AM on n0)
+ *
+ * 3) app2 is submitted to queue-c and asks for 10 * 10G containers
+ * (including the AM)
+ *
+ * After preemption, we should expect:
+ * 6 containers preempted from app1, and usage of app2 at 70%
+ */
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+ conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+ // Queue c has higher priority than a/b
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM[] mockNMs = new MockNM[10];
+ for (int i = 0; i < 10; i++) {
+ mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
+ }
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ RMNode[] rmNodes = new RMNode[10];
+ for (int i = 0; i < 10; i++) {
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+ }
+
+ // launch an app to queue-b, AM container should be launched on nm0
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
+
+ am1.allocate("*", 1 * GB, 8, new ArrayList());
+
+ // Do allocation for nm1-nm8
+ for (int i = 1; i < 9; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ // App1 should have 9 containers now, so the abs-used-cap of b is 9%
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
+ for (int i = 0; i < 9; i++) {
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+ am1.getApplicationAttemptId(), 1);
+ }
+
+ // Submit app2 to queue-c, asking for a 10G container for its AM
+ // Launch the AM on nm9
+ RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ // Ask 10 * 10GB containers
+ am2.allocate("*", 10 * GB, 10, new ArrayList());
+
+ // Do allocation for all nms
+ for (int i = 1; i < 10; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ // Check am2 reserved resource on nm1-nm8
+ for (int i = 1; i < 9; i++) {
+ Assert.assertNotNull("Should reserve on nm-" + i,
+ cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+ }
+
+ // Sleep past the timeout interval; we should then see 6 containers
+ // selected: 6 (selected) + 1 (already allocated) reaches the 70% target
+ Thread.sleep(1000);
+
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+ editPolicy.editSchedule();
+ checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
+ am1.getApplicationAttemptId());
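+ // (c's guaranteed capacity is 70% of the 100G cluster = 70G; 7 * 10G
+ // containers for app2 reach exactly that, so the policy selects only 6
+ // candidates on top of the one container already allocated)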
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+ // Do allocation for all nms
+ for (int i = 1; i < 10; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+ rm1.close();
+ }
+
+ @Test(timeout = 600000)
+ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
+ throws Exception {
+ /**
+ * Test case: Submit three applications (app1/app2/app3) to different
+ * queues. Queue structure:
+ *
+ *
+ * Root
+ * / | \
+ * a b c
+ * 45 45 10
+ *
+ *
+ * Priority of queue_a = 1
+ * Priority of queue_b = 2
+ *
+ * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
+ *
+ * 2) app1 is submitted to queue-c first (AM=1G) and asks for 4 * 1G
+ * containers. We will allocate 1 container on each of n0-n4 (the AM on n4).
+ *
+ * 3) app2 is submitted to queue-a, AM container=0.5G, allocated on n0.
+ * It asks for 2 * 3.5G containers (reserved on n0/n1).
+ *
+ * 4) app3 is submitted to queue-b, AM container=0.5G, allocated on n2.
+ * It asks for 2 * 3.5G containers (reserved on n2/n3).
+ *
+ * We will first preempt the container on n2, since it is the oldest
+ * container of the highest-priority queue (b).
+ */
+
+ // Total preemption = 1G per round, which is 5% of cluster resource (20G)
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ 0.05f);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+ conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+ conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+ // A/B has higher priority
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM[] mockNMs = new MockNM[5];
+ for (int i = 0; i < 5; i++) {
+ mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
+ }
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ RMNode[] rmNodes = new RMNode[5];
+ for (int i = 0; i < 5; i++) {
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+ }
+
+ // launch an app to queue-c, AM container should be launched on nm4
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
+
+ am1.allocate("*", 1 * GB, 4, new ArrayList());
+
+ // Do allocation for nm0-nm3
+ for (int i = 0; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ // App1 should have 5 containers now, one for each node
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+ for (int i = 0; i < 5; i++) {
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+ am1.getApplicationAttemptId(), 1);
+ }
+
+ // Submit app2 to queue-a, asking for a 0.5G AM container (on n0)
+ RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ // Ask 2 * 3.5GB containers
+ am2.allocate("*", 3 * GB + 512, 2, new ArrayList());
+
+ // Do allocation for n0-n1
+ for (int i = 0; i < 2; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ // Check am2 reserved resource from nm0-nm1
+ for (int i = 0; i < 2; i++) {
+ Assert.assertNotNull("Should reserve on nm-" + i,
+ cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+ Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+ .getReservedContainer().getQueueName(), "a");
+ }
+
+ // Submit app3 to queue-b, asking for a 0.5G AM container (on n2)
+ RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+ FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+ // Ask 2 * 3.5GB containers
+ am3.allocate("*", 3 * GB + 512, 2, new ArrayList());
+
+ // Do allocation for n2-n3
+ for (int i = 2; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ // Check am3 reserved resource on nm2-nm3
+ for (int i = 2; i < 4; i++) {
+ Assert.assertNotNull("Should reserve on nm-" + i,
+ cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+ Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+ .getReservedContainer().getQueueName(), "b");
+ }
+
+ // Sleep past the timeout interval; we should then see 1 container selected
+ Thread.sleep(1000);
+
+ /* 1st container preempted is on n2 */
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+ editPolicy.editSchedule();
+
+ // We should have one to-preempt container, on node[2]
+ Set<RMContainer> selectedToPreempt =
+ editPolicy.getToPreemptContainers().keySet();
+ Assert.assertEquals(1, selectedToPreempt.size());
+ Assert.assertEquals(mockNMs[2].getNodeId(),
+ selectedToPreempt.iterator().next().getAllocatedNode());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+
+ // Do allocation for all nms
+ for (int i = 0; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+ waitNumberOfLiveContainersFromApp(schedulerApp3, 2);
+
+ /* 2nd container preempted is on n3 */
+ editPolicy.editSchedule();
+
+ // We should have one to-preempt container, on node[3]
+ selectedToPreempt =
+ editPolicy.getToPreemptContainers().keySet();
+ Assert.assertEquals(1, selectedToPreempt.size());
+ Assert.assertEquals(mockNMs[3].getNodeId(),
+ selectedToPreempt.iterator().next().getAllocatedNode());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+ // Do allocation for all nms
+ for (int i = 0; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+ waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+ /* 3rd container preempted is on n0 */
+ editPolicy.editSchedule();
+
+ // We should have one to-preempt container, on node[0]
+ selectedToPreempt =
+ editPolicy.getToPreemptContainers().keySet();
+ Assert.assertEquals(1, selectedToPreempt.size());
+ Assert.assertEquals(mockNMs[0].getNodeId(),
+ selectedToPreempt.iterator().next().getAllocatedNode());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+
+ // Do allocation for all nms
+ for (int i = 0; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
+ waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+ /* 4th container preempted is on n1 */
+ editPolicy.editSchedule();
+
+ // We should have one to-preempt container, on node[1]
+ selectedToPreempt =
+ editPolicy.getToPreemptContainers().keySet();
+ Assert.assertEquals(1, selectedToPreempt.size());
+ Assert.assertEquals(mockNMs[1].getNodeId(),
+ selectedToPreempt.iterator().next().getAllocatedNode());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+ // Do allocation for all nms
+ for (int i = 0; i < 4; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+ }
+
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+ waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
+ waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+ rm1.close();
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 6c2b4e52ba5..4457587ecb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -97,9 +97,6 @@ public class TestChildQueueOrder {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index bb400efcd36..dd6b25b78c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -776,4 +776,115 @@ public class TestContainerAllocation {
Resources.createResource(20 * GB), "", true).getMemorySize());
rm1.close();
}
+
+
+ @Test(timeout = 60000)
+ public void testQueuePriorityOrdering() throws Exception {
+ CapacitySchedulerConfiguration newConf =
+ (CapacitySchedulerConfiguration) TestUtils
+ .getConfigurationWithMultipleQueues(conf);
+
+ // Set ordering policy
+ newConf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+ // Set maximum capacity of A to 20
+ newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 20);
+ newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+ newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+ newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 3);
+
+ MockRM rm1 = new MockRM(newConf);
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 100 * GB);
+
+ // launch an app to queue A, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // launch an app to queue B, AM container should be launched in nm1
+ RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // launch an app to queue C, AM container should be launched in nm1
+ RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+ // Each application asks 10 * 5GB containers
+ am1.allocate("*", 5 * GB, 10, null);
+ am2.allocate("*", 5 * GB, 10, null);
+ am3.allocate("*", 5 * GB, 10, null);
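+ // (So each queue now has 50GB of pending asks on top of its 2GB AM)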
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ FiCaSchedulerApp schedulerApp1 =
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
+ FiCaSchedulerApp schedulerApp2 =
+ cs.getApplicationAttempt(am2.getApplicationAttemptId());
+ FiCaSchedulerApp schedulerApp3 =
+ cs.getApplicationAttempt(am3.getApplicationAttemptId());
+
+ // A container will be allocated to am1, since queue a has the highest
+ // priority. App1 now has 2 live containers (AM + 1 allocated)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
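+
+ // (Now usages of queues: a=7G, b=2G, c=2G)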
+
+ // A container will be allocated to am1 again.
+ // App1 now has 3 live containers (AM + 2 allocated)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+ // (Now usages of queues: a=12G (satisfied), b=2G, c=2G)
+
+ // Containers will now be allocated to am2, since queue a has reached its
+ // guaranteed capacity.
+ // App2 now has 2 live containers (AM + 1 allocated)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
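+
+ // (Now usages of queues: a=12G (satisfied), b=7G, c=2G)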
+
+ // Do this 3 times: each node update allocates one more container to am2,
+ // since queue a is satisfied and queue b has the next-highest priority.
+ // App2 ends with 5 live containers (AM + 4 allocated)
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+ // (Now usages of queues: a=12G (satisfied), b=22G (satisfied), c=2G)
+
+ // Do this 10 times: with a and b both satisfied, every container goes to
+ // am3. App3 ends with 11 live containers (AM + 10 allocated, all its asks)
+ for (int i = 0; i < 10; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
+
+ // (Now usages of queues: a=12G (satisfied), b=22G (satisfied),
+ // c=52G (satisfied and no pending))
+
+ // Do this 20 times: only 2 more containers can be allocated, 1 to queue a
+ // (then capped by its 20% maximum capacity) and 1 to queue b (the node is
+ // then full: 17 + 27 + 52 = 96G used of 100G, so no 5G container fits)
+ for (int i = 0; i < 20; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(6, schedulerApp2.getLiveContainers().size());
+ Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
+
+ // (Now usages of queues: a=17G (satisfied), b=27G (satisfied), c=52G)
+
+ rm1.close();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index e7091bf7f50..50dc92ed897 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -176,9 +176,6 @@ public class TestLeafQueue {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -415,7 +412,7 @@ public class TestLeafQueue {
"testPolicyRoot" + System.currentTimeMillis();
OrderingPolicy<FiCaSchedulerApp> comPol =
- testConf.getOrderingPolicy(tproot);
+ testConf.getAppOrderingPolicy(tproot);
}
@@ -490,16 +487,16 @@ public class TestLeafQueue {
"testPolicyRoot" + System.currentTimeMillis();
OrderingPolicy<FiCaSchedulerApp> schedOrder =
- testConf.getOrderingPolicy(tproot);
+ testConf.getAppOrderingPolicy(tproot);
//override default to fair
String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
"." + CapacitySchedulerConfiguration.ORDERING_POLICY;
testConf.set(policyType,
- CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+ CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
schedOrder =
- testConf.getOrderingPolicy(tproot);
+ testConf.getAppOrderingPolicy(tproot);
FairOrderingPolicy fop = (FairOrderingPolicy) schedOrder;
assertFalse(fop.getSizeBasedWeight());
@@ -509,7 +506,7 @@ public class TestLeafQueue {
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
testConf.set(sbwConfig, "true");
schedOrder =
- testConf.getOrderingPolicy(tproot);
+ testConf.getAppOrderingPolicy(tproot);
fop = (FairOrderingPolicy) schedOrder;
assertTrue(fop.getSizeBasedWeight());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 1348f515fad..11fea826219 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -97,9 +97,6 @@ public class TestParentQueue {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
- when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c9eb8b37d8c..7aa75645be0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -133,8 +133,6 @@ public class TestReservations {
Resources.createResource(16 * GB, 12));
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
- when(csContext.getNonPartitionedQueueComparator()).thenReturn(
- CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
new file mode 100644
index 00000000000..e3c108a581e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPriorityUtilizationQueueOrderingPolicy {
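+ /**
+  * Build one mocked CSQueue per entry: queueNames[i] gets priority
+  * priorities[i] and used capacity utilizations[i] on the given partition.
+  */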
+ private List<CSQueue> mockCSQueues(String[] queueNames, int[] priorities,
+ float[] utilizations, String partition) {
+ // sanity check
+ assert queueNames != null && priorities != null && utilizations != null
+ && queueNames.length > 0 && queueNames.length == priorities.length
+ && priorities.length == utilizations.length;
+
+ List<CSQueue> list = new ArrayList<>();
+ for (int i = 0; i < queueNames.length; i++) {
+ CSQueue q = mock(CSQueue.class);
+ when(q.getQueueName()).thenReturn(queueNames[i]);
+
+ QueueCapacities qc = new QueueCapacities(false);
+ qc.setUsedCapacity(partition, utilizations[i]);
+
+ when(q.getQueueCapacities()).thenReturn(qc);
+ when(q.getPriority()).thenReturn(Priority.newInstance(priorities[i]));
+
+ list.add(q);
+ }
+
+ return list;
+ }
+
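+ /**
+  * Drain the policy's assignment iterator for the given partition and check
+  * that queue names come back exactly in expectedOrder.
+  */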
+ private void verifyOrder(QueueOrderingPolicy orderingPolicy, String partition,
+ String[] expectedOrder) {
+ Iterator<CSQueue> iter = orderingPolicy.getAssignmentIterator(partition);
+ int i = 0;
+ while (iter.hasNext()) {
+ CSQueue q = iter.next();
+ Assert.assertEquals(expectedOrder[i], q.getQueueName());
+ i++;
+ }
+
+ Assert.assertEquals(expectedOrder.length, i);
+ }
+
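+ // Pure utilization ordering (respectPriority == false): queues that can
+ // access the partition come first, sorted by used capacity ascending;
+ // priority is ignored. (A summary inferred from the cases below.)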
+ @Test
+ public void testUtilizationOrdering() {
+ PriorityUtilizationQueueOrderingPolicy policy =
+ new PriorityUtilizationQueueOrderingPolicy(false);
+
+ // Case 1, one queue
+ policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 0 },
+ new float[] { 0.1f }, ""));
+ verifyOrder(policy, "", new String[] { "a" });
+
+ // Case 2, 2 queues
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 0, 0 },
+ new float[] { 0.1f, 0.0f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a" });
+
+ // Case 3, 3 queues
+ policy.setQueues(
+ mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 0, 0, 0 },
+ new float[] { 0.1f, 0.0f, 0.2f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a", "c" });
+
+ // Case 4, 3 queues, ignore priority
+ policy.setQueues(
+ mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+ new float[] { 0.1f, 0.0f, 0.2f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a", "c" });
+
+ // Case 5, 3 queues, utilizations set on partition x, sorted on the default
+ // partition (used capacity is 0 for all there, so the order is unchanged)
+ policy.setQueues(
+ mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+ new float[] { 0.1f, 0.0f, 0.2f }, "x"));
+ verifyOrder(policy, "", new String[] { "a", "b", "c" });
+
+ // Case 6, 3 queues, utilizations set on partition x, sorted on x
+ policy.setQueues(
+ mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+ new float[] { 0.1f, 0.0f, 0.2f }, "x"));
+ verifyOrder(policy, "x", new String[] { "b", "a", "c" });
+
+ // Case 7, 3 queues, with different accessibility to partition x
+ List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+ new float[] { 0.1f, 0.0f, 0.2f }, "x");
+ // a can access "x"
+ when(queues.get(0).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y"));
+ // c can access "x"
+ when(queues.get(2).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y"));
+ policy.setQueues(queues);
+ verifyOrder(policy, "x", new String[] { "a", "c", "b" });
+ }
+
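+ // Priority + utilization ordering (respectPriority == true). Roughly, as a
+ // sketch inferred from the cases below (not necessarily the literal
+ // comparator):
+ //
+ //   compare(q1, q2):
+ //     queues that can access the partition come first;
+ //     under-utilized queues (used < 1.0) before satisfied/over-utilized;
+ //     then higher priority first;
+ //     then lower used capacity first.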
+ @Test
+ public void testPriorityUtilizationOrdering() {
+ PriorityUtilizationQueueOrderingPolicy policy =
+ new PriorityUtilizationQueueOrderingPolicy(true);
+
+ // Case 1, one queue
+ policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 1 },
+ new float[] { 0.1f }, ""));
+ verifyOrder(policy, "", new String[] { "a" });
+
+ // Case 2, 2 queues, both under utilized, same priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+ new float[] { 0.2f, 0.1f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a" });
+
+ // Case 3, 2 queues, both over utilized, same priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+ new float[] { 1.1f, 1.2f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 4, 2 queues, one under and one over, same priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+ new float[] { 0.1f, 1.2f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 5, 2 queues, both over utilized, different priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+ new float[] { 1.1f, 1.2f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a" });
+
+ // Case 6, 2 queues, both under utilized, different priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+ new float[] { 0.1f, 0.2f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a" });
+
+ // Case 7, 2 queues, one under utilized and one over utilized,
+ // different priority (1)
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+ new float[] { 0.1f, 1.2f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 8, 2 queues, one under-utilized and one over-utilized,
+ // different priority (2)
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
+ new float[] { 0.1f, 1.2f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 9, 2 queues, one under utilized and one meet, different priority (1)
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+ new float[] { 0.1f, 1.0f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 10, 2 queues, one under utilized and one meet, different priority (2)
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
+ new float[] { 0.1f, 1.0f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 11, 2 queues, one under utilized and one meet, same priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+ new float[] { 0.1f, 1.0f }, ""));
+ verifyOrder(policy, "", new String[] { "a", "b" });
+
+ // Case 12, 2 queues, both meet, different priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+ new float[] { 1.0f, 1.0f }, ""));
+ verifyOrder(policy, "", new String[] { "b", "a" });
+
+ // Case 13, 5 queues, different priority
+ policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+ new int[] { 1, 2, 0, 0, 3 },
+ new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, ""));
+ verifyOrder(policy, "", new String[] { "e", "c", "b", "a", "d" });
+
+ // Case 14, 5 queues, different priority, partition default;
+ policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+ new int[] { 1, 2, 0, 0, 3 },
+ new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
+ verifyOrder(policy, "", new String[] { "e", "b", "a", "c", "d" });
+
+ // Case 15, 5 queues, different priority, partition x;
+ policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+ new int[] { 1, 2, 0, 0, 3 },
+ new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
+ verifyOrder(policy, "x", new String[] { "e", "c", "b", "a", "d" });
+
+ // Case 16, 5 queues, different priority, partition x; and different
+ // accessibility
+ List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+ new int[] { 1, 2, 0, 0, 3 },
+ new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x");
+ // Only a/d has access to x
+ when(queues.get(0).getAccessibleNodeLabels()).thenReturn(
+ ImmutableSet.of("x"));
+ when(queues.get(3).getAccessibleNodeLabels()).thenReturn(
+ ImmutableSet.of("x"));
+ policy.setQueues(queues);
+ verifyOrder(policy, "x", new String[] { "a", "d", "e", "c", "b" });
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 98cfdab46db..37fc3b3474c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -20,23 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -157,7 +148,7 @@ public class TestFairOrderingPolicy {
// Define top-level queues
String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
- csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+ csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
csConf.setOrderingPolicyParameter(queuePath,
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);