YARN-4822. Refactor existing Preemption Policy of CS for easier adding new approach to select preemption candidates. Contributed by Wangda Tan
This commit is contained in:
parent
0852e44110
commit
f1f441b80f
|
@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
|
||||||
|
|
||||||
public interface SchedulingEditPolicy {
|
public interface SchedulingEditPolicy {
|
||||||
|
|
||||||
public void init(Configuration config, RMContext context,
|
void init(Configuration config, RMContext context,
|
||||||
PreemptableResourceScheduler scheduler);
|
PreemptableResourceScheduler scheduler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,10 +31,10 @@ public interface SchedulingEditPolicy {
|
||||||
* allowed to track containers and affect the scheduler. The "actions"
|
* allowed to track containers and affect the scheduler. The "actions"
|
||||||
* performed are passed back through an EventHandler.
|
* performed are passed back through an EventHandler.
|
||||||
*/
|
*/
|
||||||
public void editSchedule();
|
void editSchedule();
|
||||||
|
|
||||||
public long getMonitoringInterval();
|
long getMonitoringInterval();
|
||||||
|
|
||||||
public String getPolicyName();
|
String getPolicyName();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,10 +45,6 @@ public class SchedulingMonitor extends AbstractService {
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMonitorInterval() {
|
|
||||||
return monitorInterval;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
|
public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
|
||||||
return scheduleEditPolicy;
|
return scheduleEditPolicy;
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
interface CapacitySchedulerPreemptionContext {
|
||||||
|
CapacityScheduler getScheduler();
|
||||||
|
|
||||||
|
TempQueuePerPartition getQueueByPartition(String queueName,
|
||||||
|
String partition);
|
||||||
|
|
||||||
|
Collection<TempQueuePerPartition> getQueuePartitions(String queueName);
|
||||||
|
|
||||||
|
ResourceCalculator getResourceCalculator();
|
||||||
|
|
||||||
|
RMContext getRMContext();
|
||||||
|
|
||||||
|
boolean isObserveOnly();
|
||||||
|
|
||||||
|
Set<ContainerId> getKillableContainers();
|
||||||
|
|
||||||
|
double getMaxIgnoreOverCapacity();
|
||||||
|
|
||||||
|
double getNaturalTerminationFactor();
|
||||||
|
|
||||||
|
Set<String> getLeafQueueNames();
|
||||||
|
|
||||||
|
Set<String> getAllPartitions();
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class CapacitySchedulerPreemptionUtils {
|
||||||
|
public static Map<String, Resource> getResToObtainByPartitionForLeafQueue(
|
||||||
|
CapacitySchedulerPreemptionContext context, String queueName,
|
||||||
|
Resource clusterResource) {
|
||||||
|
Map<String, Resource> resToObtainByPartition = new HashMap<>();
|
||||||
|
// compute resToObtainByPartition considered inter-queue preemption
|
||||||
|
for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
|
||||||
|
if (qT.preemptionDisabled) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
|
||||||
|
if (Resources.greaterThan(context.getResourceCalculator(),
|
||||||
|
clusterResource, qT.actuallyToBePreempted, Resources.none())) {
|
||||||
|
resToObtainByPartition.put(qT.partition,
|
||||||
|
Resources.clone(qT.actuallyToBePreempted));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resToObtainByPartition;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isContainerAlreadySelected(RMContainer container,
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
|
||||||
|
if (null == selectedCandidates) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<RMContainer> containers = selectedCandidates.get(
|
||||||
|
container.getApplicationAttemptId());
|
||||||
|
if (containers == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return containers.contains(container);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,364 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
public class FifoCandidatesSelector
|
||||||
|
extends PreemptionCandidatesSelector {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(FifoCandidatesSelector.class);
|
||||||
|
private PreemptableResourceCalculator preemptableAmountCalculator;
|
||||||
|
|
||||||
|
/**
 * Create a selector bound to the given preemption context, together with
 * the calculator that works out how much resource needs to be preempted.
 *
 * @param preemptionContext context shared with the superclass and with the
 *          preemptable-amount calculator
 */
FifoCandidatesSelector(
    CapacitySchedulerPreemptionContext preemptionContext) {
  super(preemptionContext);
  this.preemptableAmountCalculator =
      new PreemptableResourceCalculator(preemptionContext);
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
||||||
|
Resource clusterResource, Resource totalPreemptionAllowed) {
|
||||||
|
// Calculate how much resources we need to preempt
|
||||||
|
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
|
||||||
|
totalPreemptionAllowed);
|
||||||
|
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
|
||||||
|
new HashMap<>();
|
||||||
|
List<RMContainer> skippedAMContainerlist = new ArrayList<>();
|
||||||
|
|
||||||
|
// Loop all leaf queues
|
||||||
|
for (String queueName : preemptionContext.getLeafQueueNames()) {
|
||||||
|
// check if preemption disabled for the queue
|
||||||
|
if (preemptionContext.getQueueByPartition(queueName,
|
||||||
|
RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("skipping from queue=" + queueName
|
||||||
|
+ " because it's a non-preemptable queue");
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute resToObtainByPartition considered inter-queue preemption
|
||||||
|
LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
|
||||||
|
RMNodeLabelsManager.NO_LABEL).leafQueue;
|
||||||
|
|
||||||
|
Map<String, Resource> resToObtainByPartition =
|
||||||
|
CapacitySchedulerPreemptionUtils
|
||||||
|
.getResToObtainByPartitionForLeafQueue(preemptionContext,
|
||||||
|
queueName, clusterResource);
|
||||||
|
|
||||||
|
synchronized (leafQueue) {
|
||||||
|
// go through all ignore-partition-exclusivity containers first to make
|
||||||
|
// sure such containers will be preemptionCandidates first
|
||||||
|
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
|
||||||
|
leafQueue.getIgnoreExclusivityRMContainers();
|
||||||
|
for (String partition : resToObtainByPartition.keySet()) {
|
||||||
|
if (ignorePartitionExclusivityContainers.containsKey(partition)) {
|
||||||
|
TreeSet<RMContainer> rmContainers =
|
||||||
|
ignorePartitionExclusivityContainers.get(partition);
|
||||||
|
// We will check container from reverse order, so latter submitted
|
||||||
|
// application's containers will be preemptionCandidates first.
|
||||||
|
for (RMContainer c : rmContainers.descendingSet()) {
|
||||||
|
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
|
||||||
|
selectedCandidates)) {
|
||||||
|
// Skip already selected containers
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
boolean preempted = tryPreemptContainerAndDeductResToObtain(
|
||||||
|
resToObtainByPartition, c, clusterResource, preemptMap,
|
||||||
|
totalPreemptionAllowed);
|
||||||
|
if (!preempted) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// preempt other containers
|
||||||
|
Resource skippedAMSize = Resource.newInstance(0, 0);
|
||||||
|
Iterator<FiCaSchedulerApp> desc =
|
||||||
|
leafQueue.getOrderingPolicy().getPreemptionIterator();
|
||||||
|
while (desc.hasNext()) {
|
||||||
|
FiCaSchedulerApp fc = desc.next();
|
||||||
|
// When we complete preempt from one partition, we will remove from
|
||||||
|
// resToObtainByPartition, so when it becomes empty, we can get no
|
||||||
|
// more preemption is needed
|
||||||
|
if (resToObtainByPartition.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
preemptFrom(fc, clusterResource, resToObtainByPartition,
|
||||||
|
skippedAMContainerlist, skippedAMSize, preemptMap,
|
||||||
|
totalPreemptionAllowed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Can try preempting AMContainers (still saving atmost
|
||||||
|
// maxAMCapacityForThisQueue AMResource's) if more resources are
|
||||||
|
// required to be preemptionCandidates from this Queue.
|
||||||
|
Resource maxAMCapacityForThisQueue = Resources.multiply(
|
||||||
|
Resources.multiply(clusterResource,
|
||||||
|
leafQueue.getAbsoluteCapacity()),
|
||||||
|
leafQueue.getMaxAMResourcePerQueuePercent());
|
||||||
|
|
||||||
|
preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
|
||||||
|
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
|
||||||
|
totalPreemptionAllowed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return preemptMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* As more resources are needed for preemption, saved AMContainers has to be
|
||||||
|
* rescanned. Such AMContainers can be preemptionCandidates based on resToObtain, but
|
||||||
|
* maxAMCapacityForThisQueue resources will be still retained.
|
||||||
|
*
|
||||||
|
* @param clusterResource
|
||||||
|
* @param preemptMap
|
||||||
|
* @param skippedAMContainerlist
|
||||||
|
* @param skippedAMSize
|
||||||
|
* @param maxAMCapacityForThisQueue
|
||||||
|
*/
|
||||||
|
private void preemptAMContainers(Resource clusterResource,
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||||
|
List<RMContainer> skippedAMContainerlist,
|
||||||
|
Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
|
||||||
|
Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
|
||||||
|
for (RMContainer c : skippedAMContainerlist) {
|
||||||
|
// Got required amount of resources for preemption, can stop now
|
||||||
|
if (resToObtainByPartition.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
|
||||||
|
// container selection iteration for preemption will be stopped.
|
||||||
|
if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
|
||||||
|
maxAMCapacityForThisQueue)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean preempted =
|
||||||
|
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||||
|
clusterResource, preemptMap, totalPreemptionAllowed);
|
||||||
|
if (preempted) {
|
||||||
|
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
skippedAMContainerlist.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean preemptMapContains(
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||||
|
ApplicationAttemptId attemptId, RMContainer rmContainer) {
|
||||||
|
Set<RMContainer> rmContainers;
|
||||||
|
if (null == (rmContainers = preemptMap.get(attemptId))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return rmContainers.contains(rmContainer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return should we preempt rmContainer. If we should, deduct from
|
||||||
|
* <code>resourceToObtainByPartition</code>
|
||||||
|
*/
|
||||||
|
private boolean tryPreemptContainerAndDeductResToObtain(
|
||||||
|
Map<String, Resource> resourceToObtainByPartitions,
|
||||||
|
RMContainer rmContainer, Resource clusterResource,
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||||
|
Resource totalPreemptionAllowed) {
|
||||||
|
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
|
||||||
|
|
||||||
|
// We will not account resource of a container twice or more
|
||||||
|
if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
|
||||||
|
Resource toObtainByPartition =
|
||||||
|
resourceToObtainByPartitions.get(nodePartition);
|
||||||
|
|
||||||
|
if (null != toObtainByPartition && Resources.greaterThan(rc,
|
||||||
|
clusterResource, toObtainByPartition, Resources.none()) && Resources
|
||||||
|
.fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
|
||||||
|
totalPreemptionAllowed)) {
|
||||||
|
Resources.subtractFrom(toObtainByPartition,
|
||||||
|
rmContainer.getAllocatedResource());
|
||||||
|
Resources.subtractFrom(totalPreemptionAllowed,
|
||||||
|
rmContainer.getAllocatedResource());
|
||||||
|
|
||||||
|
// When we have no more resource need to obtain, remove from map.
|
||||||
|
if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
|
||||||
|
Resources.none())) {
|
||||||
|
resourceToObtainByPartitions.remove(nodePartition);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Marked container=" + rmContainer.getContainerId()
|
||||||
|
+ " in partition=" + nodePartition
|
||||||
|
+ " to be preemption candidates");
|
||||||
|
}
|
||||||
|
// Add to preemptMap
|
||||||
|
addToPreemptMap(preemptMap, attemptId, rmContainer);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getPartitionByNodeId(NodeId nodeId) {
|
||||||
|
return preemptionContext.getScheduler().getSchedulerNode(nodeId)
|
||||||
|
.getPartition();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a target preemption for a specific application, select containers
|
||||||
|
* to preempt (after unreserving all reservation for that app).
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void preemptFrom(FiCaSchedulerApp app,
|
||||||
|
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
|
||||||
|
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
|
||||||
|
Resource totalPreemptionAllowed) {
|
||||||
|
ApplicationAttemptId appId = app.getApplicationAttemptId();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Looking at application=" + app.getApplicationAttemptId()
|
||||||
|
+ " resourceToObtain=" + resToObtainByPartition);
|
||||||
|
}
|
||||||
|
|
||||||
|
// first drop reserved containers towards rsrcPreempt
|
||||||
|
List<RMContainer> reservedContainers =
|
||||||
|
new ArrayList<>(app.getReservedContainers());
|
||||||
|
for (RMContainer c : reservedContainers) {
|
||||||
|
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
|
||||||
|
selectedContainers)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (resToObtainByPartition.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to preempt this container
|
||||||
|
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||||
|
clusterResource, selectedContainers, totalPreemptionAllowed);
|
||||||
|
|
||||||
|
if (!preemptionContext.isObserveOnly()) {
|
||||||
|
preemptionContext.getRMContext().getDispatcher().getEventHandler()
|
||||||
|
.handle(new ContainerPreemptEvent(appId, c,
|
||||||
|
SchedulerEventType.KILL_RESERVED_CONTAINER));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if more resources are to be freed go through all live containers in
|
||||||
|
// reverse priority and reverse allocation order and mark them for
|
||||||
|
// preemption
|
||||||
|
List<RMContainer> liveContainers =
|
||||||
|
new ArrayList<>(app.getLiveContainers());
|
||||||
|
|
||||||
|
sortContainers(liveContainers);
|
||||||
|
|
||||||
|
for (RMContainer c : liveContainers) {
|
||||||
|
if (resToObtainByPartition.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
|
||||||
|
selectedContainers)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip already marked to killable containers
|
||||||
|
if (null != preemptionContext.getKillableContainers() && preemptionContext
|
||||||
|
.getKillableContainers().contains(c.getContainerId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip AM Container from preemption for now.
|
||||||
|
if (c.isAMContainer()) {
|
||||||
|
skippedAMContainerlist.add(c);
|
||||||
|
Resources.addTo(skippedAMSize, c.getAllocatedResource());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to preempt this container
|
||||||
|
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||||
|
clusterResource, selectedContainers, totalPreemptionAllowed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare by reversed priority order first, and then reversed containerId
|
||||||
|
* order
|
||||||
|
* @param containers
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
static void sortContainers(List<RMContainer> containers){
|
||||||
|
Collections.sort(containers, new Comparator<RMContainer>() {
|
||||||
|
@Override
|
||||||
|
public int compare(RMContainer a, RMContainer b) {
|
||||||
|
Comparator<Priority> c = new org.apache.hadoop.yarn.server
|
||||||
|
.resourcemanager.resource.Priority.Comparator();
|
||||||
|
int priorityComp = c.compare(b.getContainer().getPriority(),
|
||||||
|
a.getContainer().getPriority());
|
||||||
|
if (priorityComp != 0) {
|
||||||
|
return priorityComp;
|
||||||
|
}
|
||||||
|
return b.getContainerId().compareTo(a.getContainerId());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addToPreemptMap(
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||||
|
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
|
||||||
|
Set<RMContainer> set;
|
||||||
|
if (null == (set = preemptMap.get(appAttemptId))) {
|
||||||
|
set = new HashSet<>();
|
||||||
|
preemptMap.put(appAttemptId, set);
|
||||||
|
}
|
||||||
|
set.add(containerToPreempt);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,370 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate how much resources need to be preempted for each queue,
|
||||||
|
* will be used by {@link FifoCandidatesSelector}
|
||||||
|
*/
|
||||||
|
public class PreemptableResourceCalculator {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(PreemptableResourceCalculator.class);
|
||||||
|
|
||||||
|
private final CapacitySchedulerPreemptionContext context;
|
||||||
|
private final ResourceCalculator rc;
|
||||||
|
|
||||||
|
static class TQComparator implements Comparator<TempQueuePerPartition> {
|
||||||
|
private ResourceCalculator rc;
|
||||||
|
private Resource clusterRes;
|
||||||
|
|
||||||
|
TQComparator(ResourceCalculator rc, Resource clusterRes) {
|
||||||
|
this.rc = rc;
|
||||||
|
this.clusterRes = clusterRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
|
||||||
|
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculates idealAssigned / guaranteed
|
||||||
|
// TempQueues with 0 guarantees are always considered the most over
|
||||||
|
// capacity and therefore considered last for resources.
|
||||||
|
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
|
||||||
|
double pctOver = Integer.MAX_VALUE;
|
||||||
|
if (q != null && Resources.greaterThan(
|
||||||
|
rc, clusterRes, q.guaranteed, Resources.none())) {
|
||||||
|
pctOver =
|
||||||
|
Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
|
||||||
|
}
|
||||||
|
return (pctOver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Create a calculator that reads queue state from the given preemption
 * context and uses that context's resource calculator.
 *
 * @param preemptionContext the preemption context to work against
 */
public PreemptableResourceCalculator(
    CapacitySchedulerPreemptionContext preemptionContext) {
  this.context = preemptionContext;
  this.rc = preemptionContext.getResourceCalculator();
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes a normalizedGuaranteed capacity based on active queues
|
||||||
|
* @param rc resource calculator
|
||||||
|
* @param clusterResource the total amount of resources in the cluster
|
||||||
|
* @param queues the list of queues to consider
|
||||||
|
*/
|
||||||
|
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
|
||||||
|
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
|
||||||
|
Resource activeCap = Resource.newInstance(0, 0);
|
||||||
|
|
||||||
|
if (ignoreGuar) {
|
||||||
|
for (TempQueuePerPartition q : queues) {
|
||||||
|
q.normalizedGuarantee = 1.0f / queues.size();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (TempQueuePerPartition q : queues) {
|
||||||
|
Resources.addTo(activeCap, q.guaranteed);
|
||||||
|
}
|
||||||
|
for (TempQueuePerPartition q : queues) {
|
||||||
|
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
|
||||||
|
q.guaranteed, activeCap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take the most underserved TempQueue (the one on the head). Collect and
|
||||||
|
// return the list of all queues that have the same idealAssigned
|
||||||
|
// percentage of guaranteed.
|
||||||
|
protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
|
||||||
|
PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
|
||||||
|
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
|
||||||
|
while (!orderedByNeed.isEmpty()) {
|
||||||
|
TempQueuePerPartition q1 = orderedByNeed.remove();
|
||||||
|
underserved.add(q1);
|
||||||
|
TempQueuePerPartition q2 = orderedByNeed.peek();
|
||||||
|
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
|
||||||
|
// return what has already been collected. Otherwise, q1's pct of
|
||||||
|
// guaranteed == that of q2, so add q2 to underserved list during the
|
||||||
|
// next pass.
|
||||||
|
if (q2 == null || tqComparator.compare(q1,q2) < 0) {
|
||||||
|
return underserved;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return underserved;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a set of queues compute the fix-point distribution of unassigned
|
||||||
|
* resources among them. As pending request of a queue are exhausted, the
|
||||||
|
* queue is removed from the set and remaining capacity redistributed among
|
||||||
|
* remaining queues. The distribution is weighted based on guaranteed
|
||||||
|
* capacity, unless asked to ignoreGuarantee, in which case resources are
|
||||||
|
* distributed uniformly.
|
||||||
|
*/
|
||||||
|
private void computeFixpointAllocation(ResourceCalculator rc,
|
||||||
|
Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
|
||||||
|
Resource unassigned, boolean ignoreGuarantee) {
|
||||||
|
// Prior to assigning the unused resources, process each queue as follows:
|
||||||
|
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
|
||||||
|
// Else idealAssigned = current;
|
||||||
|
// Subtract idealAssigned resources from unassigned.
|
||||||
|
// If the queue has all of its needs met (that is, if
|
||||||
|
// idealAssigned >= current + pending), remove it from consideration.
|
||||||
|
// Sort queues from most under-guaranteed to most over-guaranteed.
|
||||||
|
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
|
||||||
|
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
|
||||||
|
tqComparator);
|
||||||
|
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
|
||||||
|
TempQueuePerPartition q = i.next();
|
||||||
|
if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
|
||||||
|
q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
|
||||||
|
} else {
|
||||||
|
q.idealAssigned = Resources.clone(q.current);
|
||||||
|
}
|
||||||
|
Resources.subtractFrom(unassigned, q.idealAssigned);
|
||||||
|
// If idealAssigned < (current + pending), q needs more resources, so
|
||||||
|
// add it to the list of underserved queues, ordered by need.
|
||||||
|
Resource curPlusPend = Resources.add(q.current, q.pending);
|
||||||
|
if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
|
||||||
|
orderedByNeed.add(q);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//assign all cluster resources until no more demand, or no resources are left
|
||||||
|
while (!orderedByNeed.isEmpty()
|
||||||
|
&& Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
|
||||||
|
Resource wQassigned = Resource.newInstance(0, 0);
|
||||||
|
// we compute normalizedGuarantees capacity based on currently active
|
||||||
|
// queues
|
||||||
|
resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
|
||||||
|
|
||||||
|
// For each underserved queue (or set of queues if multiple are equally
|
||||||
|
// underserved), offer its share of the unassigned resources based on its
|
||||||
|
// normalized guarantee. After the offer, if the queue is not satisfied,
|
||||||
|
// place it back in the ordered list of queues, recalculating its place
|
||||||
|
// in the order of most under-guaranteed to most over-guaranteed. In this
|
||||||
|
// way, the most underserved queue(s) are always given resources first.
|
||||||
|
Collection<TempQueuePerPartition> underserved =
|
||||||
|
getMostUnderservedQueues(orderedByNeed, tqComparator);
|
||||||
|
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
|
||||||
|
.hasNext();) {
|
||||||
|
TempQueuePerPartition sub = i.next();
|
||||||
|
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
|
||||||
|
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
|
||||||
|
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
|
||||||
|
Resource wQdone = Resources.subtract(wQavail, wQidle);
|
||||||
|
|
||||||
|
if (Resources.greaterThan(rc, tot_guarant,
|
||||||
|
wQdone, Resources.none())) {
|
||||||
|
// The queue is still asking for more. Put it back in the priority
|
||||||
|
// queue, recalculating its order based on need.
|
||||||
|
orderedByNeed.add(sub);
|
||||||
|
}
|
||||||
|
Resources.addTo(wQassigned, wQdone);
|
||||||
|
}
|
||||||
|
Resources.subtractFrom(unassigned, wQassigned);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method computes (for a single level in the tree, passed as a {@code
|
||||||
|
* List<TempQueue>}) the ideal assignment of resources. This is done
|
||||||
|
* recursively to allocate capacity fairly across all queues with pending
|
||||||
|
* demands. It terminates when no resources are left to assign, or when all
|
||||||
|
* demand is satisfied.
|
||||||
|
*
|
||||||
|
* @param rc resource calculator
|
||||||
|
* @param queues a list of cloned queues to be assigned capacity to (this is
|
||||||
|
* an out param)
|
||||||
|
* @param totalPreemptionAllowed total amount of preemption we allow
|
||||||
|
* @param tot_guarant the amount of capacity assigned to this pool of queues
|
||||||
|
*/
|
||||||
|
private void computeIdealResourceDistribution(ResourceCalculator rc,
|
||||||
|
List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
|
||||||
|
Resource tot_guarant) {
|
||||||
|
|
||||||
|
// qAlloc tracks currently active queues (will decrease progressively as
|
||||||
|
// demand is met)
|
||||||
|
List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
|
||||||
|
// unassigned tracks how much resources are still to assign, initialized
|
||||||
|
// with the total capacity for this set of queues
|
||||||
|
Resource unassigned = Resources.clone(tot_guarant);
|
||||||
|
|
||||||
|
// group queues based on whether they have non-zero guaranteed capacity
|
||||||
|
Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
|
||||||
|
Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
|
||||||
|
|
||||||
|
for (TempQueuePerPartition q : qAlloc) {
|
||||||
|
if (Resources
|
||||||
|
.greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
|
||||||
|
nonZeroGuarQueues.add(q);
|
||||||
|
} else {
|
||||||
|
zeroGuarQueues.add(q);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// first compute the allocation as a fixpoint based on guaranteed capacity
|
||||||
|
computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
|
||||||
|
false);
|
||||||
|
|
||||||
|
// if any capacity is left unassigned, distributed among zero-guarantee
|
||||||
|
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
|
||||||
|
if (!zeroGuarQueues.isEmpty()
|
||||||
|
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
|
||||||
|
computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// based on ideal assignment computed above and current assignment we derive
|
||||||
|
// how much preemption is required overall
|
||||||
|
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
|
||||||
|
for (TempQueuePerPartition t:queues) {
|
||||||
|
if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
|
||||||
|
Resources.addTo(totPreemptionNeeded,
|
||||||
|
Resources.subtract(t.current, t.idealAssigned));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we need to preempt more than is allowed, compute a factor (0<f<1)
|
||||||
|
// that is used to scale down how much we ask back from each queue
|
||||||
|
float scalingFactor = 1.0F;
|
||||||
|
if (Resources.greaterThan(rc, tot_guarant,
|
||||||
|
totPreemptionNeeded, totalPreemptionAllowed)) {
|
||||||
|
scalingFactor = Resources.divide(rc, tot_guarant,
|
||||||
|
totalPreemptionAllowed, totPreemptionNeeded);
|
||||||
|
}
|
||||||
|
|
||||||
|
// assign to each queue the amount of actual preemption based on local
|
||||||
|
// information of ideal preemption and scaling factor
|
||||||
|
for (TempQueuePerPartition t : queues) {
|
||||||
|
t.assignPreemption(scalingFactor, rc, tot_guarant);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
for (TempQueuePerPartition t : queues) {
|
||||||
|
LOG.debug(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method recursively computes the ideal assignment of resources to each
|
||||||
|
* level of the hierarchy. This ensures that leafs that are over-capacity but
|
||||||
|
* with parents within capacity will not be preemptionCandidates. Preemptions are allowed
|
||||||
|
* within each subtree according to local over/under capacity.
|
||||||
|
*
|
||||||
|
* @param root the root of the cloned queue hierachy
|
||||||
|
* @param totalPreemptionAllowed maximum amount of preemption allowed
|
||||||
|
* @return a list of leaf queues updated with preemption targets
|
||||||
|
*/
|
||||||
|
private void recursivelyComputeIdealAssignment(
|
||||||
|
TempQueuePerPartition root, Resource totalPreemptionAllowed) {
|
||||||
|
if (root.getChildren() != null &&
|
||||||
|
root.getChildren().size() > 0) {
|
||||||
|
// compute ideal distribution at this level
|
||||||
|
computeIdealResourceDistribution(rc, root.getChildren(),
|
||||||
|
totalPreemptionAllowed, root.idealAssigned);
|
||||||
|
// compute recursively for lower levels and build list of leafs
|
||||||
|
for(TempQueuePerPartition t : root.getChildren()) {
|
||||||
|
recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void calculateResToObtainByPartitionForLeafQueues(
|
||||||
|
Set<String> leafQueueNames, Resource clusterResource) {
|
||||||
|
// Loop all leaf queues
|
||||||
|
for (String queueName : leafQueueNames) {
|
||||||
|
// check if preemption disabled for the queue
|
||||||
|
if (context.getQueueByPartition(queueName,
|
||||||
|
RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("skipping from queue=" + queueName
|
||||||
|
+ " because it's a non-preemptable queue");
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute resToObtainByPartition considered inter-queue preemption
|
||||||
|
for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
|
||||||
|
// we act only if we are violating balance by more than
|
||||||
|
// maxIgnoredOverCapacity
|
||||||
|
if (Resources.greaterThan(rc, clusterResource, qT.current,
|
||||||
|
Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) {
|
||||||
|
// we introduce a dampening factor naturalTerminationFactor that
|
||||||
|
// accounts for natural termination of containers
|
||||||
|
Resource resToObtain = Resources.multiply(qT.toBePreempted,
|
||||||
|
context.getNaturalTerminationFactor());
|
||||||
|
// Only add resToObtain when it >= 0
|
||||||
|
if (Resources.greaterThan(rc, clusterResource, resToObtain,
|
||||||
|
Resources.none())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Queue=" + queueName + " partition=" + qT.partition
|
||||||
|
+ " resource-to-obtain=" + resToObtain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
qT.actuallyToBePreempted = Resources.clone(resToObtain);
|
||||||
|
} else {
|
||||||
|
qT.actuallyToBePreempted = Resources.none();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void computeIdealAllocation(Resource clusterResource,
|
||||||
|
Resource totalPreemptionAllowed) {
|
||||||
|
for (String partition : context.getAllPartitions()) {
|
||||||
|
TempQueuePerPartition tRoot =
|
||||||
|
context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
|
||||||
|
// compute the ideal distribution of resources among queues
|
||||||
|
// updates cloned queues state accordingly
|
||||||
|
tRoot.idealAssigned = tRoot.guaranteed;
|
||||||
|
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// based on ideal allocation select containers to be preempted from each
|
||||||
|
// calculate resource-to-obtain by partition for each leaf queues
|
||||||
|
calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
|
||||||
|
clusterResource);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public abstract class PreemptionCandidatesSelector {
|
||||||
|
protected CapacitySchedulerPreemptionContext preemptionContext;
|
||||||
|
protected ResourceCalculator rc;
|
||||||
|
|
||||||
|
PreemptionCandidatesSelector(
|
||||||
|
CapacitySchedulerPreemptionContext preemptionContext) {
|
||||||
|
this.preemptionContext = preemptionContext;
|
||||||
|
this.rc = preemptionContext.getResourceCalculator();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get preemption candidates from computed resource sharing and already
|
||||||
|
* selected candidates.
|
||||||
|
*
|
||||||
|
* @param selectedCandidates already selected candidates from previous policies
|
||||||
|
* @param clusterResource
|
||||||
|
* @param totalPreemptedResourceAllowed how many resources allowed to be
|
||||||
|
* preempted in this round
|
||||||
|
* @return merged selected candidates.
|
||||||
|
*/
|
||||||
|
public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
|
||||||
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
||||||
|
Resource clusterResource, Resource totalPreemptedResourceAllowed);
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Temporary data-structure tracking resource availability, pending resource
|
||||||
|
* need, current utilization. This is per-queue-per-partition data structure
|
||||||
|
*/
|
||||||
|
public class TempQueuePerPartition {
|
||||||
|
// Following fields are copied from scheduler
|
||||||
|
final String queueName;
|
||||||
|
final Resource current;
|
||||||
|
final Resource pending;
|
||||||
|
final Resource guaranteed;
|
||||||
|
final Resource maxCapacity;
|
||||||
|
final Resource killable;
|
||||||
|
final String partition;
|
||||||
|
|
||||||
|
// Following fields are setted and used by candidate selection policies
|
||||||
|
Resource idealAssigned;
|
||||||
|
Resource toBePreempted;
|
||||||
|
Resource untouchableExtra;
|
||||||
|
Resource preemptableExtra;
|
||||||
|
// For logging purpose
|
||||||
|
Resource actuallyToBePreempted;
|
||||||
|
|
||||||
|
double normalizedGuarantee;
|
||||||
|
|
||||||
|
final ArrayList<TempQueuePerPartition> children;
|
||||||
|
LeafQueue leafQueue;
|
||||||
|
boolean preemptionDisabled;
|
||||||
|
|
||||||
|
TempQueuePerPartition(String queueName, Resource current, Resource pending,
|
||||||
|
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
|
||||||
|
String partition, Resource killable) {
|
||||||
|
this.queueName = queueName;
|
||||||
|
this.current = current;
|
||||||
|
this.pending = pending;
|
||||||
|
this.guaranteed = guaranteed;
|
||||||
|
this.maxCapacity = maxCapacity;
|
||||||
|
this.idealAssigned = Resource.newInstance(0, 0);
|
||||||
|
this.actuallyToBePreempted = Resource.newInstance(0, 0);
|
||||||
|
this.toBePreempted = Resource.newInstance(0, 0);
|
||||||
|
this.normalizedGuarantee = Float.NaN;
|
||||||
|
this.children = new ArrayList<>();
|
||||||
|
this.untouchableExtra = Resource.newInstance(0, 0);
|
||||||
|
this.preemptableExtra = Resource.newInstance(0, 0);
|
||||||
|
this.preemptionDisabled = preemptionDisabled;
|
||||||
|
this.partition = partition;
|
||||||
|
this.killable = killable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLeafQueue(LeafQueue l) {
|
||||||
|
assert children.size() == 0;
|
||||||
|
this.leafQueue = l;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When adding a child we also aggregate its pending resource needs.
|
||||||
|
* @param q the child queue to add to this queue
|
||||||
|
*/
|
||||||
|
public void addChild(TempQueuePerPartition q) {
|
||||||
|
assert leafQueue == null;
|
||||||
|
children.add(q);
|
||||||
|
Resources.addTo(pending, q.pending);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayList<TempQueuePerPartition> getChildren(){
|
||||||
|
return children;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function "accepts" all the resources it can (pending) and return
|
||||||
|
// the unused ones
|
||||||
|
Resource offer(Resource avail, ResourceCalculator rc,
|
||||||
|
Resource clusterResource) {
|
||||||
|
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
|
||||||
|
Resources.subtract(maxCapacity, idealAssigned),
|
||||||
|
Resource.newInstance(0, 0));
|
||||||
|
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
|
||||||
|
Resource accepted =
|
||||||
|
Resources.min(rc, clusterResource,
|
||||||
|
absMaxCapIdealAssignedDelta,
|
||||||
|
Resources.min(rc, clusterResource, avail, Resources.subtract(
|
||||||
|
Resources.add(current, pending), idealAssigned)));
|
||||||
|
Resource remain = Resources.subtract(avail, accepted);
|
||||||
|
Resources.addTo(idealAssigned, accepted);
|
||||||
|
return remain;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append(" NAME: " + queueName)
|
||||||
|
.append(" CUR: ").append(current)
|
||||||
|
.append(" PEN: ").append(pending)
|
||||||
|
.append(" GAR: ").append(guaranteed)
|
||||||
|
.append(" NORM: ").append(normalizedGuarantee)
|
||||||
|
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
|
||||||
|
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
|
||||||
|
.append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
|
||||||
|
.append(" UNTOUCHABLE: ").append(untouchableExtra)
|
||||||
|
.append(" PREEMPTABLE: ").append(preemptableExtra)
|
||||||
|
.append("\n");
|
||||||
|
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assignPreemption(float scalingFactor, ResourceCalculator rc,
|
||||||
|
Resource clusterResource) {
|
||||||
|
if (Resources.greaterThan(rc, clusterResource,
|
||||||
|
Resources.subtract(current, killable), idealAssigned)) {
|
||||||
|
toBePreempted = Resources.multiply(Resources
|
||||||
|
.subtract(Resources.subtract(current, killable), idealAssigned),
|
||||||
|
scalingFactor);
|
||||||
|
} else {
|
||||||
|
toBePreempted = Resource.newInstance(0, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void appendLogString(StringBuilder sb) {
|
||||||
|
sb.append(queueName).append(", ")
|
||||||
|
.append(current.getMemory()).append(", ")
|
||||||
|
.append(current.getVirtualCores()).append(", ")
|
||||||
|
.append(pending.getMemory()).append(", ")
|
||||||
|
.append(pending.getVirtualCores()).append(", ")
|
||||||
|
.append(guaranteed.getMemory()).append(", ")
|
||||||
|
.append(guaranteed.getVirtualCores()).append(", ")
|
||||||
|
.append(idealAssigned.getMemory()).append(", ")
|
||||||
|
.append(idealAssigned.getVirtualCores()).append(", ")
|
||||||
|
.append(toBePreempted.getMemory()).append(", ")
|
||||||
|
.append(toBePreempted.getVirtualCores() ).append(", ")
|
||||||
|
.append(actuallyToBePreempted.getMemory()).append(", ")
|
||||||
|
.append(actuallyToBePreempted.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1020,4 +1020,49 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
public boolean getLazyPreemptionEnabled() {
|
public boolean getLazyPreemptionEnabled() {
|
||||||
return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
|
return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** If true, run the policy but do not affect the cluster with preemption and
|
||||||
|
* kill events. */
|
||||||
|
public static final String PREEMPTION_OBSERVE_ONLY =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.observe_only";
|
||||||
|
public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
|
||||||
|
|
||||||
|
/** Time in milliseconds between invocations of this policy */
|
||||||
|
public static final String PREEMPTION_MONITORING_INTERVAL =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
|
||||||
|
public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
|
||||||
|
|
||||||
|
/** Time in milliseconds between requesting a preemption from an application
|
||||||
|
* and killing the container. */
|
||||||
|
public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
|
||||||
|
public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
|
||||||
|
|
||||||
|
/** Maximum percentage of resources preempted in a single round. By
|
||||||
|
* controlling this value one can throttle the pace at which containers are
|
||||||
|
* reclaimed from the cluster. After computing the total desired preemption,
|
||||||
|
* the policy scales it back within this limit. */
|
||||||
|
public static final String TOTAL_PREEMPTION_PER_ROUND =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
|
||||||
|
public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
|
||||||
|
|
||||||
|
/** Maximum amount of resources above the target capacity ignored for
|
||||||
|
* preemption. This defines a deadzone around the target capacity that helps
|
||||||
|
* prevent thrashing and oscillations around the computed target balance.
|
||||||
|
* High values would slow the time to capacity and (absent natural
|
||||||
|
* completions) it might prevent convergence to guaranteed capacity. */
|
||||||
|
public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
|
||||||
|
public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
|
||||||
|
/**
|
||||||
|
* Given a computed preemption target, account for containers naturally
|
||||||
|
* expiring and preempt only this percentage of the delta. This determines
|
||||||
|
* the rate of geometric convergence into the deadzone ({@link
|
||||||
|
* #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
|
||||||
|
* will reclaim almost 95% of resources within 5 * {@link
|
||||||
|
* #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
|
||||||
|
public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
|
||||||
|
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
|
||||||
|
public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
|
||||||
|
0.2f;
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,12 +86,6 @@ public class PreemptableQueue {
|
||||||
return res == null ? Resources.none() : res;
|
return res == null ? Resources.none() : res;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
|
|
||||||
Map<ContainerId, RMContainer> map = killableContainers.get(partition);
|
|
||||||
return map == null ? Collections.EMPTY_MAP : map;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
|
public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
|
||||||
return killableContainers;
|
return killableContainers;
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class PreemptionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
|
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() {
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
Map<String, PreemptableQueue> map = new HashMap<>();
|
Map<String, PreemptableQueue> map = new HashMap<>();
|
||||||
|
|
|
@ -17,38 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Matchers.isA;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Deque;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.NavigableSet;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.StringTokenizer;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -63,7 +31,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -95,6 +62,32 @@ import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Deque;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.NavigableSet;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestProportionalCapacityPreemptionPolicy {
|
public class TestProportionalCapacityPreemptionPolicy {
|
||||||
|
|
||||||
static final long TS = 3141592653L;
|
static final long TS = 3141592653L;
|
||||||
|
@ -105,11 +98,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
float setAMResourcePercent = 0.0f;
|
float setAMResourcePercent = 0.0f;
|
||||||
Random rand = null;
|
Random rand = null;
|
||||||
Clock mClock = null;
|
Clock mClock = null;
|
||||||
Configuration conf = null;
|
CapacitySchedulerConfiguration conf = null;
|
||||||
CapacityScheduler mCS = null;
|
CapacityScheduler mCS = null;
|
||||||
RMContext rmContext = null;
|
RMContext rmContext = null;
|
||||||
RMNodeLabelsManager lm = null;
|
RMNodeLabelsManager lm = null;
|
||||||
CapacitySchedulerConfiguration schedConf = null;
|
|
||||||
EventHandler<SchedulerEvent> mDisp = null;
|
EventHandler<SchedulerEvent> mDisp = null;
|
||||||
ResourceCalculator rc = new DefaultResourceCalculator();
|
ResourceCalculator rc = new DefaultResourceCalculator();
|
||||||
Resource clusterResources = null;
|
Resource clusterResources = null;
|
||||||
|
@ -132,7 +124,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
|
AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
|
||||||
int value;
|
int value;
|
||||||
|
|
||||||
private priority(int value) {
|
priority(int value) {
|
||||||
this.value = value;
|
this.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,12 +138,17 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
@Before
|
@Before
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void setup() {
|
public void setup() {
|
||||||
conf = new Configuration(false);
|
conf = new CapacitySchedulerConfiguration(new Configuration(false));
|
||||||
conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
|
conf.setLong(
|
||||||
conf.setLong(MONITORING_INTERVAL, 3000);
|
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
|
||||||
|
conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
|
||||||
|
3000);
|
||||||
// report "ideal" preempt
|
// report "ideal" preempt
|
||||||
conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
|
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
||||||
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
|
1.0f);
|
||||||
|
conf.setFloat(
|
||||||
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
||||||
|
1.0f);
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
||||||
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
|
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
|
||||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||||
|
@ -164,8 +161,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
mCS = mock(CapacityScheduler.class);
|
mCS = mock(CapacityScheduler.class);
|
||||||
when(mCS.getResourceCalculator()).thenReturn(rc);
|
when(mCS.getResourceCalculator()).thenReturn(rc);
|
||||||
lm = mock(RMNodeLabelsManager.class);
|
lm = mock(RMNodeLabelsManager.class);
|
||||||
schedConf = new CapacitySchedulerConfiguration();
|
when(mCS.getConfiguration()).thenReturn(conf);
|
||||||
when(mCS.getConfiguration()).thenReturn(schedConf);
|
|
||||||
rmContext = mock(RMContext.class);
|
rmContext = mock(RMContext.class);
|
||||||
when(mCS.getRMContext()).thenReturn(rmContext);
|
when(mCS.getRMContext()).thenReturn(rmContext);
|
||||||
when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
|
@ -271,7 +267,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ -1, 1, 1, 1 }, // req granularity
|
{ -1, 1, 1, 1 }, // req granularity
|
||||||
{ 3, 0, 0, 0 }, // subqueues
|
{ 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
|
conf.setLong(
|
||||||
|
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
|
||||||
|
killTime);
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
|
|
||||||
// ensure all pending rsrc from A get preempted from other queues
|
// ensure all pending rsrc from A get preempted from other queues
|
||||||
|
@ -308,7 +306,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ -1, 1, 1, 1 }, // req granularity
|
{ -1, 1, 1, 1 }, // req granularity
|
||||||
{ 3, 0, 0, 0 }, // subqueues
|
{ 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
|
conf.setFloat(
|
||||||
|
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
|
||||||
|
(float) 0.1);
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// ignore 10% overcapacity to avoid jitter
|
// ignore 10% overcapacity to avoid jitter
|
||||||
|
@ -330,7 +330,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ 3, 0, 0, 0 }, // subqueues
|
{ 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
|
|
||||||
schedConf.setPreemptionDisabled("root.queueB", true);
|
conf.setPreemptionDisabled("root.queueB", true);
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
|
@ -343,7 +343,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// event handler will count only events from the following test and not the
|
// event handler will count only events from the following test and not the
|
||||||
// previous one.
|
// previous one.
|
||||||
setup();
|
setup();
|
||||||
schedConf.setPreemptionDisabled("root.queueB", false);
|
conf.setPreemptionDisabled("root.queueB", false);
|
||||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||||
|
|
||||||
policy2.editSchedule();
|
policy2.editSchedule();
|
||||||
|
@ -382,7 +382,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// Need to call setup() again to reset mDisp
|
// Need to call setup() again to reset mDisp
|
||||||
setup();
|
setup();
|
||||||
// Turn off preemption for queueB and its children
|
// Turn off preemption for queueB and its children
|
||||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
conf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||||
policy2.editSchedule();
|
policy2.editSchedule();
|
||||||
ApplicationAttemptId expectedAttemptOnQueueC =
|
ApplicationAttemptId expectedAttemptOnQueueC =
|
||||||
|
@ -429,7 +429,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// Need to call setup() again to reset mDisp
|
// Need to call setup() again to reset mDisp
|
||||||
setup();
|
setup();
|
||||||
// Turn off preemption for queueB(appA)
|
// Turn off preemption for queueB(appA)
|
||||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
conf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||||
policy2.editSchedule();
|
policy2.editSchedule();
|
||||||
// Now that queueB(appA) is not preemptable, verify that resources come
|
// Now that queueB(appA) is not preemptable, verify that resources come
|
||||||
|
@ -439,8 +439,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
|
|
||||||
setup();
|
setup();
|
||||||
// Turn off preemption for two of the 3 queues with over-capacity.
|
// Turn off preemption for two of the 3 queues with over-capacity.
|
||||||
schedConf.setPreemptionDisabled("root.queueD.queueE", true);
|
conf.setPreemptionDisabled("root.queueD.queueE", true);
|
||||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
conf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
|
||||||
policy3.editSchedule();
|
policy3.editSchedule();
|
||||||
|
|
||||||
|
@ -481,7 +481,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// Turn off preemption for queueA and its children. queueF(appC)'s request
|
// Turn off preemption for queueA and its children. queueF(appC)'s request
|
||||||
// should starve.
|
// should starve.
|
||||||
setup(); // Call setup() to reset mDisp
|
setup(); // Call setup() to reset mDisp
|
||||||
schedConf.setPreemptionDisabled("root.queueA", true);
|
conf.setPreemptionDisabled("root.queueA", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||||
policy2.editSchedule();
|
policy2.editSchedule();
|
||||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
|
||||||
|
@ -505,7 +505,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
||||||
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
schedConf.setPreemptionDisabled("root.queueA.queueC", true);
|
conf.setPreemptionDisabled("root.queueA.queueC", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// Although queueC(appB) is way over capacity and is untouchable,
|
// Although queueC(appB) is way over capacity and is untouchable,
|
||||||
|
@ -529,7 +529,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
|
|
||||||
schedConf.setPreemptionDisabled("root", true);
|
conf.setPreemptionDisabled("root", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// All queues should be non-preemptable, so request should starve.
|
// All queues should be non-preemptable, so request should starve.
|
||||||
|
@ -556,7 +556,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
|
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
// QueueE inherits non-preemption from QueueD
|
// QueueE inherits non-preemption from QueueD
|
||||||
schedConf.setPreemptionDisabled("root.queueD", true);
|
conf.setPreemptionDisabled("root.queueD", true);
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// appC is running on QueueE. QueueE is over absMaxCap, but is not
|
// appC is running on QueueE. QueueE is over absMaxCap, but is not
|
||||||
|
@ -596,7 +596,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ -1, 1, 1, 0 }, // req granularity
|
{ -1, 1, 1, 0 }, // req granularity
|
||||||
{ 3, 0, 0, 0 }, // subqueues
|
{ 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
|
conf.setFloat(
|
||||||
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
||||||
|
(float) 0.1);
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// ignore 10% imbalance between over-capacity queues
|
// ignore 10% imbalance between over-capacity queues
|
||||||
|
@ -616,7 +619,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
{ -1, 1, 1, 0 }, // req granularity
|
{ -1, 1, 1, 0 }, // req granularity
|
||||||
{ 3, 0, 0, 0 }, // subqueues
|
{ 3, 0, 0, 0 }, // subqueues
|
||||||
};
|
};
|
||||||
conf.setBoolean(OBSERVE_ONLY, true);
|
conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
|
||||||
|
true);
|
||||||
|
when(mCS.getConfiguration()).thenReturn(
|
||||||
|
new CapacitySchedulerConfiguration(conf));
|
||||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// verify even severe imbalance not affected
|
// verify even severe imbalance not affected
|
||||||
|
@ -735,7 +741,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
containers.add(rm4);
|
containers.add(rm4);
|
||||||
|
|
||||||
// sort them
|
// sort them
|
||||||
ProportionalCapacityPreemptionPolicy.sortContainers(containers);
|
FifoCandidatesSelector.sortContainers(containers);
|
||||||
|
|
||||||
// verify the "priority"-first, "reverse container-id"-second
|
// verify the "priority"-first, "reverse container-id"-second
|
||||||
// ordering is enforced correctly
|
// ordering is enforced correctly
|
||||||
|
@ -957,7 +963,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
|
||||||
ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
|
ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
|
||||||
conf, rmContext, mCS, mClock);
|
rmContext, mCS, mClock);
|
||||||
clusterResources = Resource.newInstance(
|
clusterResources = Resource.newInstance(
|
||||||
leafAbsCapacities(qData[0], qData[7]), 0);
|
leafAbsCapacities(qData[0], qData[7]), 0);
|
||||||
ParentQueue mRoot = buildMockRootQueue(rand, qData);
|
ParentQueue mRoot = buildMockRootQueue(rand, qData);
|
||||||
|
@ -967,11 +973,6 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
return policy;
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
|
||||||
String[][] resData) {
|
|
||||||
return buildPolicy(qData, resData, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
||||||
String[][] resData, boolean useDominantResourceCalculator) {
|
String[][] resData, boolean useDominantResourceCalculator) {
|
||||||
if (useDominantResourceCalculator) {
|
if (useDominantResourceCalculator) {
|
||||||
|
@ -979,7 +980,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
new DominantResourceCalculator());
|
new DominantResourceCalculator());
|
||||||
}
|
}
|
||||||
ProportionalCapacityPreemptionPolicy policy =
|
ProportionalCapacityPreemptionPolicy policy =
|
||||||
new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
|
new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock);
|
||||||
clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
|
clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
|
||||||
qData[2]);
|
qData[2]);
|
||||||
ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
|
ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
|
||||||
|
@ -1124,7 +1125,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
String qName = "";
|
String qName = "";
|
||||||
while(tokenizer.hasMoreTokens()) {
|
while(tokenizer.hasMoreTokens()) {
|
||||||
qName += tokenizer.nextToken();
|
qName += tokenizer.nextToken();
|
||||||
preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled);
|
preemptionDisabled = conf.getPreemptionDisabled(qName, preemptionDisabled);
|
||||||
qName += ".";
|
qName += ".";
|
||||||
}
|
}
|
||||||
return preemptionDisabled;
|
return preemptionDisabled;
|
||||||
|
|
|
@ -18,29 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Matchers.eq;
|
|
||||||
import static org.mockito.Matchers.isA;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -81,6 +58,25 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
|
LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
|
||||||
|
@ -94,8 +90,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
|
|
||||||
private ResourceCalculator rc = new DefaultResourceCalculator();
|
private ResourceCalculator rc = new DefaultResourceCalculator();
|
||||||
private Clock mClock = null;
|
private Clock mClock = null;
|
||||||
private Configuration conf = null;
|
private CapacitySchedulerConfiguration conf = null;
|
||||||
private CapacitySchedulerConfiguration csConf = null;
|
|
||||||
private CapacityScheduler cs = null;
|
private CapacityScheduler cs = null;
|
||||||
private EventHandler<SchedulerEvent> mDisp = null;
|
private EventHandler<SchedulerEvent> mDisp = null;
|
||||||
private ProportionalCapacityPreemptionPolicy policy = null;
|
private ProportionalCapacityPreemptionPolicy policy = null;
|
||||||
|
@ -107,24 +102,23 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
org.apache.log4j.Logger.getRootLogger().setLevel(
|
org.apache.log4j.Logger.getRootLogger().setLevel(
|
||||||
org.apache.log4j.Level.DEBUG);
|
org.apache.log4j.Level.DEBUG);
|
||||||
|
|
||||||
conf = new Configuration(false);
|
conf = new CapacitySchedulerConfiguration(new Configuration(false));
|
||||||
conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
|
conf.setLong(
|
||||||
conf.setLong(MONITORING_INTERVAL, 3000);
|
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
|
||||||
|
conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
|
||||||
|
3000);
|
||||||
// report "ideal" preempt
|
// report "ideal" preempt
|
||||||
conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
|
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
||||||
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
|
(float) 1.0);
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
conf.setFloat(
|
||||||
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
||||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
(float) 1.0);
|
||||||
// FairScheduler doesn't support this test,
|
|
||||||
// Set CapacityScheduler as the scheduler for this test.
|
|
||||||
conf.set("yarn.resourcemanager.scheduler.class",
|
|
||||||
CapacityScheduler.class.getName());
|
|
||||||
|
|
||||||
mClock = mock(Clock.class);
|
mClock = mock(Clock.class);
|
||||||
cs = mock(CapacityScheduler.class);
|
cs = mock(CapacityScheduler.class);
|
||||||
when(cs.getResourceCalculator()).thenReturn(rc);
|
when(cs.getResourceCalculator()).thenReturn(rc);
|
||||||
when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
|
when(cs.getConfiguration()).thenReturn(conf);
|
||||||
|
|
||||||
nlm = mock(RMNodeLabelsManager.class);
|
nlm = mock(RMNodeLabelsManager.class);
|
||||||
mDisp = mock(EventHandler.class);
|
mDisp = mock(EventHandler.class);
|
||||||
|
@ -134,11 +128,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
Dispatcher disp = mock(Dispatcher.class);
|
Dispatcher disp = mock(Dispatcher.class);
|
||||||
when(rmContext.getDispatcher()).thenReturn(disp);
|
when(rmContext.getDispatcher()).thenReturn(disp);
|
||||||
when(disp.getEventHandler()).thenReturn(mDisp);
|
when(disp.getEventHandler()).thenReturn(mDisp);
|
||||||
csConf = new CapacitySchedulerConfiguration();
|
|
||||||
when(cs.getConfiguration()).thenReturn(csConf);
|
|
||||||
when(cs.getRMContext()).thenReturn(rmContext);
|
when(cs.getRMContext()).thenReturn(rmContext);
|
||||||
|
|
||||||
policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
|
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
||||||
partitionToResource = new HashMap<>();
|
partitionToResource = new HashMap<>();
|
||||||
nodeIdToSchedulerNodes = new HashMap<>();
|
nodeIdToSchedulerNodes = new HashMap<>();
|
||||||
nameToCSQueues = new HashMap<>();
|
nameToCSQueues = new HashMap<>();
|
||||||
|
@ -576,7 +568,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
"c\t" // app3 in c
|
"c\t" // app3 in c
|
||||||
+ "(1,1,n1,x,20,false)"; // 20x in n1
|
+ "(1,1,n1,x,20,false)"; // 20x in n1
|
||||||
|
|
||||||
csConf.setPreemptionDisabled("root.b", true);
|
conf.setPreemptionDisabled("root.b", true);
|
||||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
|
|
||||||
|
@ -901,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
when(cs.getClusterResource()).thenReturn(clusterResource);
|
when(cs.getClusterResource()).thenReturn(clusterResource);
|
||||||
mockApplications(appsConfig);
|
mockApplications(appsConfig);
|
||||||
|
|
||||||
policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
|
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
|
||||||
mClock);
|
mClock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1235,7 +1227,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
|
|
||||||
// Setup preemption disabled
|
// Setup preemption disabled
|
||||||
when(queue.getPreemptionDisabled()).thenReturn(
|
when(queue.getPreemptionDisabled()).thenReturn(
|
||||||
csConf.getPreemptionDisabled(queuePath, false));
|
conf.getPreemptionDisabled(queuePath, false));
|
||||||
|
|
||||||
nameToCSQueues.put(queueName, queue);
|
nameToCSQueues.put(queueName, queue);
|
||||||
when(cs.getQueue(eq(queueName))).thenReturn(queue);
|
when(cs.getQueue(eq(queueName))).thenReturn(queue);
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -81,14 +82,15 @@ public class TestCapacitySchedulerPreemption {
|
||||||
conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
|
conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
|
||||||
|
|
||||||
// Set preemption related configurations
|
// Set preemption related configurations
|
||||||
conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
|
conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
|
||||||
0);
|
0);
|
||||||
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
|
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
|
||||||
true);
|
true);
|
||||||
|
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
||||||
|
1.0f);
|
||||||
conf.setFloat(
|
conf.setFloat(
|
||||||
ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
||||||
conf.setFloat(
|
1.0f);
|
||||||
ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
|
|
||||||
mgr = new NullRMNodeLabelsManager();
|
mgr = new NullRMNodeLabelsManager();
|
||||||
mgr.init(this.conf);
|
mgr.init(this.conf);
|
||||||
clock = mock(Clock.class);
|
clock = mock(Clock.class);
|
||||||
|
@ -484,6 +486,10 @@ public class TestCapacitySchedulerPreemption {
|
||||||
.isEmpty());
|
.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ignore this test now because it could be a premature optimization
|
||||||
|
*/
|
||||||
|
@Ignore
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
|
public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue