[YARN-10613] Config to allow Intra- and Inter-queue preemption to enable/disable conservativeDRF. Contributed by Eric Payne

This commit is contained in:
Jim Brennan 2021-02-25 20:30:42 +00:00
parent 628ccf81ef
commit 4ed7b80b19
7 changed files with 217 additions and 7 deletions

View File

@ -68,4 +68,8 @@ TempQueuePerPartition getQueueByPartition(String queueName,
@Unstable
IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
/**
 * Returns the conservative-DRF setting that applies to cross-queue
 * (inter-queue) preemption. Cross-queue candidate selection passes this value
 * as the final argument of
 * {@code CapacitySchedulerPreemptionUtils
 * .tryPreemptContainerAndDeductResToObtain}.
 *
 * @return the configured cross-queue conservative-DRF flag
 */
boolean getCrossQueuePreemptionConservativeDRF();
/**
 * Returns the conservative-DRF setting that applies to in-queue
 * (intra-queue) preemption. Intra-queue candidate selection passes this value
 * as the final argument of
 * {@code CapacitySchedulerPreemptionUtils
 * .tryPreemptContainerAndDeductResToObtain}.
 *
 * @return the configured in-queue conservative-DRF flag
 */
boolean getInQueuePreemptionConservativeDRF();
}

View File

@ -111,7 +111,9 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
.tryPreemptContainerAndDeductResToObtain(rc,
preemptionContext, resToObtainByPartition, c,
clusterResource, selectedCandidates,
totalPreemptionAllowed, false);
totalPreemptionAllowed,
preemptionContext.getCrossQueuePreemptionConservativeDRF()
);
if (!preempted) {
continue;
}
@ -187,7 +189,8 @@ private void preemptAMContainers(Resource clusterResource,
boolean preempted = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, preemptMap,
totalPreemptionAllowed, false);
totalPreemptionAllowed,
preemptionContext.getCrossQueuePreemptionConservativeDRF());
if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
@ -222,7 +225,8 @@ private void preemptFrom(FiCaSchedulerApp app,
// Try to preempt this container
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
selectedContainers, totalPreemptionAllowed, false);
selectedContainers, totalPreemptionAllowed,
preemptionContext.getCrossQueuePreemptionConservativeDRF());
if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler()
@ -265,7 +269,8 @@ private void preemptFrom(FiCaSchedulerApp app,
// Try to preempt this container
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
selectedContainers, totalPreemptionAllowed, false);
selectedContainers, totalPreemptionAllowed,
preemptionContext.getCrossQueuePreemptionConservativeDRF());
}
}
}

View File

@ -270,7 +270,8 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
boolean ret = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, selectedCandidates,
totalPreemptedResourceAllowed, true);
totalPreemptedResourceAllowed,
preemptionContext.getInQueuePreemptionConservativeDRF());
// Subtract from respective user's resource usage once a container is
// selected for preemption.

View File

@ -109,6 +109,9 @@ public enum IntraQueuePreemptionOrderPolicy {
private float minimumThresholdForIntraQueuePreemption;
private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
// Conservative-DRF flag for cross-queue preemption; loaded in
// updateConfigIfNeeded() from
// CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF.
private boolean crossQueuePreemptionConservativeDRF;
// Conservative-DRF flag for in-queue preemption; loaded in
// updateConfigIfNeeded() from
// CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF.
private boolean inQueuePreemptionConservativeDRF;
// Current configuration
private CapacitySchedulerConfiguration csConfig;
@ -221,6 +224,18 @@ private void updateConfigIfNeeded() {
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
.toUpperCase());
crossQueuePreemptionConservativeDRF = config.getBoolean(
CapacitySchedulerConfiguration.
CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
CapacitySchedulerConfiguration.
DEFAULT_CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF);
inQueuePreemptionConservativeDRF = config.getBoolean(
CapacitySchedulerConfiguration.
IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
CapacitySchedulerConfiguration.
DEFAULT_IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF);
candidatesSelectionPolicies = new ArrayList<>();
// Do we need white queue-priority preemption policy?
@ -281,7 +296,11 @@ private void updateConfigIfNeeded() {
"select_based_on_reserved_containers = " +
selectCandidatesForResevedContainers + "\n" +
"additional_res_balance_based_on_reserved_containers = " +
additionalPreemptionBasedOnReservedResource);
additionalPreemptionBasedOnReservedResource + "\n" +
"cross-queue-preemption.conservative-drf = " +
crossQueuePreemptionConservativeDRF + "\n" +
"in-queue-preemption.conservative-drf = " +
inQueuePreemptionConservativeDRF);
csConfig = config;
}
@ -727,4 +746,14 @@ public void addPartitionToUnderServedQueues(String queueName,
public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
return intraQueuePreemptionOrderPolicy;
}
/**
 * Returns the cross-queue conservative-DRF flag most recently read from the
 * scheduler configuration by {@code updateConfigIfNeeded()}.
 *
 * @return current value of the cross-queue conservative-DRF flag
 */
@Override
public boolean getCrossQueuePreemptionConservativeDRF() {
return crossQueuePreemptionConservativeDRF;
}
/**
 * Returns the in-queue conservative-DRF flag most recently read from the
 * scheduler configuration by {@code updateConfigIfNeeded()}.
 *
 * @return current value of the in-queue conservative-DRF flag
 */
@Override
public boolean getInQueuePreemptionConservativeDRF() {
return inQueuePreemptionConservativeDRF;
}
}

View File

@ -1316,6 +1316,21 @@ public boolean getLazyPreemptionEnabled() {
+ INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
/**
 * Flag to determine whether or not to preempt containers from apps where some
 * used resources are less than the user's user limit.
 * This key controls cross-queue (inter-queue) preemption; the key is
 * {@code PREEMPTION_CONFIG_PREFIX} followed by {@code "conservative-drf"}.
 */
public static final String CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
PREEMPTION_CONFIG_PREFIX + "conservative-drf";
/**
 * Default for {@link #CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF}: conservative
 * DRF is disabled for cross-queue preemption unless configured otherwise.
 */
public static final Boolean DEFAULT_CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
false;
/**
 * Same flag as {@link #CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF}, but applied
 * to in-queue (intra-queue) preemption; the key is
 * {@code PREEMPTION_CONFIG_PREFIX}, then
 * {@code INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX}, then
 * {@code "conservative-drf"}.
 */
public static final String IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
PREEMPTION_CONFIG_PREFIX + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX +
"conservative-drf";
/**
 * Default for {@link #IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF}: conservative DRF
 * is enabled for in-queue preemption unless configured otherwise.
 */
public static final Boolean DEFAULT_IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
true;
/**
* Maximum application for a queue to be used when application per queue is
* not defined.To be consistent with previous version the default value is set

View File

@ -20,7 +20,10 @@
import java.io.IOException;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -28,8 +31,10 @@
import org.junit.Test;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -173,4 +178,74 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException {
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
/**
 * Runs the same inter-queue scenario twice, first with
 * CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF set to true and then to false, and
 * checks how many preemption requests are issued for app1 in each case.
 */
@SuppressWarnings("unchecked")
@Test
public void testInterQueuePreemptionWithStrictAndRelaxedDRF()
throws IOException {
/*
 *              root
 *           /  \  \
 *          a    b  c
 *
 * A / B / C have 33.3 / 33.3 / 33.4 resources
 * Total cluster resource have mem=61440, cpu=600
 *
 * +=================+========================+
 * | used in queue a | user limit for queue a |
 * +=================+========================+
 * | 61440:60        | 20480:200              |
 * +=================+========================+
 * In this case, the used memory is over the user limit but the used vCores
 * is not. If conservative DRF is true, preemptions will not occur.
 * If conservative DRF is false (default) preemptions will occur.
 *
 * NOTE(review): the table lists the user limit as 20480:200, but the value
 * stubbed below is Resource.newInstance(20480, 20) -- confirm which one is
 * intended.
 */
String labelsConfig = "=61440:600,true;";
String nodesConfig = "n1= res=61440:600"; // n1 is default partition
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[61440:600 61440:600 61440:600 20480:20 0]);" + // root
"-a(=[20480:200 61440:600 61440:60 0:0 0]);" + // a
"-b(=[20480:200 61440:600 0:0 20480:20 0]);" + // b
"-c(=[20480:200 61440:600 0:0 0:0 0])"; // c
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in a
"b\t" + "(1,0:0,n1,,0,false,20480:20,user2);"; // app2 in b
// Part 1: conservative DRF enabled for cross-queue preemption.
conf.setBoolean(
CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
true);
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
// Stub queue a so that it reports this user limit for any arguments.
Resource ul = Resource.newInstance(20480, 20);
when(((LeafQueue)(cs.getQueue("a")))
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
any(String.class), any(SchedulingMode.class))
).thenReturn(ul);
policy.editSchedule();
// With conservative DRF on, no preemption request is expected for app1.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
// Part 2: clear recorded interactions and repeat with the flag disabled.
reset(mDisp);
conf.setBoolean(
CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
false);
// The environment is rebuilt after the config change and the stub is applied
// again (presumably buildEnv replaces the queue objects -- TODO confirm).
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
ul = Resource.newInstance(20480, 20);
when(((LeafQueue)(cs.getQueue("a")))
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
any(String.class), any(SchedulingMode.class))
).thenReturn(ul);
policy.editSchedule();
// With conservative DRF off, exactly 20 preemption requests are expected for
// app1; this matches the 20480:20 pending in queue b at 1024:1 per container.
verify(mDisp, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
}

View File

@ -18,15 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -110,6 +115,82 @@ public void testSimpleIntraQueuePreemptionWithVCoreResource()
getAppAttemptId(3))));
}
/**
 * Runs the same intra-queue scenario twice, first with
 * IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF set to true and then to false, and
 * checks how many preemption requests are issued for app1 in each case.
 */
@SuppressWarnings("unchecked")
@Test
public void testIntraQueuePreemptionFairOrderingWithStrictAndRelaxedDRF()
throws IOException {
/*
 * Continue to allow intra-queue preemption when only one of the user's
 * resources is above the user limit.
 * Queue structure is:
 *
 * <pre>
 *       root
 *      /    \
 *     a      b
 * </pre>
 *
 * Guaranteed resource of a and b are 30720:300 and 30720:300 Total cluster
 * resource = 61440:600.
 * Scenario: Queue B has one running app using 61440:60 resources with no
 * pending resources, and one app with no used resources and 30720:30
 * pending resources.
 *
 * The first part of the test is to show what happens when the conservative
 * DRF property is set. Since the memory is above and the vcores is below
 * the user limit, only the minimum number of containers is allowed.
 * In the second part, since conservative DRF is relaxed, all containers
 * needed are allowed to be preempted (minus the AM size).
 */
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"userlimit_first");
conf.set(CapacitySchedulerConfiguration.PREFIX
+ "root.b." + CapacitySchedulerConfiguration.ORDERING_POLICY, "fair");
// Part 1: conservative DRF enabled for in-queue preemption.
conf.setBoolean(
CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
true);
String labelsConfig = "=61440:600,true;";
String nodesConfig = // n1 has no label
"n1= res=61440:600";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[61440:600 61440:600 61440:600 30720:30 0]);" + // root
"-a(=[30720:300 61440:600 0:0 0:0 0]);" + // a
"-b(=[30720:300 61440:600 61440:60 30720:30 0]);"; // b
String appsConfig =
"b\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in b
"b\t" + "(1,0:0,n1,,0,false,30720:30,user3);"; // app2 in b
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
// Stub queue b so that it reports this user limit for any arguments.
Resource ul = Resource.newInstance(30720, 300);
when(((LeafQueue)(cs.getQueue("b")))
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
any(String.class), any(SchedulingMode.class))
).thenReturn(ul);
policy.editSchedule();
// With conservative DRF on, exactly 6 preemption requests are expected for
// app1.
verify(mDisp, times(6)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
// Part 2: clear recorded interactions and repeat with the flag disabled.
reset(mDisp);
conf.setBoolean(
CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
false);
// The environment is rebuilt after the config change and the stub is applied
// again (presumably buildEnv replaces the queue objects -- TODO confirm).
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
when(((LeafQueue)(cs.getQueue("b")))
.getResourceLimitForAllUsers(any(String.class), any(Resource.class),
any(String.class), any(SchedulingMode.class))
).thenReturn(ul);
policy.editSchedule();
// With conservative DRF off, exactly 29 preemption requests are expected for
// app1 (30 pending containers minus the AM size, per the description above).
verify(mDisp, times(29)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testIntraQueuePreemptionWithDominantVCoreResource()
throws IOException {