YARN-8005. Add unit tests for queue priority with dominant resource calculator. (Zian Chen via wangda)
Change-Id: I17a645f20869a1e5d86fa7a325c93fec908b91dc
(cherry picked from commit 92c5331423
)
This commit is contained in:
parent
f9add5fdd9
commit
090a8a274e
|
@ -18,10 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
@ -30,8 +36,28 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
public class TestPreemptionForQueueWithPriorities
|
||||
extends ProportionalCapacityPreemptionPolicyMockFramework {
|
||||
// Initialize resource map
|
||||
private Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
super.setup();
|
||||
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
||||
}
|
||||
|
@ -358,4 +384,125 @@ public class TestPreemptionForQueueWithPriorities
|
|||
getAppAttemptId(4))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPriorityPreemptionWithMandatoryResourceForHierarchicalOfQueues()
|
||||
throws Exception {
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* / \ / \
|
||||
* a1 a2 b1 b2
|
||||
* </pre>
|
||||
*
|
||||
* a2 is underserved and need more resource. b2 will be preemptable.
|
||||
*/
|
||||
|
||||
String labelsConfig = "=100:200,true"; // default partition
|
||||
String nodesConfig = "n1="; // only one node
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending
|
||||
"root(=[100:200 100:200 100:200 100:200]);" + //root
|
||||
"-a(=[50:100 100:200 20:40 60:100]){priority=1};" + // a
|
||||
"--a1(=[10:20 100:200 10:30 30:20]){priority=1};" + // a1
|
||||
"--a2(=[40:80 100:200 10:10 30:80]){priority=1};" + // a2
|
||||
"-b(=[50:100 100:200 80:160 40:100]){priority=1};" + // b
|
||||
"--b1(=[20:40 100:200 20:40 20:70]){priority=2};" + // b1
|
||||
"--b2(=[30:60 100:200 60:120 20:30]){priority=1}";// b2
|
||||
|
||||
String appsConfig =
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a1\t(1,1:4,n1,,10,false);" + // app1 in a1
|
||||
"a2\t(1,1:1,n1,,10,false);" + // app2 in a2
|
||||
"b1\t(1,3:4,n1,,10,false);" + // app3 in b1
|
||||
"b2\t(1,20:40,n1,,3,false)"; // app4 in b2
|
||||
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
|
||||
policy.editSchedule();
|
||||
|
||||
// Preemption should first divide capacities between a / b, and b1 should
|
||||
// get less preemption than b2 (because b1 has higher priority)
|
||||
verify(mDisp, never()).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
verify(mDisp, never()).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
verify(mDisp, never()).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(3))));
|
||||
verify(mDisp, times(2)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(4))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPriorityPreemptionWithMultipleResource()
|
||||
throws Exception {
|
||||
String RESOURCE_1 = "res1";
|
||||
|
||||
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* / \
|
||||
* a1 a2
|
||||
* </pre>
|
||||
*
|
||||
* a1 and a2 are using most of resources.
|
||||
* b needs more resources which is under served.
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100:100:10,true;";
|
||||
String nodesConfig =
|
||||
"n1=;"; // n1 is default partition
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending
|
||||
"root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
|
||||
"-a(=[50:60:3 100:100:10 80:90:10 30:20:4]){priority=1};" + // a
|
||||
"--a1(=[20:15:3 100:50:10 60:50:10 0]){priority=1};" + // a1
|
||||
"--a2(=[30:45 100:50:10 20:40 30:20:4]){priority=2};" + // a2
|
||||
"-b(=[50:40:7 100:100:10 20:10 30:10:2]){priority=1}"; // b
|
||||
|
||||
String appsConfig =
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a1\t" // app1 in a1
|
||||
+ "(1,6:5:1,n1,,10,false);" +
|
||||
"a2\t" // app2 in a2
|
||||
+ "(1,2:4,n1,,10,false);" +
|
||||
"b\t" // app3 in b
|
||||
+ "(1,2:1,n1,,10,false)";
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
|
||||
policy.editSchedule();
|
||||
|
||||
// Preemption should first divide capacities between a / b, and a2 should
|
||||
// get less preemption than a1 (because a2 has higher priority). More
|
||||
// specifically, a2 will not get preempted since the resource preempted
|
||||
// from a1 can satisfy b already.
|
||||
verify(mDisp, times(7)).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(1))));
|
||||
|
||||
verify(mDisp, never()).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(2))));
|
||||
|
||||
verify(mDisp, never()).handle(argThat(
|
||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||
getAppAttemptId(3))));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue