YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan

(cherry picked from commit fdf042dfff)

(cherry picked from commit 411836b74c)
Author: Jian He, 2014-12-22 16:50:15 -08:00 (committed by Vinod Kumar Vavilapalli)
Parent: 2073fc0f84
Commit: 88f022da24
17 changed files with 414 additions and 45 deletions
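
In outline, the change forms one pipeline through the files below: RMNodeLabelsManager, now registered as an RM service and handed an RMContext, records the updated node-to-labels mapping while recomputing per-label resources, and fires a new NodeLabelsUpdateSchedulerEvent when a dispatcher is attached. CapacityScheduler handles the new NODE_LABELS_UPDATE event type by killing running containers and dropping any reservation on each affected node, then caches the fresh labels on the SchedulerNode. LeafQueue and ParentQueue stop querying the label manager on every scheduling decision and instead read that cached snapshot via node.getLabels(). Mock and SLS helpers are updated so getNodeLabels() returns an empty set instead of null.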

File: NodeInfo.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
     .UpdatedContainerInfo;
@@ -162,7 +163,7 @@ public class NodeInfo {
   @Override
   public Set<String> getNodeLabels() {
-    return null;
+    return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 }

File: RMNodeWrapper.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
     .UpdatedContainerInfo;
@@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode {
   @Override
   public Set<String> getNodeLabels() {
-    return null;
+    return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 }

File: CHANGES.txt

@@ -171,6 +171,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3733. Fix DominantRC#compare() does not work as expected if
     cluster resource is empty. (Rohith Sharmaks via wangda)
 
+    YARN-2920. Changed CapacityScheduler to kill containers on nodes where
+    node labels are changed. (Wangda Tan via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

File: ResourceManager.java

@@ -430,6 +430,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     rmContext.setAMFinishingMonitor(amFinishingMonitor);
 
     RMNodeLabelsManager nlm = createNodeLabelManager();
+    nlm.setRMContext(rmContext);
     addService(nlm);
     rmContext.setNodeLabelManager(nlm);

File: RMNodeLabelsManager.java

@@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.ImmutableSet;
@@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       new ConcurrentHashMap<String, Queue>();
   protected AccessControlList adminAcl;
 
+  private RMContext rmContext = null;
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
@@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     return map;
   }
 
+  @SuppressWarnings("unchecked")
   private void updateResourceMappings(Map<String, Host> before,
       Map<String, Host> after) {
     // Get NMs in before only
@@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     for (Entry<String, Host> entry : after.entrySet()) {
       allNMs.addAll(entry.getValue().nms.keySet());
     }
+
+    // Map used to notify RM
+    Map<NodeId, Set<String>> newNodeToLabelsMap =
+        new HashMap<NodeId, Set<String>>();
 
     // traverse all nms
     for (NodeId nodeId : allNMs) {
@@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       Node newNM;
       if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
         Set<String> newLabels = getLabelsByNode(nodeId, after);
+        newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels));
+
         // no label in the past
         if (newLabels.isEmpty()) {
           // update labels
@@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
         }
       }
     }
+
+    // Notify RM
+    if (rmContext != null && rmContext.getDispatcher() != null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap));
+    }
   }
 
   public Resource getResourceByLabel(String label, Resource clusterResource) {
@@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
     return false;
   }
+
+  public void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 }
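
Two details above are easy to miss: the final dispatch is guarded by null checks on both rmContext and its dispatcher, so the manager keeps working when constructed without a full RM (as in some unit tests), and the context arrives through the plain setRMContext() setter because ResourceManager (previous file) injects it right after createNodeLabelManager(), before registering the manager with addService().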

File: RMNodeImpl.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -860,9 +861,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   @Override
   public Set<String> getNodeLabels() {
-    if (context.getNodeLabelManager() == null) {
+    RMNodeLabelsManager nlm = context.getNodeLabelManager();
+    if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
       return CommonNodeLabelsManager.EMPTY_STRING_SET;
     }
-    return context.getNodeLabelManager().getLabelsOnNode(nodeId);
+    return nlm.getLabelsOnNode(nodeId);
   }
 }

File: SchedulerNode.java

@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -61,8 +65,11 @@ public abstract class SchedulerNode {
   private final RMNode rmNode;
   private final String nodeName;
 
-  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+  private volatile Set<String> labels = null;
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> labels) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
     this.totalResourceCapability = Resources.clone(node.getTotalCapability());
@@ -71,6 +78,11 @@ public abstract class SchedulerNode {
     } else {
       nodeName = rmNode.getHostName();
     }
+    this.labels = ImmutableSet.copyOf(labels);
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   public RMNode getRMNode() {
@@ -275,4 +287,12 @@ public abstract class SchedulerNode {
     }
     allocateContainer(rmContainer);
   }
+
+  public Set<String> getLabels() {
+    return labels;
+  }
+
+  public void updateLabels(Set<String> labels) {
+    this.labels = labels;
+  }
 }
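
The constructor snapshots the caller's set with ImmutableSet.copyOf, so later mutation of the source set cannot leak into the scheduler's view, and labels is volatile because the update and the reads can happen on different threads. A minimal, self-contained sketch of that snapshot pattern (a stand-in class, not the RM code):

import java.util.HashSet;
import java.util.Set;

import com.google.common.collect.ImmutableSet;

public class LabelSnapshotDemo {
  // volatile: one thread may replace the set while others read it
  private volatile Set<String> labels;

  LabelSnapshotDemo(Set<String> labels) {
    this.labels = ImmutableSet.copyOf(labels); // defensive snapshot
  }

  public static void main(String[] args) {
    Set<String> source = new HashSet<String>();
    source.add("x");
    LabelSnapshotDemo node = new LabelSnapshotDemo(source);
    source.add("y"); // does not reach the snapshot
    System.out.println(node.labels); // prints [x]
  }
}

Note that updateLabels() above stores the incoming set as-is; in this patch the caller passes a set that was already copied with ImmutableSet.copyOf in RMNodeLabelsManager, so the snapshot property still holds end to end.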

File: AbstractCSQueue.java

@@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Map<QueueACL, AccessControlList> getACLs() {
     return acls;
   }
+
+  @Private
+  public Resource getUsedResourceByLabel(String nodeLabel) {
+    return usedResourcesByNodeLabels.get(nodeLabel);
+  }
 }

File: CSQueue.java

@@ -143,6 +143,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public Resource getUsedResources();
 
+  /**
+   * Get the currently utilized resources which allocated at nodes with label
+   * specified
+   *
+   * @return used resources by the queue and it's children
+   */
+  public Resource getUsedResourceByLabel(String nodeLabel);
+
   /**
    * Get the current run-state of the queue
    * @return current run-state
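
getUsedResourceByLabel() is the accessor the new test at the end of this commit leans on: after every label change it asserts the per-label used memory of queue "a". A condensed fragment of that check (taken from the test's checkUsedResource() helper; assumes a started MockRM named rm):

CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue("a");
Assert.assertEquals(1024, queue.getUsedResourceByLabel("x").getMemory());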

File: CapacityScheduler.java

@@ -25,15 +25,15 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,12 +48,15 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -79,12 +82,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -93,6 +99,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -106,11 +113,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -966,6 +968,51 @@ public class CapacityScheduler extends
     updateNodeResource(nm, resourceOption);
     root.updateClusterResource(clusterResource);
   }
 
+  /**
+   * Process node labels update on a node.
+   *
+   * TODO: Currently capacity scheduler will kill containers on a node when
+   * labels on the node changed. It is a simply solution to ensure guaranteed
+   * capacity on labels of queues. When YARN-2498 completed, we can let
+   * preemption policy to decide if such containers need to be killed or just
+   * keep them running.
+   */
+  private synchronized void updateLabelsOnNode(NodeId nodeId,
+      Set<String> newLabels) {
+    FiCaSchedulerNode node = nodes.get(nodeId);
+    if (null == node) {
+      return;
+    }
+
+    // labels is same, we don't need do update
+    if (node.getLabels().size() == newLabels.size()
+        && node.getLabels().containsAll(newLabels)) {
+      return;
+    }
+
+    // Kill running containers since label is changed
+    for (RMContainer rmContainer : node.getRunningContainers()) {
+      ContainerId containerId = rmContainer.getContainerId();
+      completedContainer(rmContainer,
+          ContainerStatus.newInstance(containerId,
+              ContainerState.COMPLETE,
+              String.format(
+                  "Container=%s killed since labels on the node=%s changed",
+                  containerId.toString(), nodeId.toString()),
+              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+          RMContainerEventType.KILL);
+    }
+
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      dropContainerReservation(reservedContainer);
+    }
+
+    // Update node labels after we've done this
+    node.updateLabels(newLabels);
+  }
 
   private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
@@ -1049,6 +1096,19 @@ public class CapacityScheduler extends
           nodeResourceUpdatedEvent.getResourceOption());
     }
     break;
+    case NODE_LABELS_UPDATE:
+    {
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
+          (NodeLabelsUpdateSchedulerEvent) event;
+
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1117,14 +1177,8 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
-
     this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+        usePortForNodeName, nodeManager.getNodeLabels()));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
@@ -1135,6 +1189,12 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
+
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
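
Two behavioral points in this file deserve a note. First, the label comparison in updateLabelsOnNode() uses the size-plus-containsAll idiom, which for duplicate-free sets is exactly set equality; a small self-contained check of the equivalence (demo class, not RM code):

import java.util.Set;

import com.google.common.collect.ImmutableSet;

public class LabelEqualityDemo {
  // Same idiom as updateLabelsOnNode(): equal size + containsAll <=> equal sets
  static boolean sameLabels(Set<String> current, Set<String> updated) {
    return current.size() == updated.size() && current.containsAll(updated);
  }

  public static void main(String[] args) {
    System.out.println(sameLabels(ImmutableSet.of("x"), ImmutableSet.of("x"))); // true
    System.out.println(sameLabels(ImmutableSet.of("x"), ImmutableSet.of("z"))); // false
    System.out.println(sameLabels(ImmutableSet.<String>of(), ImmutableSet.of("y"))); // false
  }
}

Second, addNode() now calls labelManager.activateNode() only after the FiCaSchedulerNode has been put into the nodes map; since activation can itself end up raising a NODE_LABELS_UPDATE, moving it last presumably lets updateLabelsOnNode() find the node instead of hitting the null early-return.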

File: LeafQueue.java

@@ -809,7 +809,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // if our queue cannot access this node, just return
     if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+        node.getLabels())) {
       return NULL_ASSIGNMENT;
     }
 
@@ -878,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Check queue max-capacity limit
       if (!canAssignToThisQueue(clusterResource, required,
-          labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
+          node.getLabels(), application, true)) {
         return NULL_ASSIGNMENT;
       }
 
@@ -911,7 +911,7 @@ public class LeafQueue extends AbstractCSQueue {
           // Book-keeping
           // Note: Update headroom to account for current allocation too...
           allocateResource(clusterResource, application, assigned,
-              labelManager.getLabelsOnNode(node.getNodeID()));
+              node.getLabels());
 
           // Don't reset scheduling opportunities for non-local assignments
           // otherwise the app will be delayed for each non-local assignment.
@@ -1561,7 +1561,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // check if the resource request can access the label
     if (!SchedulerUtils.checkNodeLabelExpression(
-        labelManager.getLabelsOnNode(node.getNodeID()),
+        node.getLabels(),
         request.getNodeLabelExpression())) {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
@@ -1752,8 +1752,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Book-keeping
     if (removed) {
       releaseResource(clusterResource, application,
-          container.getResource(),
-          labelManager.getLabelsOnNode(node.getNodeID()));
+          container.getResource(), node.getLabels());
       LOG.info("completedContainer" +
           " container=" + container +
           " queue=" + this +
@@ -1950,9 +1949,10 @@ public class LeafQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important!
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1989,9 +1989,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -2006,9 +2007,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
-          .getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()

File: ParentQueue.java

@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
   private final boolean rootQueue;
   final Comparator<CSQueue> queueComparator;
   volatile int numApplications;
+  private final CapacitySchedulerContext scheduler;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue {
   public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
-
+    this.scheduler = cs;
     this.queueComparator = cs.getQueueComparator();
     this.rootQueue = (parent == null);
@@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment =
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+    Set<String> nodeLabels = node.getLabels();
 
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
       return assignment;
     }
 
@@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue {
     }
 
     boolean localNeedToUnreserve = false;
-    Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
 
     // Are we over maximum-capacity for this queue?
     if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
@@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue {
     // Book keeping
     synchronized (this) {
       super.releaseResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
+          .getResource(), node.getLabels());
 
       LOG.info("completedContainer" +
           " queue=" + getQueueName() +
@@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important!
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
+          node.getLabels());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="

File: FiCaSchedulerNode.java

@@ -19,12 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
 
+  public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> nodeLabels) {
+    super(node, usePortForNodeName, nodeLabels);
+  }
+
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
-    super(node, usePortForNodeName);
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   @Override

File: NodeLabelsUpdateSchedulerEvent.java (new file)

@@ -0,0 +1,37 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;

import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;

public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent {
  private Map<NodeId, Set<String>> nodeToLabels;

  public NodeLabelsUpdateSchedulerEvent(Map<NodeId, Set<String>> nodeToLabels) {
    super(SchedulerEventType.NODE_LABELS_UPDATE);
    this.nodeToLabels = nodeToLabels;
  }

  public Map<NodeId, Set<String>> getUpdatedNodeToLabels() {
    return nodeToLabels;
  }
}
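
A hedged usage sketch for the new event class, using only the API shown above plus getType(), which SchedulerEvent inherits from YARN's AbstractEvent (the demo class itself is not part of the patch):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;

import com.google.common.collect.ImmutableSet;

public class NodeLabelsEventDemo {
  public static void main(String[] args) {
    // h1 now carries label "z"; h2 loses all of its labels
    Map<NodeId, Set<String>> updated = new HashMap<NodeId, Set<String>>();
    updated.put(NodeId.newInstance("h1", 0), ImmutableSet.of("z"));
    updated.put(NodeId.newInstance("h2", 0), ImmutableSet.<String>of());

    NodeLabelsUpdateSchedulerEvent event =
        new NodeLabelsUpdateSchedulerEvent(updated);
    // CapacityScheduler.handle() switches on this type and calls
    // updateLabelsOnNode() once per entry.
    System.out.println(event.getType() + " -> " + event.getUpdatedNodeToLabels());
  }
}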

File: SchedulerEventType.java

@@ -25,6 +25,7 @@ public enum SchedulerEventType {
   NODE_REMOVED,
   NODE_UPDATE,
   NODE_RESOURCE_UPDATE,
+  NODE_LABELS_UPDATE,
 
   // Source: RMApp
   APP_ADDED,

File: MockNodes.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -206,7 +207,7 @@ public class MockNodes {
 
     @Override
     public Set<String> getNodeLabels() {
-      return null;
+      return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
   };

File: TestCapacitySchedulerNodeLabelUpdate.java (new file)

@@ -0,0 +1,193 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

public class TestCapacitySchedulerNodeLabelUpdate {
  private final int GB = 1024;

  private YarnConfiguration conf;
  RMNodeLabelsManager mgr;

  @Before
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    mgr = new MemoryRMNodeLabelsManager();
    mgr.init(conf);
  }

  private Configuration getConfigurationWithQueueLabels(Configuration config) {
    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(config);

    // Define top-level queues
    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    conf.setCapacity(A, 100);
    conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
    conf.setCapacityByLabel(A, "x", 100);
    conf.setCapacityByLabel(A, "y", 100);
    conf.setCapacityByLabel(A, "z", 100);

    return conf;
  }

  private Set<String> toSet(String... elements) {
    Set<String> set = Sets.newHashSet(elements);
    return set;
  }

  private void checkUsedResource(MockRM rm, String queueName, int memory) {
    checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
  }

  private void checkUsedResource(MockRM rm, String queueName, int memory,
      String label) {
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = scheduler.getQueue(queueName);
    Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
  }

  @Test (timeout = 30000)
  public void testNodeUpdate() throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 8000);
    MockNM nm2 = rm.registerNode("h2:1234", 8000);
    MockNM nm3 = rm.registerNode("h3:1234", 8000);

    ContainerId containerId;

    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);

    // request a container.
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED, 10 * 1000));

    // check used resource:
    // queue-a used x=1G, ""=1G
    checkUsedResource(rm, "a", 1024, "x");
    checkUsedResource(rm, "a", 1024);

    // change h1's label to z, container should be killed
    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
        toSet("z")));
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.KILLED, 10 * 1000));

    // check used resource:
    // queue-a used x=0G, ""=1G ("" not changed)
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 1024);

    // request a container with label = y
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    Assert.assertTrue(rm.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED, 10 * 1000));

    // check used resource:
    // queue-a used y=1G, ""=1G
    checkUsedResource(rm, "a", 1024, "y");
    checkUsedResource(rm, "a", 1024);

    // change h2's label to no label, container should be killed
    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
        CommonNodeLabelsManager.EMPTY_STRING_SET));
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.KILLED, 10 * 1000));

    // check used resource:
    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 1024);

    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);

    // change h3's label to z, AM container should be killed
    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
        toSet("z")));
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.KILLED, 10 * 1000));

    // check used resource:
    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 0);

    rm.close();
  }
}
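
The test drives the whole pipeline end to end: mgr.replaceLabelsOnNode() is the only trigger, and the assertions confirm both the kills (RMContainerState.KILLED) and the per-label book-keeping through getUsedResourceByLabel(). To run it in isolation, something like mvn test -Dtest=TestCapacitySchedulerNodeLabelUpdate from the resourcemanager module should work (the exact module path is assumed here, not part of the patch).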