YARN-7437. Rename PlacementSet and SchedulingPlacementSet. (Wangda Tan via kkaranasos)

This commit is contained in:
Konstantinos Karanasos 2017-11-09 13:01:14 -08:00
parent a2c150a736
commit ac4d2b1081
25 changed files with 392 additions and 311 deletions

View File

@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -82,8 +82,8 @@ public class AppSchedulingInfo {
private final ConcurrentSkipListSet<SchedulerRequestKey>
schedulerKeys = new ConcurrentSkipListSet<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>>
schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
@ -146,7 +146,7 @@ public class AppSchedulingInfo {
*/
private void clearRequests() {
schedulerKeys.clear();
schedulerKeyToPlacementSets.clear();
schedulerKeyToAppPlacementAllocator.clear();
LOG.info("Application " + applicationId + " requests cleared");
}
@ -190,9 +190,9 @@ public class AppSchedulingInfo {
dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
}
// Update scheduling placement set
// Update AppPlacementAllocator by dedup requests.
offswitchResourcesUpdated =
addToPlacementSets(
addRequestToAppPlacement(
recoverPreemptedRequestForAContainer, dedupRequests);
return offswitchResourcesUpdated;
@ -201,11 +201,11 @@ public class AppSchedulingInfo {
}
}
public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
schedulerKeyToPlacementSets.remove(schedulerRequestKey);
public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
}
boolean addToPlacementSets(
boolean addRequestToAppPlacement(
boolean recoverPreemptedRequestForAContainer,
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
boolean offswitchResourcesUpdated = false;
@ -213,14 +213,15 @@ public class AppSchedulingInfo {
dedupRequests.entrySet()) {
SchedulerRequestKey schedulerRequestKey = entry.getKey();
if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
schedulerKeyToPlacementSets.put(schedulerRequestKey,
new LocalitySchedulingPlacementSet<>(this));
if (!schedulerKeyToAppPlacementAllocator.containsKey(
schedulerRequestKey)) {
schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
new LocalityAppPlacementAllocator<>(this));
}
// Update placement set
// Update AppPlacementAllocator
ResourceRequestUpdateResult pendingAmountChanges =
schedulerKeyToPlacementSets.get(schedulerRequestKey)
schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
.updateResourceRequests(
entry.getValue().values(),
recoverPreemptedRequestForAContainer);
@ -244,7 +245,7 @@ public class AppSchedulingInfo {
if (request.getNumContainers() <= 0) {
if (lastRequestContainers >= 0) {
schedulerKeys.remove(schedulerKey);
schedulerKeyToPlacementSets.remove(schedulerKey);
schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
}
LOG.info("checking for deactivate of application :"
+ this.applicationId);
@ -356,8 +357,9 @@ public class AppSchedulingInfo {
List<ResourceRequest> ret = new ArrayList<>();
try {
this.readLock.lock();
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
ret.addAll(ps.getResourceRequests().values());
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
ret.addAll(ap.getResourceRequests().values());
}
} finally {
this.readLock.unlock();
@ -384,8 +386,9 @@ public class AppSchedulingInfo {
String resourceName) {
try {
this.readLock.lock();
SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
} finally {
this.readLock.unlock();
}
@ -424,7 +427,7 @@ public class AppSchedulingInfo {
updateMetricsForAllocatedContainer(type, node, containerAllocated);
}
return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate(
schedulerKey, type, node);
} finally {
writeLock.unlock();
@ -442,23 +445,24 @@ public class AppSchedulingInfo {
this.writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
if (ask.getCount() > 0) {
oldMetrics.decrPendingResources(
ps.getPrimaryRequestedNodePartition(),
ap.getPrimaryRequestedNodePartition(),
user, ask.getCount(), ask.getPerAllocationResource());
newMetrics.incrPendingResources(
ps.getPrimaryRequestedNodePartition(),
ap.getPrimaryRequestedNodePartition(),
user, ask.getCount(), ask.getPerAllocationResource());
Resource delta = Resources.multiply(ask.getPerAllocationResource(),
ask.getCount());
// Update Queue
queue.decPendingResource(
ps.getPrimaryRequestedNodePartition(), delta);
ap.getPrimaryRequestedNodePartition(), delta);
newQueue.incPendingResource(
ps.getPrimaryRequestedNodePartition(), delta);
ap.getPrimaryRequestedNodePartition(), delta);
}
}
oldMetrics.moveAppFrom(this);
@ -477,15 +481,16 @@ public class AppSchedulingInfo {
try {
this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
if (ask.getCount() > 0) {
metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(),
metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(),
user, ask.getCount(), ask.getPerAllocationResource());
// Update Queue
queue.decPendingResource(
ps.getPrimaryRequestedNodePartition(),
ap.getPrimaryRequestedNodePartition(),
Resources.multiply(ask.getPerAllocationResource(),
ask.getCount()));
}
@ -559,11 +564,12 @@ public class AppSchedulingInfo {
SchedulerRequestKey schedulerKey) {
try {
readLock.lock();
SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
if (null == ps) {
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
if (null == ap) {
return false;
}
return ps.canAllocate(type, node);
return ap.canAllocate(type, node);
} finally {
readLock.unlock();
}
@ -593,11 +599,10 @@ public class AppSchedulingInfo {
metrics.incrNodeTypeAggregations(user, type);
}
// Get placement-set by specified schedulerKey
// Now simply return all node of the input clusterPlacementSet
public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
// Get AppPlacementAllocator by specified schedulerKey
public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
SchedulerRequestKey schedulerkey) {
return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get(
schedulerkey);
}
@ -614,9 +619,9 @@ public class AppSchedulingInfo {
SchedulerRequestKey schedulerKey, String resourceName) {
try {
this.readLock.lock();
SchedulingPlacementSet ps =
schedulerKeyToPlacementSets.get(schedulerKey);
return (ps == null) || ps.canDelayTo(resourceName);
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap == null) || ap.canDelayTo(resourceName);
} finally {
this.readLock.unlock();
}
@ -626,9 +631,9 @@ public class AppSchedulingInfo {
String nodePartition, SchedulingMode schedulingMode) {
try {
this.readLock.lock();
SchedulingPlacementSet ps =
schedulerKeyToPlacementSets.get(schedulerKey);
return (ps != null) && ps.acceptNodePartition(nodePartition,
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap != null) && ap.acceptNodePartition(nodePartition,
schedulingMode);
} finally {
this.readLock.unlock();

View File

@ -34,8 +34,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -146,17 +145,17 @@ public class ContainerUpdateContext {
createResourceRequests(rmContainer, schedulerNode,
schedulerKey, resToIncrease);
updateResReqs.put(schedulerKey, resMap);
appSchedulingInfo.addToPlacementSets(false, updateResReqs);
appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
}
return true;
}
private void cancelPreviousRequest(SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey) {
SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
if (schedulingPlacementSet != null) {
Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
if (appPlacementAllocator != null) {
Map<String, ResourceRequest> resourceRequests = appPlacementAllocator
.getResourceRequests();
ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
// Decrement the pending using a dummy RR with
@ -290,7 +289,7 @@ public class ContainerUpdateContext {
(rmContainer, node, schedulerKey,
rmContainer.getContainer().getResource());
reqsToUpdate.put(schedulerKey, resMap);
appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
return UNDEFINED;
}
return retVal;

View File

@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.ConcurrentHashMultiset;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang.time.FastDateFormat;
@ -75,14 +74,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -91,6 +87,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ConcurrentHashMultiset;
/**
* Represents an application attempt from the viewpoint of the scheduler.
@ -316,9 +313,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
String resourceName) {
try {
readLock.lock();
SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
schedulerKey);
return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
} finally {
readLock.unlock();
}
@ -617,13 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
try {
readLock.lock();
for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
if (ps != null &&
ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
if (ap != null &&
ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
+ attemptResourceUsage.getUsed().getMemorySize());
ps.showRequests();
ap.showRequests();
}
}
} finally {
@ -1334,14 +1331,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
this.isAttemptRecovering = isRecovering;
}
public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
SchedulerRequestKey schedulerRequestKey) {
return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey);
}
public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerRequestKey) {
return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey)
.getResourceRequests();
}

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* Utility for logging scheduler activities
*/
// FIXME: make sure PlacementSet works with this class
// FIXME: make sure CandidateNodeSet works with this class
public class ActivitiesLogger {
private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -876,7 +876,7 @@ public abstract class AbstractCSQueue implements CSQueue {
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits,
SchedulingMode schedulingMode) {
return assignContainers(clusterResource, new SimplePlacementSet<>(node),
return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node),
resourceLimits, schedulingMode);
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -46,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
/**
* <code>CSQueue</code> represents a node in the tree of
@ -188,15 +185,16 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
/**
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param ps {@link PlacementSet} of nodes which resources are available
* @param candidates {@link CandidateNodeSet} the nodes that are considered
* for the current placement.
* @param resourceLimits how much overall resource of this queue can use.
* @param schedulingMode Type of exclusive check when assign container on a
* NodeManager, see {@link SchedulingMode}.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
SchedulingMode schedulingMode);
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits resourceLimits, SchedulingMode schedulingMode);
/**
* A container assigned to the queue has completed.

View File

@ -132,9 +132,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
@ -1183,7 +1183,7 @@ public class CapacityScheduler extends
/**
* We need to make sure when doing allocation, Node should be existed
* And we will construct a {@link PlacementSet} before proceeding
* And we will construct a {@link CandidateNodeSet} before proceeding
*/
private void allocateContainersToNode(NodeId nodeId,
boolean withNodeHeartbeat) {
@ -1192,8 +1192,10 @@ public class CapacityScheduler extends
int offswitchCount = 0;
int assignedContainers = 0;
PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
CandidateNodeSet<FiCaSchedulerNode> candidates =
new SimpleCandidateNodeSet<>(node);
CSAssignment assignment = allocateContainersToNode(candidates,
withNodeHeartbeat);
// Only check if we can allocate more container on the same node when
// scheduling is triggered by node heartbeat
if (null != assignment && withNodeHeartbeat) {
@ -1210,7 +1212,7 @@ public class CapacityScheduler extends
assignedContainers)) {
// Try to see if it is possible to allocate multiple container for
// the same node heartbeat
assignment = allocateContainersToNode(ps, true);
assignment = allocateContainersToNode(candidates, true);
if (null != assignment
&& assignment.getType() == NodeType.OFF_SWITCH) {
@ -1237,8 +1239,9 @@ public class CapacityScheduler extends
/*
* Logics of allocate container on a single node (Old behavior)
*/
private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
FiCaSchedulerNode node, boolean withNodeHeartbeat) {
private CSAssignment allocateContainerOnSingleNode(
CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
boolean withNodeHeartbeat) {
// Backward compatible way to make sure previous behavior which allocation
// driven by node heartbeat works.
if (getNode(node.getNodeID()) != node) {
@ -1262,7 +1265,7 @@ public class CapacityScheduler extends
.getApplicationId() + " on node: " + node.getNodeID());
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
assignment = queue.assignContainers(getClusterResource(), ps,
assignment = queue.assignContainers(getClusterResource(), candidates,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
@ -1329,14 +1332,16 @@ public class CapacityScheduler extends
+ node.getUnallocatedResource());
}
return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
}
private CSAssignment allocateOrReserveNewContainers(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
CSAssignment assignment = getRootQueue().assignContainers(
getClusterResource(), ps, new ResourceLimits(labelManager
.getResourceByLabel(ps.getPartition(), getClusterResource())),
getClusterResource(), candidates, new ResourceLimits(labelManager
.getResourceByLabel(candidates.getPartition(),
getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@ -1346,30 +1351,34 @@ public class CapacityScheduler extends
assignment.getResource(), Resources.none())) {
if (withNodeHeartbeat) {
updateSchedulerHealth(lastNodeUpdateTime,
PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
assignment);
}
return assignment;
}
// Only do non-exclusive allocation when node has node-labels.
if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
if (StringUtils.equals(candidates.getPartition(),
RMNodeLabelsManager.NO_LABEL)) {
return null;
}
// Only do non-exclusive allocation when the node-label supports that
try {
if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
ps.getPartition())) {
candidates.getPartition())) {
return null;
}
} catch (IOException e) {
LOG.warn("Exception when trying to get exclusivity of node label=" + ps
LOG.warn(
"Exception when trying to get exclusivity of node label=" + candidates
.getPartition(), e);
return null;
}
// Try to use NON_EXCLUSIVE
assignment = getRootQueue().assignContainers(getClusterResource(), ps,
assignment = getRootQueue().assignContainers(getClusterResource(),
candidates,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
@ -1386,13 +1395,14 @@ public class CapacityScheduler extends
* New behavior, allocate containers considering multiple nodes
*/
private CSAssignment allocateContainersOnMultiNodes(
PlacementSet<FiCaSchedulerNode> ps) {
CandidateNodeSet<FiCaSchedulerNode> candidates) {
// When this time look at multiple nodes, try schedule if the
// partition has any available resource or killable resource
if (getRootQueue().getQueueCapacities().getUsedCapacity(
ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
.none()) {
candidates.getPartition()) >= 1.0f
&& preemptionManager.getKillableResource(
CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
== Resources.none()) {
if (LOG.isDebugEnabled()) {
LOG.debug("This node or this node partition doesn't have available or"
+ "killable resource");
@ -1400,11 +1410,12 @@ public class CapacityScheduler extends
return null;
}
return allocateOrReserveNewContainers(ps, false);
return allocateOrReserveNewContainers(candidates, false);
}
@VisibleForTesting
CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
CSAssignment allocateContainersToNode(
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
.isSchedulerReadyForAllocatingContainers()) {
@ -1413,14 +1424,14 @@ public class CapacityScheduler extends
// Backward compatible way to make sure previous behavior which allocation
// driven by node heartbeat works.
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
// We have two different logics to handle allocation on single node / multi
// nodes.
if (null != node) {
return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
} else {
return allocateContainersOnMultiNodes(ps);
return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
} else{
return allocateContainersOnMultiNodes(candidates);
}
}

View File

@ -69,8 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
@ -970,10 +970,10 @@ public class LeafQueue extends AbstractCSQueue {
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
}
private CSAssignment allocateFromReservedContainer(
Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (null == node) {
return null;
}
@ -987,7 +987,8 @@ public class LeafQueue extends AbstractCSQueue {
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node.getNodeID(), SystemClock.getInstance().getTime(), application);
CSAssignment assignment = application.assignContainers(clusterResource,
ps, currentResourceLimits, schedulingMode, reservedContainer);
candidates, currentResourceLimits, schedulingMode,
reservedContainer);
return assignment;
}
}
@ -997,43 +998,44 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: partition=" + ps.getPartition()
LOG.debug("assignContainers: partition=" + candidates.getPartition()
+ " #applications=" + orderingPolicy.getNumSchedulableEntities());
}
setPreemptionAllowed(currentResourceLimits, ps.getPartition());
setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
// Check for reserved resources, try to allocate reserved container first.
CSAssignment assignment = allocateFromReservedContainer(clusterResource,
ps, currentResourceLimits, schedulingMode);
candidates, currentResourceLimits, schedulingMode);
if (null != assignment) {
return assignment;
}
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(ps.getPartition())) {
&& !accessibleToPartition(candidates.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates
.getPartition());
return CSAssignment.NULL_ASSIGNMENT;
}
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + ps.getPartition());
+ schedulingMode.name() + " node-partition=" + candidates
.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
@ -1078,7 +1080,8 @@ public class LeafQueue extends AbstractCSQueue {
cachedUserLimit = cul.userLimit;
}
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit);
clusterResource, candidates.getPartition(), schedulingMode,
cachedUserLimit);
if (cul == null) {
cul = new CachedUserLimit(userLimit);
userLimits.put(application.getUser(), cul);
@ -1106,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to schedule
assignment = application.assignContainers(clusterResource,
ps, currentResourceLimits, schedulingMode, null);
candidates, currentResourceLimits, schedulingMode, null);
if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application " + application

View File

@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@ -479,16 +479,16 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
SchedulingMode schedulingMode) {
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits resourceLimits, SchedulingMode schedulingMode) {
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(ps.getPartition())) {
&& !accessibleToPartition(candidates.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it is not able to access partition=" + ps
+ ", because it is not able to access partition=" + candidates
.getPartition());
}
@ -506,12 +506,12 @@ public class ParentQueue extends AbstractCSQueue {
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource,
schedulingMode)) {
if (!super.hasPendingResourceRequest(candidates.getPartition(),
clusterResource, schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + ps
+ schedulingMode.name() + " node-partition=" + candidates
.getPartition());
}
@ -538,7 +538,8 @@ public class ParentQueue extends AbstractCSQueue {
// Are we over maximum-capacity for this queue?
// This will also consider parent's limits and also continuous reservation
// looking
if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
if (!super.canAssignToThisQueue(clusterResource,
candidates.getPartition(),
resourceLimits, Resources
.createResource(getMetrics().getReservedMB(),
getMetrics().getReservedVirtualCores()), schedulingMode)) {
@ -556,7 +557,7 @@ public class ParentQueue extends AbstractCSQueue {
// Schedule
CSAssignment assignedToChild = assignContainersToChildQueues(
clusterResource, ps, resourceLimits, schedulingMode);
clusterResource, candidates, resourceLimits, schedulingMode);
assignment.setType(assignedToChild.getType());
assignment.setRequestLocalityType(
assignedToChild.getRequestLocalityType());
@ -710,7 +711,7 @@ public class ParentQueue extends AbstractCSQueue {
}
private CSAssignment assignContainersToChildQueues(Resource cluster,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits,
CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
SchedulingMode schedulingMode) {
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
@ -719,7 +720,7 @@ public class ParentQueue extends AbstractCSQueue {
// Try to assign to most 'under-served' sub-queue
for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
ps.getPartition()); iter.hasNext(); ) {
candidates.getPartition()); iter.hasNext(); ) {
CSQueue childQueue = iter.next();
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@ -729,10 +730,10 @@ public class ParentQueue extends AbstractCSQueue {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, parentLimits,
ps.getPartition());
candidates.getPartition());
CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
childLimits, schedulingMode);
CSAssignment childAssignment = childQueue.assignContainers(cluster,
candidates, childLimits, schedulingMode);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -34,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -176,13 +175,14 @@ public abstract class AbstractContainerAllocator {
* </ul>
*
* @param clusterResource clusterResource
* @param ps PlacementSet
* @param candidates CandidateNodeSet
* @param schedulingMode scheduling mode (exclusive or nonexclusive)
* @param resourceLimits resourceLimits
* @param reservedContainer reservedContainer
* @return CSAssignemnt proposal
*/
public abstract CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, RMContainer reservedContainer);
CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
RMContainer reservedContainer);
}

View File

@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerAllocator extends AbstractContainerAllocator {
private AbstractContainerAllocator regularContainerAllocator;
@ -50,10 +48,11 @@ public class ContainerAllocator extends AbstractContainerAllocator {
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, RMContainer reservedContainer) {
CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
RMContainer reservedContainer) {
return regularContainerAllocator.assignContainers(clusterResource,
ps, schedulingMode, resourceLimits, reservedContainer);
candidates, schedulingMode, resourceLimits, reservedContainer);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@ -50,9 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -91,15 +91,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
/*
* Pre-check if we can allocate a pending resource request
* (given schedulerKey) to a given PlacementSet.
* (given schedulerKey) to a given CandidateNodeSet.
* We will consider stuffs like exclusivity, pending resource, node partition,
* headroom, etc.
*/
private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
private ContainerAllocation preCheckForNodeCandidateSet(
Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
ResourceRequest.ANY);
@ -164,7 +165,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
if (!checkHeadroom(clusterResource, resourceLimits, required,
ps.getPartition())) {
candidates.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
@ -182,7 +183,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Only do this when request associated with given scheduler key accepts
// NO_LABEL under RESPECT_EXCLUSIVITY mode
if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
appInfo.getSchedulingPlacementSet(schedulerKey)
appInfo.getAppPlacementAllocator(schedulerKey)
.getPrimaryRequestedNodePartition())) {
missedNonPartitionedRequestSchedulingOpportunity =
application.addMissedNonPartitionedRequestSchedulingOpportunity(
@ -265,7 +266,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources = Math.max(
application.getSchedulingPlacementSet(schedulerKey)
application.getAppPlacementAllocator(schedulerKey)
.getUniqueLocationAsks() - 1, 0);
// waitFactor can't be more than '1'
@ -780,15 +781,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
private ContainerAllocation allocate(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer) {
CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey, RMContainer reservedContainer) {
// Do checks before determining which node to allocate
// Directly return if this check fails.
ContainerAllocation result;
if (reservedContainer == null) {
result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
resourceLimits, schedulerKey);
result = preCheckForNodeCandidateSet(clusterResource, candidates,
schedulingMode, resourceLimits, schedulerKey);
if (null != result) {
return result;
}
@ -801,14 +802,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
}
SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getSchedulingPlacementSet(
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getAppPlacementAllocator(
schedulerKey);
result = ContainerAllocation.PRIORITY_SKIPPED;
Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
ps);
candidates);
while (iter.hasNext()) {
FiCaSchedulerNode node = iter.next();
@ -827,19 +828,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits,
CandidateNodeSet<FiCaSchedulerNode> candidates,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
RMContainer reservedContainer) {
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (reservedContainer == null) {
// Check if application needs more resource, skip if it doesn't need more.
if (!application.hasPendingResourceRequest(rc,
ps.getPartition(), clusterResource, schedulingMode)) {
candidates.getPartition(), clusterResource, schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + ps.getPartition());
+ schedulingMode.name() + " node-label=" + candidates
.getPartition());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
@ -849,9 +851,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Schedule in priority order
for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
ContainerAllocation result =
allocate(clusterResource, ps, schedulingMode, resourceLimits,
schedulerKey, null);
ContainerAllocation result = allocate(clusterResource, candidates,
schedulingMode, resourceLimits, schedulerKey, null);
AllocationState allocationState = result.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
@ -869,7 +870,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
allocate(clusterResource, ps, schedulingMode, resourceLimits,
allocate(clusterResource, candidates, schedulingMode, resourceLimits,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result,
reservedContainer, node);

View File

@ -76,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -224,10 +224,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return null;
}
SchedulingPlacementSet<FiCaSchedulerNode> ps =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
AppPlacementAllocator<FiCaSchedulerNode> ps =
appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
if (null == ps) {
LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
+ " for application=" + getApplicationId() + " schedulerRequestKey="
+ schedulerKey);
return null;
@ -636,8 +636,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
Map<String, Resource> ret = new HashMap<>();
for (SchedulerRequestKey schedulerKey : appSchedulingInfo
.getSchedulerKeys()) {
SchedulingPlacementSet<FiCaSchedulerNode> ps =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
AppPlacementAllocator<FiCaSchedulerNode> ps =
appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
String nodePartition = ps.getPrimaryRequestedNodePartition();
Resource res = ret.get(nodePartition);
@ -844,8 +844,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode, RMContainer reservedContainer) {
CandidateNodeSet<FiCaSchedulerNode> ps,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode,
RMContainer reservedContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
+ getApplicationId());
@ -962,9 +963,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@Override
@SuppressWarnings("unchecked")
public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator(
SchedulerRequestKey schedulerRequestKey) {
return super.getSchedulingPlacementSet(schedulerRequestKey);
return super.getAppPlacementAllocator(schedulerRequestKey);
}
/**

View File

@ -1019,7 +1019,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
if (offswitchAsk.getCount() > 0) {
if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
<= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()

View File

@ -32,26 +32,29 @@ import java.util.Map;
/**
* <p>
* Comparing to {@link PlacementSet}, this also maintains
* pending ResourceRequests:
* - When new ResourceRequest(s) added to scheduler, or,
* - Or new container allocated, scheduler can notify corresponding
* PlacementSet.
* This class has the following functionality:
* 1) Keeps track of pending resource requests when following events happen:
* - New ResourceRequests are added to scheduler.
* - New containers get allocated.
*
* 2) Determines the order that the nodes given in the {@link CandidateNodeSet}
* will be used for allocating containers.
* </p>
*
* <p>
* Different set of resource requests (E.g., resource requests with the
* same schedulerKey) can have one instance of PlacementSet, each PlacementSet
* can have different ways to order nodes depends on requests.
* And different set of resource requests (E.g., resource requests with the
* same schedulerKey) can have one instance of AppPlacementAllocator, each
* AppPlacementAllocator can have different ways to order nodes depends on
* requests.
* </p>
*/
public interface SchedulingPlacementSet<N extends SchedulerNode> {
public interface AppPlacementAllocator<N extends SchedulerNode> {
/**
* Get iterator of preferred node depends on requirement and/or availability
* @param clusterPlacementSet input cluster PlacementSet
* @param candidateNodeSet input CandidateNodeSet
* @return iterator of preferred node
*/
Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet);
Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
/**
* Replace existing ResourceRequest by the new requests
@ -115,8 +118,9 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
/**
* Can delay to give locality?
* TODO (wangda): This should be moved out of SchedulingPlacementSet
* TODO: This should be moved out of AppPlacementAllocator
* and should belong to specific delay scheduling policy impl.
* See YARN-7457 for more details.
*
* @param resourceName resourceName
* @return can/cannot
@ -124,7 +128,7 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
boolean canDelayTo(String resourceName);
/**
* Does this {@link SchedulingPlacementSet} accept resources on nodePartition?
* Does this {@link AppPlacementAllocator} accept resources on nodePartition?
*
* @param nodePartition nodePartition
* @param schedulingMode schedulingMode
@ -146,8 +150,9 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
* @return number of unique location asks with #pending greater than 0,
* (like /rack1, host1, etc.).
*
* TODO (wangda): This should be moved out of SchedulingPlacementSet
* TODO: This should be moved out of AppPlacementAllocator
* and should belong to specific delay scheduling policy impl.
* See YARN-7457 for more details.
*/
int getUniqueLocationAsks();

View File

@ -23,42 +23,38 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Iterator;
import java.util.Map;
/**
* <p>
* PlacementSet is the central place that decide the order of node to fit
* asks by application.
* </p>
* A group of nodes which can be allocated by scheduler.
*
* <p>
* Also, PlacementSet can cache results (for example, ordered list) for
* better performance.
* </p>
* It will have following part:
*
* <p>
* PlacementSet can depend on one or more other PlacementSets.
* </p>
* 1) A map of nodes which can be schedule-able.
* 2) Version of the node set, version should be updated if any node added or
* removed from the node set. This will be used by
* {@link AppPlacementAllocator} or other class to check if it's required to
* invalidate local caches, etc.
* 3) Node partition of the candidate set.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface PlacementSet<N extends SchedulerNode> {
public interface CandidateNodeSet<N extends SchedulerNode> {
/**
* Get all nodes for this PlacementSet
* @return all nodes for this PlacementSet
* Get all nodes for this CandidateNodeSet
* @return all nodes for this CandidateNodeSet
*/
Map<NodeId, N> getAllNodes();
/**
* Version of the PlacementSet, can help other PlacementSet with dependencies
* deciding if update is required
* Version of the CandidateNodeSet, can help {@link AppPlacementAllocator} to
* decide if update is required
* @return version
*/
long getVersion();
/**
* Partition of the PlacementSet.
* Node partition of the node set.
* @return node partition
*/
String getPartition();

View File

@ -20,15 +20,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
public class PlacementSetUtils {
/*
* If the {@link PlacementSet} only has one entry, return it. otherwise
* return null
/**
* Utility methods for {@link CandidateNodeSet}.
*/
public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) {
public final class CandidateNodeSetUtils {
private CandidateNodeSetUtils() {
}
/*
* If the {@link CandidateNodeSet} only has one entry, return it. Otherwise,
* return null.
*/
public static <N extends SchedulerNode> N getSingleNode(
CandidateNodeSet<N> candidates) {
N node = null;
if (1 == ps.getAllNodes().size()) {
node = ps.getAllNodes().values().iterator().next();
if (1 == candidates.getAllNodes().size()) {
node = candidates.getAllNodes().values().iterator().next();
}
return node;

View File

@ -39,10 +39,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
implements SchedulingPlacementSet<N> {
/**
* This is an implementation of the {@link AppPlacementAllocator} that takes
* into account locality preferences (node, rack, any) when allocating
* containers.
*/
public class LocalityAppPlacementAllocator<N extends SchedulerNode>
implements AppPlacementAllocator<N> {
private static final Log LOG =
LogFactory.getLog(LocalitySchedulingPlacementSet.class);
LogFactory.getLog(LocalityAppPlacementAllocator.class);
private final Map<String, ResourceRequest> resourceRequestMap =
new ConcurrentHashMap<>();
@ -53,7 +58,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@ -63,11 +68,12 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
@Override
@SuppressWarnings("unchecked")
public Iterator<N> getPreferredNodeIterator(
PlacementSet<N> clusterPlacementSet) {
// Now only handle the case that single node in placementSet
// TODO, Add support to multi-hosts inside placement-set which is passed in.
CandidateNodeSet<N> candidateNodeSet) {
// Now only handle the case that single node in the candidateNodeSet
// TODO, Add support to multi-hosts inside candidateNodeSet which is passed
// in.
N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
if (null != singleNode) {
return IteratorUtils.singletonIterator(singleNode);
}
@ -213,7 +219,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
appSchedulingInfo.checkForDeactivation();
resourceRequestMap.remove(ResourceRequest.ANY);
if (resourceRequestMap.isEmpty()) {
appSchedulingInfo.removePlacementSets(schedulerRequestKey);
appSchedulingInfo.removeAppPlacement(schedulerRequestKey);
}
}

View File

@ -22,24 +22,22 @@ import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
/**
* A simple PlacementSet which keeps an unordered map
* A simple CandidateNodeSet which keeps an unordered map
*/
public class SimplePlacementSet<N extends SchedulerNode>
implements PlacementSet<N> {
public class SimpleCandidateNodeSet<N extends SchedulerNode>
implements CandidateNodeSet<N> {
private Map<NodeId, N> map;
private String partition;
public SimplePlacementSet(N node) {
public SimpleCandidateNodeSet(N node) {
if (null != node) {
// Only one node in the initial PlacementSet
// Only one node in the initial CandidateNodeSet
this.map = ImmutableMap.of(node.getNodeID(), node);
this.partition = node.getPartition();
} else {
@ -48,7 +46,7 @@ public class SimplePlacementSet<N extends SchedulerNode>
}
}
public SimplePlacementSet(Map<NodeId, N> map, String partition) {
public SimpleCandidateNodeSet(Map<NodeId, N> map, String partition) {
this.map = map;
this.partition = partition;
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
* contains classes related to application monitor.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -140,7 +140,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -4199,7 +4199,8 @@ public class TestCapacityScheduler {
scheduler.handle(new NodeRemovedSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
// schedulerNode is removed, try allocate a container
scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
true);
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -147,9 +147,9 @@ public class TestChildQueueOrder {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
.assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
when(queue).assignContainers(eq(clusterResource),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getUnallocatedResource();
@ -159,9 +159,9 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
}).when(queue).assignContainers(eq(clusterResource),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(ContainerId.class),
anyBoolean());
}
@ -425,10 +425,10 @@ public class TestChildQueueOrder {
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
any(PlacementSet.class), any(ResourceLimits.class),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), any(ResourceLimits.class),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
@ -88,13 +88,14 @@ public class TestContainerResizing {
@Override
public CSAssignment allocateContainersToNode(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.debug("Thread interrupted.");
}
return super.allocateContainersToNode(ps, withNodeHeartbeat);
return super.allocateContainersToNode(candidates, withNodeHeartbeat);
}
}

View File

@ -656,7 +656,7 @@ public class TestNodeLabelContainerAllocation {
if (key.getPriority().getPriority() == priority) {
Assert.assertEquals("Expected partition is " + expectedPartition,
expectedPartition,
info.getSchedulingPlacementSet(key)
info.getAppPlacementAllocator(key)
.getPrimaryRequestedNodePartition());
}
}

View File

@ -29,7 +29,6 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import java.util.HashMap;
@ -54,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -181,8 +180,9 @@ public class TestParentQueue {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
.assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
.assignContainers(eq(clusterResource),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getUnallocatedResource();
@ -192,8 +192,9 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
}).when(queue).assignContainers(eq(clusterResource),
any(CandidateNodeSet.class), any(ResourceLimits.class),
any(SchedulingMode.class));
}
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@ -274,13 +275,14 @@ public class TestParentQueue {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(),
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -294,9 +296,11 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -308,9 +312,11 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -326,9 +332,11 @@ public class TestParentQueue {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -548,21 +556,24 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 1*GB, a);
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 1 * GB, a);
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 2*GB, root);
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 2 * GB, root);
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 2*GB, b);
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
@ -587,23 +598,27 @@ public class TestParentQueue {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 2*GB, a);
root.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
applyAllocationToQueue(clusterResource, 2*GB, b);
root.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -720,12 +735,14 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -738,9 +755,11 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -801,9 +820,11 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -816,9 +837,11 @@ public class TestParentQueue {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
any(CandidateNodeSet.class), anyResourceLimits(),
any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);