YARN-10834. Intra-queue preemption: apps that don't use defined custom resource won't be preempted. Contributed by Eric Payne.
This commit is contained in:
parent
51be95e234
commit
f7bcc58e0f
|
@ -290,8 +290,9 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
|
|
||||||
// Once unallocated resource is 0, we can stop assigning ideal per app.
|
// Once unallocated resource is 0, we can stop assigning ideal per app.
|
||||||
if (Resources.lessThanOrEqual(rc, clusterResource,
|
if (Resources.lessThanOrEqual(rc, clusterResource,
|
||||||
queueReassignableResource, Resources.none()) || rc
|
queueReassignableResource, Resources.none()) ||
|
||||||
.isAnyMajorResourceZeroOrNegative(queueReassignableResource)) {
|
(rc.isAnyMajorResourceZeroOrNegative(queueReassignableResource)
|
||||||
|
&& context.getInQueuePreemptionConservativeDRF())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,11 +18,14 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -191,6 +194,88 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
|
||||||
getAppAttemptId(1))));
|
getAppAttemptId(1))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testIntraQueuePreemptionFairOrdering3ResourcesWithStrictAndRelaxedDRF()
|
||||||
|
throws IOException {
|
||||||
|
/**
|
||||||
|
* Continue to allow intra-queue preemption when only one of the user's
|
||||||
|
* resources is above the user limit.
|
||||||
|
* Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / |
|
||||||
|
* a b
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* Guaranteed resource of a and b are 30720:300 and 30720:300 Total cluster
|
||||||
|
* resource = 61440:600.
|
||||||
|
* Scenario: Queue B has one running app using 61720:60 resources with no
|
||||||
|
* pending resources, and one app with no used resources and 30720:30
|
||||||
|
* pending resources.
|
||||||
|
*
|
||||||
|
* The first part of the test is to show what happens when the conservative
|
||||||
|
* DRF property is set. Since the memory is above and the vcores is below
|
||||||
|
* the user limit, only the minimum number of containers is allowed.
|
||||||
|
* In the second part, since conservative DRF is relaxed, all containers
|
||||||
|
* needed are allowed to be preempted (minus the AM size).
|
||||||
|
*/
|
||||||
|
|
||||||
|
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
|
||||||
|
"userlimit_first");
|
||||||
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ "root.b." + CapacitySchedulerConfiguration.ORDERING_POLICY, "fair");
|
||||||
|
conf.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
||||||
|
true);
|
||||||
|
String RESOURCE_1 = "res1";
|
||||||
|
riMap.put(RESOURCE_1, ResourceInformation
|
||||||
|
.newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||||
|
Integer.MAX_VALUE));
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
String labelsConfig = "=61440:600,true;";
|
||||||
|
String nodesConfig = // n1 has no label
|
||||||
|
"n1= res=61440:600";
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending,reserved
|
||||||
|
"root(=[61440:600 61440:600 61440:600 30720:30 0]);" + // root
|
||||||
|
"-a(=[30720:300 61440:600 0:0 0:0 0]);" + // a
|
||||||
|
"-b(=[30720:300 61440:600 61440:60 30720:30 0]);"; // b
|
||||||
|
|
||||||
|
String appsConfig =
|
||||||
|
"b\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in b
|
||||||
|
"b\t" + "(1,0:0,n1,,0,false,30720:30,user3);"; // app2 in b
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
Resource ul = Resource.newInstance(30720, 300);
|
||||||
|
when(((LeafQueue)(cs.getQueue("b")))
|
||||||
|
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
|
||||||
|
any(String.class), any(SchedulingMode.class))
|
||||||
|
).thenReturn(ul);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(0)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
reset(mDisp);
|
||||||
|
|
||||||
|
conf.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
||||||
|
false);
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
when(((LeafQueue)(cs.getQueue("b")))
|
||||||
|
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
|
||||||
|
any(String.class), any(SchedulingMode.class))
|
||||||
|
).thenReturn(ul);
|
||||||
|
policy.editSchedule();
|
||||||
|
verify(mDisp, times(29)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIntraQueuePreemptionWithDominantVCoreResource()
|
public void testIntraQueuePreemptionWithDominantVCoreResource()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue