YARN-8379. Improve balancing resources in already satisfied queues by using Capacity Scheduler preemption. Contributed by Zian Chen.

Sunil G 2018-06-28 10:23:31 -07:00
parent 384764cdea
commit 291194302c
15 changed files with 636 additions and 89 deletions

AbstractPreemptableResourceCalculator.java

@@ -42,6 +42,7 @@ public class AbstractPreemptableResourceCalculator {
   protected final ResourceCalculator rc;
   protected boolean isReservedPreemptionCandidatesSelector;
   private Resource stepFactor;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;

   static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
@@ -83,15 +84,28 @@ public class AbstractPreemptableResourceCalculator {
    *          this will be set by different implementation of candidate
    *          selectors, please refer to TempQueuePerPartition#offer for
    *          details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied
+   *          Should resources be preempted from an over-served queue when
+   *          the requesting queues are all at or over their guarantees?
+   *          For example, consider 10 queues under root, each guaranteed
+   *          10% of the cluster, with only two of them using resources:
+   *          queueA uses 10% and queueB uses 90%. Every queue is at its
+   *          guarantee, yet the outcome is unfair to queueA. This flag
+   *          makes such balancing configurable; it is disabled by default.
    */
   public AbstractPreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     context = preemptionContext;
     rc = preemptionContext.getResourceCalculator();
     this.isReservedPreemptionCandidatesSelector =
         isReservedPreemptionCandidatesSelector;
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     stepFactor = Resource.newInstance(0, 0);
     for (ResourceInformation ri : stepFactor.getResources()) {
       ri.setValue(1);
@@ -193,7 +207,8 @@ public class AbstractPreemptableResourceCalculator {
       wQavail = Resources.componentwiseMin(wQavail, unassigned);
       Resource wQidle = sub.offer(wQavail, rc, totGuarant,
-          isReservedPreemptionCandidatesSelector);
+          isReservedPreemptionCandidatesSelector,
+          allowQueuesBalanceAfterAllQueuesSatisfied);
       Resource wQdone = Resources.subtract(wQavail, wQidle);
       if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
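
To make the new wiring concrete, here is a minimal sketch (not part of the patch; "context" stands for whatever CapacitySchedulerPreemptionContext the caller already holds) of constructing the calculator in its legacy and balancing modes via the three-argument constructor introduced above:

// Sketch only. Argument order follows the constructor shown above:
// (preemptionContext, isReservedPreemptionCandidatesSelector,
//  allowQueuesBalanceAfterAllQueuesSatisfied).
PreemptableResourceCalculator strict =
    new PreemptableResourceCalculator(context, false, false); // stop at guarantees
PreemptableResourceCalculator balancing =
    new PreemptableResourceCalculator(context, false, true);  // keep balancing beyond guarantees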

CapacitySchedulerPreemptionContext.java

@@ -70,6 +70,8 @@ public interface CapacitySchedulerPreemptionContext {
   float getMaxAllowableLimitForIntraQueuePreemption();

+  long getDefaultMaximumKillWaitTimeout();
+
   @Unstable
   IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
 }

CapacitySchedulerPreemptionUtils.java

@@ -151,6 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
       Map<String, Resource> resourceToObtainByPartitions,
       RMContainer rmContainer, Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource totalPreemptionAllowed, boolean conservativeDRF) {
     ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
@@ -218,7 +219,7 @@
     }

     // Add to preemptMap
-    addToPreemptMap(preemptMap, attemptId, rmContainer);
+    addToPreemptMap(preemptMap, curCandidates, attemptId, rmContainer);
     return true;
   }
@@ -230,15 +231,23 @@
     return context.getScheduler().getSchedulerNode(nodeId).getPartition();
   }

-  private static void addToPreemptMap(
+  protected static void addToPreemptMap(
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
-    Set<RMContainer> set = preemptMap.get(appAttemptId);
-    if (null == set) {
-      set = new HashSet<>();
-      preemptMap.put(appAttemptId, set);
-    }
-    set.add(containerToPreempt);
+    Set<RMContainer> setForToPreempt = preemptMap.get(appAttemptId);
+    Set<RMContainer> setForCurCandidates = curCandidates.get(appAttemptId);
+    if (null == setForToPreempt) {
+      setForToPreempt = new HashSet<>();
+      preemptMap.put(appAttemptId, setForToPreempt);
+    }
+    setForToPreempt.add(containerToPreempt);
+    if (null == setForCurCandidates) {
+      setForCurCandidates = new HashSet<>();
+      curCandidates.put(appAttemptId, setForCurCandidates);
+    }
+    setForCurCandidates.add(containerToPreempt);
   }

   private static boolean preemptMapContains(

FifoCandidatesSelector.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.util.resource.Resources;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,19 +43,25 @@ public class FifoCandidatesSelector
   private static final Log LOG =
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;

   FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
-      boolean includeReservedResource) {
+      boolean includeReservedResource,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     super(preemptionContext);
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, includeReservedResource);
+        preemptionContext, includeReservedResource,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }

   @Override
   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptionAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Calculate how much resources we need to preempt
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptionAllowed);
@@ -110,7 +117,7 @@ public class FifoCandidatesSelector
             boolean preempted = CapacitySchedulerPreemptionUtils
                 .tryPreemptContainerAndDeductResToObtain(rc,
                     preemptionContext, resToObtainByPartition, c,
-                    clusterResource, selectedCandidates,
+                    clusterResource, selectedCandidates, curCandidates,
                     totalPreemptionAllowed, false);
             if (!preempted) {
               continue;
@@ -134,7 +141,7 @@ public class FifoCandidatesSelector
           preemptFrom(fc, clusterResource, resToObtainByPartition,
               skippedAMContainerlist, skippedAMSize, selectedCandidates,
-              totalPreemptionAllowed);
+              curCandidates, totalPreemptionAllowed);
         }

         // Can try preempting AMContainers (still saving atmost
@@ -145,15 +152,15 @@
             leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
             leafQueue.getMaxAMResourcePerQueuePercent());

-        preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
-            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
-            totalPreemptionAllowed);
+        preemptAMContainers(clusterResource, selectedCandidates, curCandidates,
+            skippedAMContainerlist, resToObtainByPartition, skippedAMSize,
+            maxAMCapacityForThisQueue, totalPreemptionAllowed);
       } finally {
         leafQueue.getReadLock().unlock();
       }
     }

-    return selectedCandidates;
+    return curCandidates;
   }

   /**
@@ -169,6 +176,7 @@
    */
   private void preemptAMContainers(Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       List<RMContainer> skippedAMContainerlist,
       Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
       Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
@@ -187,7 +195,7 @@
       boolean preempted = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, preemptMap,
-              totalPreemptionAllowed, false);
+              curCandidates, totalPreemptionAllowed, false);
       if (preempted) {
         Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
@@ -203,6 +211,7 @@
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource totalPreemptionAllowed) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -219,9 +228,10 @@
       }

       // Try to preempt this container
-      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
-          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed, false);
+      CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, selectedContainers,
+              curCandidates, totalPreemptionAllowed, false);

       if (!preemptionContext.isObserveOnly()) {
         preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -262,9 +272,14 @@
       }

       // Try to preempt this container
-      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
-          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed, false);
+      CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, selectedContainers,
+              curCandidates, totalPreemptionAllowed, false);
     }
   }
+
+  public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() {
+    return allowQueuesBalanceAfterAllQueuesSatisfied;
+  }
 }

IntraQueueCandidatesSelector.java

@@ -122,7 +122,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // 1. Calculate the abnormality within each queue one by one.
     computeIntraQueuePreemptionDemand(
         clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
@@ -182,7 +182,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         leafQueue.getReadLock().lock();
         for (FiCaSchedulerApp app : apps) {
           preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
-              clusterResource, totalPreemptedResourceAllowed,
+              curCandidates, clusterResource, totalPreemptedResourceAllowed,
               resToObtainByPartition, rollingResourceUsagePerUser);
         }
       } finally {
@@ -191,7 +191,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       }
     }

-    return selectedCandidates;
+    return curCandidates;
   }

   private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
@@ -211,6 +211,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
   private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
       FiCaSchedulerApp app,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
       Map<String, Resource> resToObtainByPartition,
       Map<String, Resource> rollingResourceUsagePerUser) {
@@ -270,7 +271,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       boolean ret = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, selectedCandidates,
-              totalPreemptedResourceAllowed, true);
+              curCandidates, totalPreemptedResourceAllowed, true);

       // Subtract from respective user's resource usage once a container is
       // selected for preemption.

PreemptableResourceCalculator.java

@@ -48,11 +48,14 @@ public class PreemptableResourceCalculator
    * @param isReservedPreemptionCandidatesSelector this will be set by
    *          different implementation of candidate selectors, please refer to
    *          TempQueuePerPartition#offer for details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied whether queues may keep
+   *          balancing (and hence preempting) after all of them have reached
+   *          their guaranteed capacity
    */
   public PreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
-    super(preemptionContext, isReservedPreemptionCandidatesSelector);
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
+    super(preemptionContext, isReservedPreemptionCandidatesSelector,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }

   /**

PreemptionCandidatesSelector.java

@@ -34,6 +34,7 @@ import java.util.Set;
 public abstract class PreemptionCandidatesSelector {
   protected CapacitySchedulerPreemptionContext preemptionContext;
   protected ResourceCalculator rc;
+  private long maximumKillWaitTime = -1;

   PreemptionCandidatesSelector(
       CapacitySchedulerPreemptionContext preemptionContext) {
@@ -77,4 +78,14 @@ public abstract class PreemptionCandidatesSelector {
     });
   }
+
+  public long getMaximumKillWaitTimeMs() {
+    if (maximumKillWaitTime > 0) {
+      return maximumKillWaitTime;
+    }
+    return preemptionContext.getDefaultMaximumKillWaitTimeout();
+  }
+
+  public void setMaximumKillWaitTime(long maximumKillWaitTime) {
+    this.maximumKillWaitTime = maximumKillWaitTime;
+  }
 }

ProportionalCapacityPreemptionPolicy.java

@@ -131,6 +131,8 @@ public class ProportionalCapacityPreemptionPolicy
   private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
   private Set<String> allPartitions;
   private Set<String> leafQueueNames;
+  Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+      Set<RMContainer>>> pcsMap;

   // Preemptable Entities, synced from scheduler at every run
   private Map<String, PreemptableQueue> preemptableQueues;
@@ -249,7 +251,21 @@
     // initialize candidates preemption selection policies
     candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
-        additionalPreemptionBasedOnReservedResource));
+        additionalPreemptionBasedOnReservedResource, false));
+
+    // Do we need preemption to balance queues even after all queues are
+    // satisfied?
+    boolean isPreemptionToBalanceRequired = config.getBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED);
+    long maximumKillWaitTimeForPreemptionToQueueBalance = config.getLong(
+        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+        CapacitySchedulerConfiguration.DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION);
+    if (isPreemptionToBalanceRequired) {
+      PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this,
+          false, true);
+      selector.setMaximumKillWaitTime(
+          maximumKillWaitTimeForPreemptionToQueueBalance);
+      candidatesSelectionPolicies.add(selector);
+    }

     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = config.getBoolean(
@@ -282,7 +298,8 @@
         "select_based_on_reserved_containers = " +
             selectCandidatesForResevedContainers + "\n" +
         "additional_res_balance_based_on_reserved_containers = " +
-            additionalPreemptionBasedOnReservedResource);
+            additionalPreemptionBasedOnReservedResource + "\n" +
+        "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired);

     csConfig = config;
   }
@@ -308,27 +325,41 @@
   }

   private void preemptOrkillSelectedContainerAfterWait(
-      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
-      long currentTime) {
+      Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+          Set<RMContainer>>> toPreemptPerSelector, long currentTime) {
+    int toPreemptCount = 0;
+    for (Map<ApplicationAttemptId, Set<RMContainer>> containers :
+        toPreemptPerSelector.values()) {
+      toPreemptCount += containers.size();
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug(
           "Starting to preempt containers for selectedCandidates and size:"
-              + selectedCandidates.size());
+              + toPreemptCount);
     }

     // preempt (or kill) the selected containers
-    for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
-        .entrySet()) {
+    // We need toPreemptPerSelector here to match each list of containers to
+    // its selector, so that the selector's custom timeout can be applied
+    // when checking whether the current container should be killed.
+    for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> pc : toPreemptPerSelector.entrySet()) {
+      Map<ApplicationAttemptId, Set<RMContainer>> cMap = pc.getValue();
+      if (cMap.size() > 0) {
+        for (Map.Entry<ApplicationAttemptId,
+            Set<RMContainer>> e : cMap.entrySet()) {
           ApplicationAttemptId appAttemptId = e.getKey();
           if (LOG.isDebugEnabled()) {
             LOG.debug("Send to scheduler: in app=" + appAttemptId
                 + " #containers-to-be-preemptionCandidates=" + e.getValue().size());
           }
           for (RMContainer container : e.getValue()) {
-            // if we tried to preempt this for more than maxWaitTime
+            // if we tried to preempt this for more than maxWaitTime; the
+            // timeout is resolved per container per selector
             if (preemptionCandidates.get(container) != null
                 && preemptionCandidates.get(container)
-                    + maxWaitTime <= currentTime) {
+                    + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) {
               // kill it
               rmContext.getDispatcher().getEventHandler().handle(
                   new ContainerPreemptEvent(appAttemptId, container,
@@ -350,6 +381,8 @@
           }
         }
       }
+      }
+    }
   }

   private void syncKillableContainersFromScheduler() {
     // sync preemptable entities from scheduler
@@ -438,6 +471,8 @@
     // queue and each application
     Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
         new HashMap<>();
+    Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> toPreemptPerSelector = new HashMap<>();
     for (PreemptionCandidatesSelector selector :
         candidatesSelectionPolicies) {
       long startTime = 0;
@@ -447,20 +482,27 @@
             selector.getClass().getName()));
         startTime = clock.getTime();
       }
-      toPreempt = selector.selectCandidates(toPreempt,
-          clusterResources, totalPreemptionAllowed);
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates =
+          selector.selectCandidates(toPreempt, clusterResources,
+              totalPreemptionAllowed);
+      toPreemptPerSelector.putIfAbsent(selector, curCandidates);

       if (LOG.isDebugEnabled()) {
         LOG.debug(MessageFormat
             .format("{0} uses {1} millisecond to run",
                 selector.getClass().getName(), clock.getTime() - startTime));
         int totalSelected = 0;
+        int curSelected = 0;
         for (Set<RMContainer> set : toPreempt.values()) {
           totalSelected += set.size();
         }
+        for (Set<RMContainer> set : curCandidates.values()) {
+          curSelected += set.size();
+        }
         LOG.debug(MessageFormat
-            .format("So far, total {0} containers selected to be preempted",
-                totalSelected));
+            .format("So far, total {0} containers selected to be preempted, {1}"
+                + " containers selected this round\n",
+                totalSelected, curSelected));
       }
     }
@@ -483,8 +525,10 @@
     long currentTime = clock.getTime();

+    pcsMap = toPreemptPerSelector;
+
     // preempt (or kill) the selected containers
-    preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime);
+    preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime);

     // cleanup staled preemption candidates
     cleanupStaledPreemptionCandidates(currentTime);
@@ -689,6 +733,12 @@
     return queueToPartitions;
   }

+  @VisibleForTesting
+  Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+      Set<RMContainer>>> getToPreemptCandidatesPerSelector() {
+    return pcsMap;
+  }
+
   @Override
   public int getClusterMaxApplicationPriority() {
     return scheduler.getMaxClusterLevelAppPriority().getPriority();
@@ -730,4 +780,9 @@
   public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
     return intraQueuePreemptionOrderPolicy;
   }
+
+  @Override
+  public long getDefaultMaximumKillWaitTimeout() {
+    return maxWaitTime;
+  }
 }

QueuePriorityContainerCandidateSelector.java

@@ -380,6 +380,7 @@ public class QueuePriorityContainerCandidateSelector
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource,
       Resource totalPreemptedResourceAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Initialize digraph from queues
     // TODO (wangda): only do this when queue refreshed.
     priorityDigraph.clear();
@@ -388,7 +389,7 @@ public class QueuePriorityContainerCandidateSelector
     // When all queues are set to same priority, or priority is not respected,
     // direct return.
     if (priorityDigraph.isEmpty()) {
-      return selectedCandidates;
+      return curCandidates;
     }

     // Save parameters to be shared by other methods
@@ -478,13 +479,9 @@
               .getReservedResource());
         }

-        Set<RMContainer> containers = selectedCandidates.get(
-            c.getApplicationAttemptId());
-        if (null == containers) {
-          containers = new HashSet<>();
-          selectedCandidates.put(c.getApplicationAttemptId(), containers);
-        }
-        containers.add(c);
+        // Add to preemptMap
+        CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates,
+            curCandidates, c.getApplicationAttemptId(), c);

         // Update totalPreemptionResourceAllowed
         Resources.subtractFrom(totalPreemptedResourceAllowed,
@@ -504,7 +501,6 @@
         }
       }
     }
-
-    return selectedCandidates;
+    return curCandidates;
   }
 }

ReservedContainerCandidatesSelector.java

@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,7 +62,7 @@ public class ReservedContainerCandidatesSelector
       CapacitySchedulerPreemptionContext preemptionContext) {
     super(preemptionContext);
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, true);
+        preemptionContext, true, false);
   }

   @Override
@@ -71,6 +70,7 @@ public class ReservedContainerCandidatesSelector
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource,
       Resource totalPreemptedResourceAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Calculate how much resources we need to preempt
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptedResourceAllowed);
@@ -101,14 +101,10 @@ public class ReservedContainerCandidatesSelector
             selectedCandidates, totalPreemptedResourceAllowed, false);
         if (null != preemptionResult) {
           for (RMContainer c : preemptionResult.selectedContainers) {
-            ApplicationAttemptId appId = c.getApplicationAttemptId();
-            Set<RMContainer> containers = selectedCandidates.get(appId);
-            if (null == containers) {
-              containers = new HashSet<>();
-              selectedCandidates.put(appId, containers);
-            }
-            containers.add(c);
+            // Add to preemptMap
+            CapacitySchedulerPreemptionUtils.addToPreemptMap(
+                selectedCandidates, curCandidates,
+                c.getApplicationAttemptId(), c);

             if (LOG.isDebugEnabled()) {
               LOG.debug(this.getClass().getName() + " Marked container=" + c
                   .getContainerId() + " from queue=" + c.getQueueName()
@@ -118,7 +114,7 @@
       }
     }

-    return selectedCandidates;
+    return curCandidates;
   }

   private Resource getPreemptableResource(String queueName,

TempQueuePerPartition.java

@@ -138,7 +138,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   // This function "accepts" all the resources it can (pending) and return
   // the unused ones
   Resource offer(Resource avail, ResourceCalculator rc,
-      Resource clusterResource, boolean considersReservedResource) {
+      Resource clusterResource, boolean considersReservedResource,
+      boolean allowQueueBalanceAfterAllSafisfied) {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
         Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
@@ -179,7 +180,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     // leaf queues. Such an under-utilized leaf queue could preempt resources
     // from an over-utilized leaf queue located in another hierarchy.

+    // Allow queues to continue to grow and balance even if all queues are satisfied.
+    if (!allowQueueBalanceAfterAllSafisfied) {
       accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+    }

     // accepted so far contains the "quota acceptable" amount, we now filter by
     // locality acceptable
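
The practical effect of the skipped filterByMaxDeductAssigned call shows up in the tests further down: with queues a/b/c/d guaranteed 30/30/30/10, using 10/40/50/0, and 30 pending each, the default computation settles on an ideal assignment of a:30, b:35, c:35, while the balancing variant reaches a:33, b:33, c:33, letting additional containers be preempted from b and c.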

CapacitySchedulerConfiguration.java

@@ -1459,6 +1459,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";

   public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";

+  /**
+   * Whether queues may continue to grow, via preemption, after every queue
+   * has reached its guaranteed capacity.
+   */
+  public static final String PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED =
+      PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled";
+
+  public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = false;
+
+  /**
+   * How long to wait before killing containers selected by queue-balance
+   * preemption; the default is 5 minutes.
+   */
+  public static final String MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+      PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill";
+
+  public static final long
+      DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+      300 * 1000;
+
   /**
    * Maximum application for a queue to be used when application per queue is
    * not defined. To be consistent with previous version the default value is set
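
As a usage sketch, mirroring how the tests below enable the feature (on a real cluster the same keys would normally be set in capacity-scheduler.xml rather than programmatically):

// Sketch only: enable balancing preemption between already-satisfied queues,
// and shorten the kill wait from the 5-minute default to 60 seconds.
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setBoolean(
    CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
    true);
conf.setLong(
    CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
    60 * 1000);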

TestPreemptionForQueueWithPriorities.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -538,4 +539,61 @@ public class TestPreemptionForQueueWithPriorities
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues()
+      throws IOException {
+    /**
+     * All queues are beyond their guarantees, and c has higher priority than
+     * b. c asks for more resources while no idle capacity is left, so c
+     * should preempt some resources from b, but without pushing b below its
+     * guarantee.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /  |  \
+     *     a   b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=1
+     * - c=2
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[30 100 0 0]){priority=1};" + // a
+        "-b(=[30 100 40 50]){priority=1};" + // b
+        "-c(=[40 100 60 25]){priority=2}"; // c
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t(1,1,n1,,40,false);" + // app1 in b
+        "c\t(1,1,n1,,60,false)"; // app2 in c
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    boolean isPreemptionToBalanceRequired = true;
+    newConf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceRequired);
+    when(cs.getConfiguration()).thenReturn(newConf);
+
+    policy.editSchedule();
+
+    // IdealAssigned b: 30, c: 70 (initIdealAssigned b: 30, c: 40). Even
+    // though b and c have the same relativeAssigned = 1.0f
+    // (idealAssigned / guaranteed), c has higher priority, so c is put in
+    // mostUnderServedQueue and receives all of the remaining 30 capacity.
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
 }

TestProportionalCapacityPreemptionPolicyPreemptToBalance.java (new file)

@@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
extends ProportionalCapacityPreemptionPolicyMockFramework {
@Test
public void testPreemptionToBalanceDisabled() throws IOException {
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 10 30]);" + // a
"-b(=[30 100 40 30]);" + // b
"-c(=[30 100 50 30]);" + // c
"-d(=[10 100 0 0])"; // d
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,10,false);" + // app1 in a
"b\t(1,1,n1,,40,false);" + // app2 in b
"c\t(1,1,n1,,50,false)"; // app3 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A
verify(mDisp, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
assertEquals(30, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getMemorySize());
assertEquals(35, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getMemorySize());
assertEquals(35, policy.getQueuePartitions().get("c")
.get("").getIdealAssigned().getMemorySize());
}
@Test
public void testPreemptionToBalanceEnabled() throws IOException {
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 10 30]);" + // a
"-b(=[30 100 40 30]);" + // b
"-c(=[30 100 50 30]);" + // c
"-d(=[10 100 0 0])"; // d
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,10,false);" + // app1 in a
"b\t(1,1,n1,,40,false);" + // app2 in b
"c\t(1,1,n1,,50,false)"; // app3 in c
// enable preempt to balance and ideal assignment will change.
boolean isPreemptionToBalanceEnabled = true;
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
isPreemptionToBalanceEnabled);
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A
verify(mDisp, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(17)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
assertEquals(33, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getMemorySize());
assertEquals(33, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getMemorySize());
assertEquals(33, policy.getQueuePartitions().get("c")
.get("").getIdealAssigned().getMemorySize());
}
@Test
public void testPreemptionToBalanceUsedPlusPendingLessThanGuaranteed()
      throws IOException {
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 10 6]);" + // a
"-b(=[30 100 40 30]);" + // b
"-c(=[30 100 50 30]);" + // c
"-d(=[10 100 0 0])"; // d
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,10,false);" + // app1 in a
"b\t(1,1,n1,,40,false);" + // app2 in b
"c\t(1,1,n1,,50,false)"; // app3 in c
boolean isPreemptionToBalanceEnabled = true;
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
isPreemptionToBalanceEnabled);
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
    // I_A: A:16 B:42 C:42; preempt 8 containers from app3 in C to A
    verify(mDisp, times(8)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
assertEquals(16, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getMemorySize());
assertEquals(42, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getMemorySize());
assertEquals(42, policy.getQueuePartitions().get("c")
.get("").getIdealAssigned().getMemorySize());
}
@Test
public void testPreemptionToBalanceWithVcoreResource() throws IOException {
Logger.getRootLogger().setLevel(Level.DEBUG);
String labelsConfig = "=100:100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100:100 100:100 100:100 120:140]);" + //root
"-a(=[60:60 100:100 40:40 70:40]);" + // a
"-b(=[40:40 100:100 60:60 50:100])"; // b
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1:1,n1,,40,false);" + // app1 in a
"b\t(1,1:1,n1,,60,false)"; // app2 in b
boolean isPreemptionToBalanceEnabled = true;
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
isPreemptionToBalanceEnabled);
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
policy.editSchedule();
// 21 containers will be preempted here
verify(mDisp, times(21)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.
IsPreemptionRequestFor(getAppAttemptId(2))));
assertEquals(60, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getMemorySize());
assertEquals(60, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getVirtualCores());
assertEquals(40, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getMemorySize());
assertEquals(40, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getVirtualCores());
}
@Test
public void testPreemptionToBalanceWithConfiguredTimeout() throws IOException {
Logger.getRootLogger().setLevel(Level.DEBUG);
String labelsConfig = "=100:100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100:100 100:100 100:100 120:140]);" + //root
"-a(=[60:60 100:100 40:40 70:40]);" + // a
"-b(=[40:40 100:100 60:60 50:100])"; // b
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1:1,n1,,40,false);" + // app1 in a
"b\t(1,1:1,n1,,60,false)"; // app2 in b
boolean isPreemptionToBalanceEnabled = true;
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
isPreemptionToBalanceEnabled);
    final long FB_MAX_BEFORE_KILL = 60 * 1000;
conf.setLong(
CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
FB_MAX_BEFORE_KILL);
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
policy.editSchedule();
    Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
        Set<RMContainer>>> pcps = policy.getToPreemptCandidatesPerSelector();

    String FIFO_CANDIDATE_SELECTOR = "FifoCandidatesSelector";
    boolean hasFifoSelector = false;
    for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
        Set<RMContainer>>> pc : pcps.entrySet()) {
      if (pc.getKey().getClass().getSimpleName()
          .equals(FIFO_CANDIDATE_SELECTOR)) {
        FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey();
        if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied()) {
          hasFifoSelector = true;
          assertEquals(FB_MAX_BEFORE_KILL, pcs.getMaximumKillWaitTimeMs());
        }
      }
    }
    assertEquals(true, hasFifoSelector);
// 21 containers will be preempted here
verify(mDisp, times(21)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.
IsPreemptionRequestFor(getAppAttemptId(2))));
assertEquals(60, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getMemorySize());
assertEquals(60, policy.getQueuePartitions().get("a")
.get("").getIdealAssigned().getVirtualCores());
assertEquals(40, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getMemorySize());
assertEquals(40, policy.getQueuePartitions().get("b")
.get("").getIdealAssigned().getVirtualCores());
}
}

TestCapacitySchedulerSurgicalPreemption.java

@@ -1111,5 +1111,116 @@ public class TestCapacitySchedulerSurgicalPreemption
     rm1.close();
   }

+  @Test(timeout = 600000)
+  public void testPreemptionToBalanceWithCustomTimeout() throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues.
+     * Queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10  20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 is submitted to queue-a; besides its 1G AM it asks for 1G * 38.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 4G container (for its
+     * AM).
+     *
+     * After preemption, we expect:
+     * 1. 4 containers are preempted from app1;
+     * 2. the selected containers are killed after the configured timeout;
+     * 3. the AM of app2 is successfully allocated.
+     */
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        true);
+    conf.setLong(
+        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+        20 * 1000);
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app in queue-a; the AM container will be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 38; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 39 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+    // 20 from n1 and 19 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 20);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 19);
+
+    // Submit app2 to queue-c and ask for a 4G container for its AM
+    RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Call editSchedule: containers are selected as preemption candidates
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
+    editPolicy.editSchedule();
+    Assert.assertEquals(4, editPolicy.getToPreemptContainers().size());
+
+    // Check live containers immediately; nothing should happen yet
+    Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+
+    Thread.sleep(20 * 1000);
+    // Call editSchedule again: the selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 35);
+
+    // Call allocation: containers are reserved
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice and allocation once; the container should get
+    // allocated
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      tick++;
+      Thread.sleep(100);
+    }
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
+
+    rm1.close();
+  }
 }